diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java index 5df5cc8fa1a..e9acfadb004 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java @@ -288,6 +288,11 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume private RPCHook rpcHook = null; + /** + * Interval to clean expired messages + */ + private long cleanExpiredMsgInterval = 15; + /** * Default constructor. */ @@ -1002,4 +1007,12 @@ public MessageQueueListener getMessageQueueListener() { public void setMessageQueueListener(MessageQueueListener messageQueueListener) { this.messageQueueListener = messageQueueListener; } + + public long getCleanExpiredMsgInterval() { + return cleanExpiredMsgInterval; + } + + public void setCleanExpiredMsgInterval(long cleanExpiredMsgInterval) { + this.cleanExpiredMsgInterval = cleanExpiredMsgInterval; + } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java index b151fefbbb3..755a98c554c 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java @@ -94,7 +94,7 @@ public void run() { } } - }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES); + }, this.defaultMQPushConsumer.getCleanExpiredMsgInterval(), this.defaultMQPushConsumer.getCleanExpiredMsgInterval(), TimeUnit.MINUTES); } public void shutdown(long awaitTerminateMillis) {