Skip to content

Commit

Permalink
fix:consumer offset compatible problem caused by LmqConsumerOffsetMan…
Browse files Browse the repository at this point in the history
…ager
  • Loading branch information
humkum committed Oct 9, 2023
1 parent 3808387 commit 80f8fa3
Show file tree
Hide file tree
Showing 5 changed files with 285 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@

import com.google.common.base.Strings;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
Expand Down Expand Up @@ -100,6 +102,55 @@ public void cleanOffsetByTopic(String topic) {
}
}

@Override
public boolean load() {
String lmqFilePath = this.configLmqFilePath();
String normalFilePath = this.configFilePath();
String targetFilePath = null;
try {
targetFilePath = MixAll.loadLatestFile(normalFilePath, lmqFilePath);
String jsonString = MixAll.file2String(targetFilePath);
if (null == jsonString || jsonString.length() == 0) {
return this.loadBak();
} else {
if (StringUtils.equals(normalFilePath, targetFilePath)) {
this.decode(jsonString);
} else {
this.decodeFromLmq(jsonString);
}
LOG.info("load " + targetFilePath + " success");
return true;
}
} catch (Exception e) {
LOG.error("load " + targetFilePath + " failed, and try to load backup file", e);
return this.loadBak();
}
}

@Override
protected boolean loadBak() {
String lmqBakFilePath = this.configLmqFilePath() + ".bak";
String normalBakFilePath = this.configFilePath() + ".bak";
String targetBakFilePath = null;
try {
targetBakFilePath = MixAll.loadLatestFile(normalBakFilePath, lmqBakFilePath);
String jsonString = MixAll.file2String(targetBakFilePath);
if (jsonString != null && jsonString.length() > 0) {
if (StringUtils.equals(normalBakFilePath, targetBakFilePath)) {
this.decode(jsonString);
} else {
this.decodeFromLmq(jsonString);
}
LOG.info("load " + targetBakFilePath + " OK");
return true;
}
} catch (Exception e) {
LOG.error("load " + targetBakFilePath + " Failed", e);
return false;
}
return true;
}

