Skip to content

Commit

Permalink
Fix the issue of duplicate consumption in LMQ
Browse files Browse the repository at this point in the history
  • Loading branch information
RongtongJin committed Jan 4, 2025
1 parent f32fe78 commit 03cdcaa
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class PopLongPollingService extends ServiceThread {
LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
private final BrokerController brokerController;
private final NettyRequestProcessor processor;
private final ConcurrentHashMap<String, ConcurrentHashMap<String, Byte>> topicCidMap;
private final ConcurrentLinkedHashMap<String, ConcurrentHashMap<String, Byte>> topicCidMap;
private final ConcurrentLinkedHashMap<String, ConcurrentSkipListSet<PopRequest>> pollingMap;
private long lastCleanTime = 0;

Expand All @@ -63,7 +63,8 @@ public PopLongPollingService(BrokerController brokerController, NettyRequestProc
this.brokerController = brokerController;
this.processor = processor;
// 100000 topic default, 100000 lru topic + cid + qid
this.topicCidMap = new ConcurrentHashMap<>(brokerController.getBrokerConfig().getPopPollingMapSize());
this.topicCidMap = new ConcurrentLinkedHashMap.Builder<String, ConcurrentHashMap<String, Byte>>()
.maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize() * 2L).build();
this.pollingMap = new ConcurrentLinkedHashMap.Builder<String, ConcurrentSkipListSet<PopRequest>>()
.maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize()).build();
this.notifyLast = notifyLast;
Expand Down Expand Up @@ -350,16 +351,16 @@ private void cleanUnusedResource() {
Map.Entry<String, ConcurrentHashMap<String, Byte>> entry = topicCidMapIter.next();
String topic = entry.getKey();
if (brokerController.getTopicConfigManager().selectTopicConfig(topic) == null) {
POP_LOGGER.info("remove not exit topic {} in topicCidMap!", topic);
POP_LOGGER.info("remove nonexistent topic {} in topicCidMap!", topic);
topicCidMapIter.remove();
continue;
}
Iterator<Map.Entry<String, Byte>> cidMapIter = entry.getValue().entrySet().iterator();
while (cidMapIter.hasNext()) {
Map.Entry<String, Byte> cidEntry = cidMapIter.next();
String cid = cidEntry.getKey();
if (!brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().containsKey(cid)) {
POP_LOGGER.info("remove not exit sub {} of topic {} in topicCidMap!", cid, topic);
if (!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(cid)) {
POP_LOGGER.info("remove nonexistent subscription group {} of topic {} in topicCidMap!", cid, topic);
cidMapIter.remove();
}
}
Expand All @@ -380,12 +381,12 @@ private void cleanUnusedResource() {
String topic = keyArray[0];
String cid = keyArray[1];
if (brokerController.getTopicConfigManager().selectTopicConfig(topic) == null) {
POP_LOGGER.info("remove not exit topic {} in pollingMap!", topic);
POP_LOGGER.info("remove nonexistent topic {} in pollingMap!", topic);
pollingMapIter.remove();
continue;
}
if (!brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().containsKey(cid)) {
POP_LOGGER.info("remove not exit sub {} of topic {} in pollingMap!", cid, topic);
if (!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(cid)) {
POP_LOGGER.info("remove nonexistent subscription group {} of topic {} in pollingMap!", cid, topic);
pollingMapIter.remove();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ protected void autoClean() {
continue;
}

if (this.brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().get(group) == null) {
if (!this.brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(group)) {
iterator.remove();
log.info("Group not exist, Clean order info, {}:{}", topicAtGroup, qs);
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ private RemotingCommand getSubscriptionGroup(ChannelHandlerContext ctx,
GetSubscriptionGroupConfigRequestHeader requestHeader = (GetSubscriptionGroupConfigRequestHeader) request.decodeCommandCustomHeader(GetSubscriptionGroupConfigRequestHeader.class);
final RemotingCommand response = RemotingCommand.createResponseCommand(null);

SubscriptionGroupConfig groupConfig = this.brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().get(requestHeader.getGroup());
SubscriptionGroupConfig groupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
if (groupConfig == null) {
LOGGER.error("No group in this broker, client: {} group: {}", ctx.channel().remoteAddress(), requestHeader.getGroup());
response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
Expand Down Expand Up @@ -2444,7 +2444,7 @@ private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx,
}
// groupSysFlag
if (StringUtils.isNotEmpty(requestHeader.getConsumerGroup())) {
SubscriptionGroupConfig groupConfig = brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().get(requestHeader.getConsumerGroup());
SubscriptionGroupConfig groupConfig = brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
if (groupConfig != null) {
request.addExtField("groupSysFlag", String.valueOf(groupConfig.getGroupSysFlag()));
}
Expand Down Expand Up @@ -2933,7 +2933,7 @@ private RemotingCommand getTopicConfig(ChannelHandlerContext ctx,
GetTopicConfigRequestHeader requestHeader = (GetTopicConfigRequestHeader) request.decodeCommandCustomHeader(GetTopicConfigRequestHeader.class);
final RemotingCommand response = RemotingCommand.createResponseCommand(null);

TopicConfig topicConfig = this.brokerController.getTopicConfigManager().getTopicConfigTable().get(requestHeader.getTopic());
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (topicConfig == null) {
LOGGER.error("No topic in this broker, client: {} topic: {}", ctx.channel().remoteAddress(), requestHeader.getTopic());
//be care of the response code, should set "not-exist" explicitly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,12 +197,12 @@ private void scanGarbage() {
String topic = keyArray[0];
String cid = keyArray[1];
if (brokerController.getTopicConfigManager().selectTopicConfig(topic) == null) {
POP_LOGGER.info("[PopBuffer]remove not exit topic {} in buffer!", topic);
POP_LOGGER.info("[PopBuffer]remove nonexistent topic {} in buffer!", topic);
iterator.remove();
continue;
}
if (!brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().containsKey(cid)) {
POP_LOGGER.info("[PopBuffer]remove not exit sub {} of topic {} in buffer!", cid, topic);
if (!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(cid)) {
POP_LOGGER.info("[PopBuffer]remove nonexistent subscription group {} of topic {} in buffer!", cid, topic);
iterator.remove();
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,9 +384,7 @@ public void testAutoCleanAndEncode() {

SubscriptionGroupManager subscriptionGroupManager = mock(SubscriptionGroupManager.class);
when(brokerController.getSubscriptionGroupManager()).thenReturn(subscriptionGroupManager);
ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupConfigConcurrentMap = new ConcurrentHashMap<>();
subscriptionGroupConfigConcurrentMap.put(GROUP, new SubscriptionGroupConfig());
when(subscriptionGroupManager.getSubscriptionGroupTable()).thenReturn(subscriptionGroupConfigConcurrentMap);
when(subscriptionGroupManager.containsSubscriptionGroup(GROUP)).thenReturn(true);

TopicConfig topicConfig = new TopicConfig(TOPIC);
when(topicConfigManager.selectTopicConfig(eq(TOPIC))).thenReturn(topicConfig);
Expand Down

0 comments on commit 03cdcaa

Please sign in to comment.