From 9dc90d2467e7501ea06415e5f3e047e8228c0d08 Mon Sep 17 00:00:00 2001 From: "Horry.Xu" Date: Thu, 4 Jan 2024 11:20:29 +0800 Subject: [PATCH] release 1.10.0.1-jdk17 --- .../storage/rabbitmq/consumer/RabbitmqConsumer.java | 5 ++++- .../rabbitmq/consumer/RabbitmqConsumerHandler.java | 10 +++------- gradle.properties | 2 +- 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/eventmesh-storage-plugin/eventmesh-storage-rabbitmq/src/main/java/org/apache/eventmesh/storage/rabbitmq/consumer/RabbitmqConsumer.java b/eventmesh-storage-plugin/eventmesh-storage-rabbitmq/src/main/java/org/apache/eventmesh/storage/rabbitmq/consumer/RabbitmqConsumer.java index 7d0ff45d70..28580cf8f0 100644 --- a/eventmesh-storage-plugin/eventmesh-storage-rabbitmq/src/main/java/org/apache/eventmesh/storage/rabbitmq/consumer/RabbitmqConsumer.java +++ b/eventmesh-storage-plugin/eventmesh-storage-rabbitmq/src/main/java/org/apache/eventmesh/storage/rabbitmq/consumer/RabbitmqConsumer.java @@ -111,10 +111,13 @@ public void init(Properties keyValue) throws Exception { this.connection = getConnection(); this.channel = getChannel(); this.rabbitmqConsumerHandler = new RabbitmqConsumerHandler(channel, configurationHolder, - isBroadcast ? consumerGroup : configurationHolder.getQueueName(),this); + isBroadcast ? consumerGroup : configurationHolder.getQueueName(), this); } public Channel reConnectChannel() throws Exception { + if (isClosed()) { + return null; + } this.connection = getConnection(); this.channel = getChannel(); return this.channel; diff --git a/eventmesh-storage-plugin/eventmesh-storage-rabbitmq/src/main/java/org/apache/eventmesh/storage/rabbitmq/consumer/RabbitmqConsumerHandler.java b/eventmesh-storage-plugin/eventmesh-storage-rabbitmq/src/main/java/org/apache/eventmesh/storage/rabbitmq/consumer/RabbitmqConsumerHandler.java index 1e08df6d86..6e5c620519 100644 --- a/eventmesh-storage-plugin/eventmesh-storage-rabbitmq/src/main/java/org/apache/eventmesh/storage/rabbitmq/consumer/RabbitmqConsumerHandler.java +++ b/eventmesh-storage-plugin/eventmesh-storage-rabbitmq/src/main/java/org/apache/eventmesh/storage/rabbitmq/consumer/RabbitmqConsumerHandler.java @@ -53,6 +53,9 @@ public RabbitmqConsumerHandler(Channel channel, ConfigurationHolder configuratio public void run() { while (!stop.get()) { try { + if (!stop.get() && consumer.isStarted() && !channel.isOpen()) { + this.channel = consumer.reConnectChannel(); + } GetResponse response = channel.basicGet(queueName, configurationHolder.isAutoAck()); if (response != null) { RabbitmqCloudEvent rabbitmqCloudEvent = RabbitmqCloudEvent.getFromByteArray(response.getBody()); @@ -73,13 +76,6 @@ public void commit(EventMeshAction action) { } } catch (Exception ex) { log.error("[RabbitmqConsumerHandler] thread run happen exception.", ex); - if (!stop.get()) { - try { - this.channel = consumer.reConnectChannel(); - } catch (Exception e) { - log.error("[RabbitmqConsumerHandler] reconnect error.", e); - } - } } } } diff --git a/gradle.properties b/gradle.properties index 371e0abe28..b8094baff7 100644 --- a/gradle.properties +++ b/gradle.properties @@ -18,7 +18,7 @@ jdk=17 snapshot=false group=org.apache.eventmesh -version=1.10.0-jdk17-release +version=1.10.0.1-jdk17-release #last eight bits of public key signing.keyId= #passphrase for key pairs