public void scanUnsubscribedTopic() {
Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
while (it.hasNext()) {
Expand Down Expand Up @@ -290,6 +341,10 @@ public String configFilePath() {
return BrokerPathConfigHelper.getConsumerOffsetPath(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
}

private String configLmqFilePath() {
return BrokerPathConfigHelper.getLmqConsumerOffsetPath(brokerController.getMessageStoreConfig().getStorePathRootDir());
}

@Override
public void decode(String jsonString) {
if (jsonString != null) {
Expand All @@ -301,6 +356,13 @@ public void decode(String jsonString) {
}
}

public void decodeFromLmq(String jsonString) {
LmqConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, LmqConsumerOffsetManager.class);
if (obj != null) {
this.offsetTable = obj.offsetTable;
}
}

@Override
public String encode(final boolean prettyFormat) {
return RemotingSerializable.toJson(this, prettyFormat);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,18 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;

public class LmqConsumerOffsetManager extends ConsumerOffsetManager {
protected static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);

private ConcurrentHashMap<String, Long> lmqOffsetTable = new ConcurrentHashMap<>(512);

public LmqConsumerOffsetManager() {
Expand All @@ -36,6 +42,60 @@ public LmqConsumerOffsetManager(BrokerController brokerController) {
super(brokerController);
}

@Override
public boolean load() {
String lmqFilePath = this.configFilePath();
String normalFilePath = super.configFilePath();
String targetFilePath = null;
try {
targetFilePath = MixAll.loadLatestFile(normalFilePath, lmqFilePath);
String jsonString = MixAll.file2String(targetFilePath);
if (null == jsonString || jsonString.length() == 0) {
return this.loadBak();
} else {
if (StringUtils.equals(normalFilePath, targetFilePath)) {
//should load the old lmq offset
String lmqJsonString = MixAll.file2String(lmqFilePath);
this.decode(lmqJsonString);
this.decodeFromLmq(jsonString);
} else {
this.decode(jsonString);
}
LOG.info("load " + targetFilePath + " success");
return true;
}
} catch (Exception e) {
LOG.error("load " + targetFilePath + " failed, and try to load backup file", e);
return this.loadBak();
}
}

@Override
public boolean loadBak() {
String lmqBakFilePath = this.configFilePath() + ".bak";
String normalBakFilePath = super.configFilePath() + ".bak";
String targetBakFilePath = null;
try {
targetBakFilePath = MixAll.loadLatestFile(normalBakFilePath, lmqBakFilePath);
String jsonString = MixAll.file2String(targetBakFilePath);
if (jsonString != null && jsonString.length() > 0) {
if (StringUtils.equals(normalBakFilePath, targetBakFilePath)) {
String lmqJsonString = MixAll.file2String(lmqBakFilePath);
this.decode(lmqJsonString);
this.decodeFromLmq(jsonString);
} else {
this.decode(jsonString);
}
LOG.info("load " + targetBakFilePath + " OK");
return true;
}
} catch (Exception e) {
LOG.error("load " + targetBakFilePath + " Failed", e);
return false;
}
return true;
}

@Override
public long queryOffset(final String group, final String topic, final int queueId) {
if (!MixAll.isLmq(group)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,13 @@

package org.apache.rocketmq.broker.offset;

import com.alibaba.fastjson.JSON;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;

import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.subscription.LmqSubscriptionGroupManager;
import org.apache.rocketmq.broker.topic.LmqTopicConfigManager;
Expand All @@ -30,6 +35,7 @@
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Spy;

Expand Down Expand Up @@ -99,6 +105,136 @@ public void testOffsetManage1() {
assertThat(lmqConsumerOffsetManager1.getLmqOffsetTable().size()).isEqualTo(2);
}

@Test
public void testUpgradeCompatible() throws IOException, InterruptedException {
//load old consumerOffset
ConsumerOffsetManager consumerOffsetManager = new ConsumerOffsetManager(brokerController);
consumerOffsetManager.commitOffset("127.0.0.1","GID_test2", "OldTopic",0, 11L);
String json = JSON.toJSONString(consumerOffsetManager);
String configFilePath = consumerOffsetManager.configFilePath();

persistOffsetFile(configFilePath, json);
ConcurrentMap<String, ConcurrentMap<Integer, Long>> oldOffsetTable = consumerOffsetManager.getOffsetTable();
String oldOffset = JSON.toJSONString(oldOffsetTable);

//[UPGRADE]init lmq consumer offset manager
LmqConsumerOffsetManager lmqConsumerOffsetManager = new LmqConsumerOffsetManager(brokerController);
lmqConsumerOffsetManager.load();
ConcurrentMap<String, ConcurrentMap<Integer, Long>> offsetTable = lmqConsumerOffsetManager.getOffsetTable();
String newOffsetJson = JSON.toJSONString(offsetTable);
Assert.assertEquals(oldOffset, newOffsetJson);
}

@Test
public void testRollBack() throws IOException, InterruptedException {
//generate lmq offset
LmqConsumerOffsetManager lmqConsumerOffsetManager = new LmqConsumerOffsetManager(brokerController);
lmqConsumerOffsetManager.commitOffset("127.0.0.1", "GID_test1", "OldTopic", 1, 12L);
String json = JSON.toJSONString(lmqConsumerOffsetManager);
String configFilePath = lmqConsumerOffsetManager.configFilePath();
persistOffsetFile(configFilePath, json);
Thread.sleep(1000);

//init consumerOffsetManager
ConsumerOffsetManager consumerOffsetManager = new ConsumerOffsetManager(brokerController);
consumerOffsetManager.commitOffset("127.0.0.1", "GID_test2", "NewTopic", 0, 12L);
String offsetJson = JSON.toJSONString(consumerOffsetManager);
String filePath = consumerOffsetManager.configFilePath();
persistOffsetFile(filePath, offsetJson);
String offsetTableOld = JSON.toJSONString(consumerOffsetManager.getOffsetTable());

//clear old offset info
consumerOffsetManager.getOffsetTable().clear();
Assert.assertEquals(0, consumerOffsetManager.getOffsetTable().size());
consumerOffsetManager.load();
Assert.assertEquals(1, consumerOffsetManager.getOffsetTable().size());

String offsetTableNew = JSON.toJSONString(consumerOffsetManager.getOffsetTable());
Assert.assertEquals(offsetTableOld, offsetTableNew);
Thread.sleep(1000);

//roll back and lmqOffset is modified resently
lmqConsumerOffsetManager.commitOffset("127.0.0.1", "GID_test2", "OldTopic", 0, 10L);
String newLmqOffsetJson = JSON.toJSONString(lmqConsumerOffsetManager);
persistOffsetFile(configFilePath, newLmqOffsetJson);
String expectOffset = JSON.toJSONString(lmqConsumerOffsetManager.getOffsetTable());

//clear old offset info
consumerOffsetManager.getOffsetTable().clear();
Assert.assertEquals(0, consumerOffsetManager.getOffsetTable().size());
consumerOffsetManager.load();
Assert.assertEquals(2, consumerOffsetManager.getOffsetTable().size());
String actualOffset = JSON.toJSONString(consumerOffsetManager.getOffsetTable());
Assert.assertEquals(expectOffset, actualOffset);
}

@Test
public void testUpgradeWithOldAndNewFile() throws IOException {
//generate normal offset table
ConsumerOffsetManager consumerOffsetManager = new ConsumerOffsetManager(brokerController);
consumerOffsetManager.commitOffset("127.0.0.1", "GID_test3", "NormalTopic", 0, 10L);
String consumerOffsetManagerJson = JSON.toJSONString(consumerOffsetManager);
String consumerOffsetConfigPath = consumerOffsetManager.configFilePath();
persistOffsetFile(consumerOffsetConfigPath, consumerOffsetManagerJson);

//generate old lmq offset table
LmqConsumerOffsetManager lmqConsumerOffsetManager = new LmqConsumerOffsetManager(brokerController);
lmqConsumerOffsetManager.commitOffset("127.0.0.1", "GID_test1", "OldTopic", 0, 10L);
lmqConsumerOffsetManager.commitOffset("127.0.0.1", "GID_test2", "OldTopic", 0, 10L);
String expectLmqOffsetJson = JSON.toJSONString(lmqConsumerOffsetManager.getOffsetTable());
String lmqConfigPath = lmqConsumerOffsetManager.configFilePath();
String lmqManagerJson = JSON.toJSONString(lmqConsumerOffsetManager);
persistOffsetFile(lmqConfigPath, lmqManagerJson);

//reset lmq offset table
lmqConsumerOffsetManager.getOffsetTable().clear();
Assert.assertEquals(0, lmqConsumerOffsetManager.getOffsetTable().size());
lmqConsumerOffsetManager.load();
Assert.assertEquals(2, lmqConsumerOffsetManager.getOffsetTable().size());
String actualLmqOffsetJson = JSON.toJSONString(lmqConsumerOffsetManager.getOffsetTable());
Assert.assertEquals(expectLmqOffsetJson, actualLmqOffsetJson);
}

@Test
public void testLoadBakCompatible() throws IOException, InterruptedException {
//generate normal offset table
ConsumerOffsetManager consumerOffsetManager = new ConsumerOffsetManager(brokerController);
consumerOffsetManager.commitOffset("127.0.0.1", "GID_test3", "NormalTopic", 0, 10L);
String consumerOffsetManagerJson = JSON.toJSONString(consumerOffsetManager);
String consumerOffsetConfigPath = consumerOffsetManager.configFilePath() + ".bak";

//generate old lmq offset table
LmqConsumerOffsetManager lmqConsumerOffsetManager = new LmqConsumerOffsetManager(brokerController);
lmqConsumerOffsetManager.commitOffset("127.0.0.1", "GID_test1", "OldTopic", 0, 10L);
lmqConsumerOffsetManager.commitOffset("127.0.0.1", "GID_test2", "OldTopic", 0, 10L);
lmqConsumerOffsetManager.commitOffset("127.0.0.1", "%LMQ%GID_test222", "%LMQ%1222", 0, 10L);
String lmqConfigPath = lmqConsumerOffsetManager.configFilePath() + ".bak";
String lmqManagerJson = JSON.toJSONString(lmqConsumerOffsetManager);
persistOffsetFile(lmqConfigPath, lmqManagerJson);
Thread.sleep(1000);
persistOffsetFile(consumerOffsetConfigPath, consumerOffsetManagerJson);
String expectLmqOffsetJson = JSON.toJSONString(consumerOffsetManager.getOffsetTable());

//reset lmq offset table
lmqConsumerOffsetManager.getOffsetTable().clear();
lmqConsumerOffsetManager.getLmqOffsetTable().clear();
Assert.assertEquals(0, lmqConsumerOffsetManager.getOffsetTable().size());
lmqConsumerOffsetManager.loadBak();
Assert.assertEquals(1, lmqConsumerOffsetManager.getOffsetTable().size());
Assert.assertEquals(1, lmqConsumerOffsetManager.getLmqOffsetTable().size());
String actualLmqOffsetJson = JSON.toJSONString(lmqConsumerOffsetManager.getOffsetTable());
Assert.assertEquals(expectLmqOffsetJson, actualLmqOffsetJson);
}

public void persistOffsetFile(String filePath, String detail) throws IOException {
File file = new File(filePath);
boolean mkdirs = file.getParentFile().mkdirs();
FileWriter writer = new FileWriter(file);
writer.write(detail);
writer.close();
}


@After
public void destroy() {
UtilAll.deleteFile(new File(new MessageStoreConfig().getStorePathRootDir()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public boolean load() {
}
}

private boolean loadBak() {
protected boolean loadBak() {
String fileName = null;
try {
fileName = this.configFilePath();
Expand Down
26 changes: 26 additions & 0 deletions common/src/main/java/org/apache/rocketmq/common/MixAll.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@
import java.net.URL;
import java.net.URLConnection;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
Expand Down Expand Up @@ -518,4 +521,27 @@ public static boolean isSysConsumerGroupForNoColdReadLimit(String consumerGroup)
}
return false;
}

public static String loadLatestFile(String newConfigPath, String oldConfigPath) {
String targetConfigPath =
getFileModificationTime(newConfigPath) > getFileModificationTime(oldConfigPath)
? newConfigPath
: oldConfigPath;
log.info("load target file path {}", targetConfigPath);
return targetConfigPath;
}

public static long getFileModificationTime(String filePath) {
Path path = FileSystems.getDefault().getPath(filePath);

try {
BasicFileAttributes attributes = Files.readAttributes(path, BasicFileAttributes.class);
long modificationTime = attributes.lastModifiedTime().toMillis();
log.info("{} modification time is {}", filePath, modificationTime);
return modificationTime;
} catch (IOException e) {
log.warn("get file modification time failed", e);
}
return -1;
}
}

0 comments on commit 80f8fa3

Please sign in to comment.