diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java index 21f20dde325..57b62edbb0f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java @@ -28,9 +28,11 @@ import com.google.common.base.Strings; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerPathConfigHelper; import org.apache.rocketmq.common.ConfigManager; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.org.slf4j.Logger; @@ -100,6 +102,55 @@ public void cleanOffsetByTopic(String topic) { } } + @Override + public boolean load() { + String lmqFilePath = this.configLmqFilePath(); + String normalFilePath = this.configFilePath(); + String targetFilePath = null; + try { + targetFilePath = MixAll.loadLatestFile(normalFilePath, lmqFilePath); + String jsonString = MixAll.file2String(targetFilePath); + if (null == jsonString || jsonString.length() == 0) { + return this.loadBak(); + } else { + if (StringUtils.equals(normalFilePath, targetFilePath)) { + this.decode(jsonString); + } else { + this.decodeFromLmq(jsonString); + } + LOG.info("load " + targetFilePath + " success"); + return true; + } + } catch (Exception e) { + LOG.error("load " + targetFilePath + " failed, and try to load backup file", e); + return this.loadBak(); + } + } + + @Override + protected boolean loadBak() { + String lmqBakFilePath = this.configLmqFilePath() + ".bak"; + String normalBakFilePath = this.configFilePath() + ".bak"; + String targetBakFilePath = null; + try { + targetBakFilePath = MixAll.loadLatestFile(normalBakFilePath, lmqBakFilePath); + String jsonString = MixAll.file2String(targetBakFilePath); + if (jsonString != null && jsonString.length() > 0) { + if (StringUtils.equals(normalBakFilePath, targetBakFilePath)) { + this.decode(jsonString); + } else { + this.decodeFromLmq(jsonString); + } + LOG.info("load " + targetBakFilePath + " OK"); + return true; + } + } catch (Exception e) { + LOG.error("load " + targetBakFilePath + " Failed", e); + return false; + } + return true; + } + public void scanUnsubscribedTopic() { Iterator>> it = this.offsetTable.entrySet().iterator(); while (it.hasNext()) { @@ -290,6 +341,10 @@ public String configFilePath() { return BrokerPathConfigHelper.getConsumerOffsetPath(this.brokerController.getMessageStoreConfig().getStorePathRootDir()); } + private String configLmqFilePath() { + return BrokerPathConfigHelper.getLmqConsumerOffsetPath(brokerController.getMessageStoreConfig().getStorePathRootDir()); + } + @Override public void decode(String jsonString) { if (jsonString != null) { @@ -301,6 +356,13 @@ public void decode(String jsonString) { } } + public void decodeFromLmq(String jsonString) { + LmqConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, LmqConsumerOffsetManager.class); + if (obj != null) { + this.offsetTable = obj.offsetTable; + } + } + @Override public String encode(final boolean prettyFormat) { return RemotingSerializable.toJson(this, prettyFormat); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java index ce70b1a820f..2487e32080f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java @@ -20,12 +20,18 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerPathConfigHelper; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; public class LmqConsumerOffsetManager extends ConsumerOffsetManager { + protected static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + private ConcurrentHashMap lmqOffsetTable = new ConcurrentHashMap<>(512); public LmqConsumerOffsetManager() { @@ -36,6 +42,60 @@ public LmqConsumerOffsetManager(BrokerController brokerController) { super(brokerController); } + @Override + public boolean load() { + String lmqFilePath = this.configFilePath(); + String normalFilePath = super.configFilePath(); + String targetFilePath = null; + try { + targetFilePath = MixAll.loadLatestFile(normalFilePath, lmqFilePath); + String jsonString = MixAll.file2String(targetFilePath); + if (null == jsonString || jsonString.length() == 0) { + return this.loadBak(); + } else { + if (StringUtils.equals(normalFilePath, targetFilePath)) { + //should load the old lmq offset + String lmqJsonString = MixAll.file2String(lmqFilePath); + this.decode(lmqJsonString); + this.decodeFromLmq(jsonString); + } else { + this.decode(jsonString); + } + LOG.info("load " + targetFilePath + " success"); + return true; + } + } catch (Exception e) { + LOG.error("load " + targetFilePath + " failed, and try to load backup file", e); + return this.loadBak(); + } + } + + @Override + public boolean loadBak() { + String lmqBakFilePath = this.configFilePath() + ".bak"; + String normalBakFilePath = super.configFilePath() + ".bak"; + String targetBakFilePath = null; + try { + targetBakFilePath = MixAll.loadLatestFile(normalBakFilePath, lmqBakFilePath); + String jsonString = MixAll.file2String(targetBakFilePath); + if (jsonString != null && jsonString.length() > 0) { + if (StringUtils.equals(normalBakFilePath, targetBakFilePath)) { + String lmqJsonString = MixAll.file2String(lmqBakFilePath); + this.decode(lmqJsonString); + this.decodeFromLmq(jsonString); + } else { + this.decode(jsonString); + } + LOG.info("load " + targetBakFilePath + " OK"); + return true; + } + } catch (Exception e) { + LOG.error("load " + targetBakFilePath + " Failed", e); + return false; + } + return true; + } + @Override public long queryOffset(final String group, final String topic, final int queueId) { if (!MixAll.isLmq(group)) { diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManagerTest.java index 9626bcaaeeb..2252abba9ae 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManagerTest.java @@ -17,8 +17,13 @@ package org.apache.rocketmq.broker.offset; +import com.alibaba.fastjson.JSON; import java.io.File; +import java.io.FileWriter; +import java.io.IOException; import java.util.Map; +import java.util.concurrent.ConcurrentMap; + import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.subscription.LmqSubscriptionGroupManager; import org.apache.rocketmq.broker.topic.LmqTopicConfigManager; @@ -30,6 +35,7 @@ import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.junit.After; +import org.junit.Assert; import org.junit.Test; import org.mockito.Spy; @@ -99,6 +105,136 @@ public void testOffsetManage1() { assertThat(lmqConsumerOffsetManager1.getLmqOffsetTable().size()).isEqualTo(2); } + @Test + public void testUpgradeCompatible() throws IOException, InterruptedException { + //load old consumerOffset + ConsumerOffsetManager consumerOffsetManager = new ConsumerOffsetManager(brokerController); + consumerOffsetManager.commitOffset("127.0.0.1","GID_test2", "OldTopic",0, 11L); + String json = JSON.toJSONString(consumerOffsetManager); + String configFilePath = consumerOffsetManager.configFilePath(); + + persistOffsetFile(configFilePath, json); + ConcurrentMap> oldOffsetTable = consumerOffsetManager.getOffsetTable(); + String oldOffset = JSON.toJSONString(oldOffsetTable); + + //[UPGRADE]init lmq consumer offset manager + LmqConsumerOffsetManager lmqConsumerOffsetManager = new LmqConsumerOffsetManager(brokerController); + lmqConsumerOffsetManager.load(); + ConcurrentMap> offsetTable = lmqConsumerOffsetManager.getOffsetTable(); + String newOffsetJson = JSON.toJSONString(offsetTable); + Assert.assertEquals(oldOffset, newOffsetJson); + } + + @Test + public void testRollBack() throws IOException, InterruptedException { + //generate lmq offset + LmqConsumerOffsetManager lmqConsumerOffsetManager = new LmqConsumerOffsetManager(brokerController); + lmqConsumerOffsetManager.commitOffset("127.0.0.1", "GID_test1", "OldTopic", 1, 12L); + String json = JSON.toJSONString(lmqConsumerOffsetManager); + String configFilePath = lmqConsumerOffsetManager.configFilePath(); + persistOffsetFile(configFilePath, json); + Thread.sleep(1000); + + //init consumerOffsetManager + ConsumerOffsetManager consumerOffsetManager = new ConsumerOffsetManager(brokerController); + consumerOffsetManager.commitOffset("127.0.0.1", "GID_test2", "NewTopic", 0, 12L); + String offsetJson = JSON.toJSONString(consumerOffsetManager); + String filePath = consumerOffsetManager.configFilePath(); + persistOffsetFile(filePath, offsetJson); + String offsetTableOld = JSON.toJSONString(consumerOffsetManager.getOffsetTable()); + + //clear old offset info + consumerOffsetManager.getOffsetTable().clear(); + Assert.assertEquals(0, consumerOffsetManager.getOffsetTable().size()); + consumerOffsetManager.load(); + Assert.assertEquals(1, consumerOffsetManager.getOffsetTable().size()); + + String offsetTableNew = JSON.toJSONString(consumerOffsetManager.getOffsetTable()); + Assert.assertEquals(offsetTableOld, offsetTableNew); + Thread.sleep(1000); + + //roll back and lmqOffset is modified resently + lmqConsumerOffsetManager.commitOffset("127.0.0.1", "GID_test2", "OldTopic", 0, 10L); + String newLmqOffsetJson = JSON.toJSONString(lmqConsumerOffsetManager); + persistOffsetFile(configFilePath, newLmqOffsetJson); + String expectOffset = JSON.toJSONString(lmqConsumerOffsetManager.getOffsetTable()); + + //clear old offset info + consumerOffsetManager.getOffsetTable().clear(); + Assert.assertEquals(0, consumerOffsetManager.getOffsetTable().size()); + consumerOffsetManager.load(); + Assert.assertEquals(2, consumerOffsetManager.getOffsetTable().size()); + String actualOffset = JSON.toJSONString(consumerOffsetManager.getOffsetTable()); + Assert.assertEquals(expectOffset, actualOffset); + } + + @Test + public void testUpgradeWithOldAndNewFile() throws IOException { + //generate normal offset table + ConsumerOffsetManager consumerOffsetManager = new ConsumerOffsetManager(brokerController); + consumerOffsetManager.commitOffset("127.0.0.1", "GID_test3", "NormalTopic", 0, 10L); + String consumerOffsetManagerJson = JSON.toJSONString(consumerOffsetManager); + String consumerOffsetConfigPath = consumerOffsetManager.configFilePath(); + persistOffsetFile(consumerOffsetConfigPath, consumerOffsetManagerJson); + + //generate old lmq offset table + LmqConsumerOffsetManager lmqConsumerOffsetManager = new LmqConsumerOffsetManager(brokerController); + lmqConsumerOffsetManager.commitOffset("127.0.0.1", "GID_test1", "OldTopic", 0, 10L); + lmqConsumerOffsetManager.commitOffset("127.0.0.1", "GID_test2", "OldTopic", 0, 10L); + String expectLmqOffsetJson = JSON.toJSONString(lmqConsumerOffsetManager.getOffsetTable()); + String lmqConfigPath = lmqConsumerOffsetManager.configFilePath(); + String lmqManagerJson = JSON.toJSONString(lmqConsumerOffsetManager); + persistOffsetFile(lmqConfigPath, lmqManagerJson); + + //reset lmq offset table + lmqConsumerOffsetManager.getOffsetTable().clear(); + Assert.assertEquals(0, lmqConsumerOffsetManager.getOffsetTable().size()); + lmqConsumerOffsetManager.load(); + Assert.assertEquals(2, lmqConsumerOffsetManager.getOffsetTable().size()); + String actualLmqOffsetJson = JSON.toJSONString(lmqConsumerOffsetManager.getOffsetTable()); + Assert.assertEquals(expectLmqOffsetJson, actualLmqOffsetJson); + } + + @Test + public void testLoadBakCompatible() throws IOException, InterruptedException { + //generate normal offset table + ConsumerOffsetManager consumerOffsetManager = new ConsumerOffsetManager(brokerController); + consumerOffsetManager.commitOffset("127.0.0.1", "GID_test3", "NormalTopic", 0, 10L); + String consumerOffsetManagerJson = JSON.toJSONString(consumerOffsetManager); + String consumerOffsetConfigPath = consumerOffsetManager.configFilePath() + ".bak"; + + //generate old lmq offset table + LmqConsumerOffsetManager lmqConsumerOffsetManager = new LmqConsumerOffsetManager(brokerController); + lmqConsumerOffsetManager.commitOffset("127.0.0.1", "GID_test1", "OldTopic", 0, 10L); + lmqConsumerOffsetManager.commitOffset("127.0.0.1", "GID_test2", "OldTopic", 0, 10L); + lmqConsumerOffsetManager.commitOffset("127.0.0.1", "%LMQ%GID_test222", "%LMQ%1222", 0, 10L); + String lmqConfigPath = lmqConsumerOffsetManager.configFilePath() + ".bak"; + String lmqManagerJson = JSON.toJSONString(lmqConsumerOffsetManager); + persistOffsetFile(lmqConfigPath, lmqManagerJson); + Thread.sleep(1000); + persistOffsetFile(consumerOffsetConfigPath, consumerOffsetManagerJson); + String expectLmqOffsetJson = JSON.toJSONString(consumerOffsetManager.getOffsetTable()); + + //reset lmq offset table + lmqConsumerOffsetManager.getOffsetTable().clear(); + lmqConsumerOffsetManager.getLmqOffsetTable().clear(); + Assert.assertEquals(0, lmqConsumerOffsetManager.getOffsetTable().size()); + lmqConsumerOffsetManager.loadBak(); + Assert.assertEquals(1, lmqConsumerOffsetManager.getOffsetTable().size()); + Assert.assertEquals(1, lmqConsumerOffsetManager.getLmqOffsetTable().size()); + String actualLmqOffsetJson = JSON.toJSONString(lmqConsumerOffsetManager.getOffsetTable()); + Assert.assertEquals(expectLmqOffsetJson, actualLmqOffsetJson); + } + + public void persistOffsetFile(String filePath, String detail) throws IOException { + File file = new File(filePath); + boolean mkdirs = file.getParentFile().mkdirs(); + FileWriter writer = new FileWriter(file); + writer.write(detail); + writer.close(); + } + + @After public void destroy() { UtilAll.deleteFile(new File(new MessageStoreConfig().getStorePathRootDir())); diff --git a/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java b/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java index 6c3bed47cf3..4d34937bbbb 100644 --- a/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java +++ b/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java @@ -48,7 +48,7 @@ public boolean load() { } } - private boolean loadBak() { + protected boolean loadBak() { String fileName = null; try { fileName = this.configFilePath(); diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java index 1233a54223b..c13918deac6 100644 --- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java +++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java @@ -32,8 +32,11 @@ import java.net.URL; import java.net.URLConnection; import java.nio.charset.StandardCharsets; +import java.nio.file.FileSystems; +import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.nio.file.attribute.BasicFileAttributes; import java.util.ArrayList; import java.util.Enumeration; import java.util.List; @@ -518,4 +521,27 @@ public static boolean isSysConsumerGroupForNoColdReadLimit(String consumerGroup) } return false; } + + public static String loadLatestFile(String newConfigPath, String oldConfigPath) { + String targetConfigPath = + getFileModificationTime(newConfigPath) > getFileModificationTime(oldConfigPath) + ? newConfigPath + : oldConfigPath; + log.info("load target file path {}", targetConfigPath); + return targetConfigPath; + } + + public static long getFileModificationTime(String filePath) { + Path path = FileSystems.getDefault().getPath(filePath); + + try { + BasicFileAttributes attributes = Files.readAttributes(path, BasicFileAttributes.class); + long modificationTime = attributes.lastModifiedTime().toMillis(); + log.info("{} modification time is {}", filePath, modificationTime); + return modificationTime; + } catch (IOException e) { + log.warn("get file modification time failed", e); + } + return -1; + } }