diff --git a/eventmesh-storage-plugin/eventmesh-storage-rabbitmq/src/main/java/org/apache/eventmesh/storage/rabbitmq/consumer/RabbitmqConsumer.java b/eventmesh-storage-plugin/eventmesh-storage-rabbitmq/src/main/java/org/apache/eventmesh/storage/rabbitmq/consumer/RabbitmqConsumer.java index 7d0ff45d70..28580cf8f0 100644 --- a/eventmesh-storage-plugin/eventmesh-storage-rabbitmq/src/main/java/org/apache/eventmesh/storage/rabbitmq/consumer/RabbitmqConsumer.java +++ b/eventmesh-storage-plugin/eventmesh-storage-rabbitmq/src/main/java/org/apache/eventmesh/storage/rabbitmq/consumer/RabbitmqConsumer.java @@ -111,10 +111,13 @@ public void init(Properties keyValue) throws Exception { this.connection = getConnection(); this.channel = getChannel(); this.rabbitmqConsumerHandler = new RabbitmqConsumerHandler(channel, configurationHolder, - isBroadcast ? consumerGroup : configurationHolder.getQueueName(),this); + isBroadcast ? consumerGroup : configurationHolder.getQueueName(), this); } public Channel reConnectChannel() throws Exception { + if (isClosed()) { + return null; + } this.connection = getConnection(); this.channel = getChannel(); return this.channel; diff --git a/eventmesh-storage-plugin/eventmesh-storage-rabbitmq/src/main/java/org/apache/eventmesh/storage/rabbitmq/consumer/RabbitmqConsumerHandler.java b/eventmesh-storage-plugin/eventmesh-storage-rabbitmq/src/main/java/org/apache/eventmesh/storage/rabbitmq/consumer/RabbitmqConsumerHandler.java index 1e08df6d86..6e5c620519 100644 --- a/eventmesh-storage-plugin/eventmesh-storage-rabbitmq/src/main/java/org/apache/eventmesh/storage/rabbitmq/consumer/RabbitmqConsumerHandler.java +++ b/eventmesh-storage-plugin/eventmesh-storage-rabbitmq/src/main/java/org/apache/eventmesh/storage/rabbitmq/consumer/RabbitmqConsumerHandler.java @@ -53,6 +53,9 @@ public RabbitmqConsumerHandler(Channel channel, ConfigurationHolder configuratio public void run() { while (!stop.get()) { try { + if (!stop.get() && consumer.isStarted() && !channel.isOpen()) { + this.channel = consumer.reConnectChannel(); + } GetResponse response = channel.basicGet(queueName, configurationHolder.isAutoAck()); if (response != null) { RabbitmqCloudEvent rabbitmqCloudEvent = RabbitmqCloudEvent.getFromByteArray(response.getBody()); @@ -73,13 +76,6 @@ public void commit(EventMeshAction action) { } } catch (Exception ex) { log.error("[RabbitmqConsumerHandler] thread run happen exception.", ex); - if (!stop.get()) { - try { - this.channel = consumer.reConnectChannel(); - } catch (Exception e) { - log.error("[RabbitmqConsumerHandler] reconnect error.", e); - } - } } } } diff --git a/gradle.properties b/gradle.properties index 371e0abe28..b8094baff7 100644 --- a/gradle.properties +++ b/gradle.properties @@ -18,7 +18,7 @@ jdk=17 snapshot=false group=org.apache.eventmesh -version=1.10.0-jdk17-release +version=1.10.0.1-jdk17-release #last eight bits of public key signing.keyId= #passphrase for key pairs