From ae71ddde619dd9c4011fd130927d7a5f75286d91 Mon Sep 17 00:00:00 2001 From: Hankunming <1109939087@qq.com> Date: Wed, 4 Dec 2024 11:49:56 +0800 Subject: [PATCH] feat: make check expired message interval configurable to reduce the consuming thread block time --- .../client/consumer/DefaultMQPushConsumer.java | 13 +++++++++++++ .../consumer/ConsumeMessageConcurrentlyService.java | 2 +- 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java index 5df5cc8fa1a..e9acfadb004 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java @@ -288,6 +288,11 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume private RPCHook rpcHook = null; + /** + * Interval to clean expired messages + */ + private long cleanExpiredMsgInterval = 15; + /** * Default constructor. */ @@ -1002,4 +1007,12 @@ public MessageQueueListener getMessageQueueListener() { public void setMessageQueueListener(MessageQueueListener messageQueueListener) { this.messageQueueListener = messageQueueListener; } + + public long getCleanExpiredMsgInterval() { + return cleanExpiredMsgInterval; + } + + public void setCleanExpiredMsgInterval(long cleanExpiredMsgInterval) { + this.cleanExpiredMsgInterval = cleanExpiredMsgInterval; + } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java index b151fefbbb3..755a98c554c 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java @@ -94,7 +94,7 @@ public void run() { } } - }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES); + }, this.defaultMQPushConsumer.getCleanExpiredMsgInterval(), this.defaultMQPushConsumer.getCleanExpiredMsgInterval(), TimeUnit.MINUTES); } public void shutdown(long awaitTerminateMillis) {