Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #7087] fix:consumer offset compatible problem caused by LmqConsumerOffsetManager #7090

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@

import com.google.common.base.Strings;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
Expand Down Expand Up @@ -100,6 +102,55 @@ public void cleanOffsetByTopic(String topic) {
}
}

@Override
public boolean load() {
String lmqFilePath = this.configLmqFilePath();
String normalFilePath = this.configFilePath();
String targetFilePath = null;
try {
targetFilePath = MixAll.loadLatestFile(normalFilePath, lmqFilePath);
String jsonString = MixAll.file2String(targetFilePath);
if (null == jsonString || jsonString.length() == 0) {
return this.loadBak();
} else {
if (StringUtils.equals(normalFilePath, targetFilePath)) {
this.decode(jsonString);
} else {
this.decodeFromLmq(jsonString);
}
LOG.info("load " + targetFilePath + " success");
return true;
}
} catch (Exception e) {
LOG.error("load " + targetFilePath + " failed, and try to load backup file", e);
return this.loadBak();
}
}

@Override
protected boolean loadBak() {
String lmqBakFilePath = this.configLmqFilePath() + ".bak";
String normalBakFilePath = this.configFilePath() + ".bak";
String targetBakFilePath = null;
try {
targetBakFilePath = MixAll.loadLatestFile(normalBakFilePath, lmqBakFilePath);
String jsonString = MixAll.file2String(targetBakFilePath);
if (jsonString != null && jsonString.length() > 0) {
if (StringUtils.equals(normalBakFilePath, targetBakFilePath)) {
this.decode(jsonString);
} else {
this.decodeFromLmq(jsonString);
}
LOG.info("load " + targetBakFilePath + " OK");
return true;
}
} catch (Exception e) {
LOG.error("load " + targetBakFilePath + " Failed", e);
return false;
}
return true;
}

public void scanUnsubscribedTopic() {
Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
while (it.hasNext()) {
Expand Down Expand Up @@ -290,6 +341,10 @@ public String configFilePath() {
return BrokerPathConfigHelper.getConsumerOffsetPath(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
}

private String configLmqFilePath() {
return BrokerPathConfigHelper.getLmqConsumerOffsetPath(brokerController.getMessageStoreConfig().getStorePathRootDir());
}

@Override
public void decode(String jsonString) {
if (jsonString != null) {
Expand All @@ -301,6 +356,13 @@ public void decode(String jsonString) {
}
}

public void decodeFromLmq(String jsonString) {
LmqConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, LmqConsumerOffsetManager.class);
if (obj != null) {
this.offsetTable = obj.offsetTable;
}
}

@Override
public String encode(final boolean prettyFormat) {
return RemotingSerializable.toJson(this, prettyFormat);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,18 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;

public class LmqConsumerOffsetManager extends ConsumerOffsetManager {
protected static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);

private ConcurrentHashMap<String, Long> lmqOffsetTable = new ConcurrentHashMap<>(512);

public LmqConsumerOffsetManager() {
Expand All @@ -36,6 +42,60 @@ public LmqConsumerOffsetManager(BrokerController brokerController) {
super(brokerController);
}

@Override
public boolean load() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, compatibility in the ConfigManager would be better

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Compatibility logic can not remove to parent class, because it involves many different implementation methods, such as configFilePath, which can not obtain in parent class.

String lmqFilePath = this.configFilePath();
String normalFilePath = super.configFilePath();
String targetFilePath = null;
try {
targetFilePath = MixAll.loadLatestFile(normalFilePath, lmqFilePath);
String jsonString = MixAll.file2String(targetFilePath);
if (null == jsonString || jsonString.length() == 0) {
return this.loadBak();
} else {
if (StringUtils.equals(normalFilePath, targetFilePath)) {
//should load the old lmq offset
String lmqJsonString = MixAll.file2String(lmqFilePath);
this.decode(lmqJsonString);
this.decodeFromLmq(jsonString);
} else {
this.decode(jsonString);
}
LOG.info("load " + targetFilePath + " success");
return true;
}
} catch (Exception e) {
LOG.error("load " + targetFilePath + " failed, and try to load backup file", e);
return this.loadBak();
}
}

@Override
public boolean loadBak() {
String lmqBakFilePath = this.configFilePath() + ".bak";
String normalBakFilePath = super.configFilePath() + ".bak";
String targetBakFilePath = null;
try {
targetBakFilePath = MixAll.loadLatestFile(normalBakFilePath, lmqBakFilePath);
String jsonString = MixAll.file2String(targetBakFilePath);
if (jsonString != null && jsonString.length() > 0) {
if (StringUtils.equals(normalBakFilePath, targetBakFilePath)) {
String lmqJsonString = MixAll.file2String(lmqBakFilePath);
this.decode(lmqJsonString);
this.decodeFromLmq(jsonString);
} else {
this.decode(jsonString);
}
LOG.info("load " + targetBakFilePath + " OK");
return true;
}
} catch (Exception e) {
LOG.error("load " + targetBakFilePath + " Failed", e);
return false;
}
return true;
}

@Override
public long queryOffset(final String group, final String topic, final int queueId) {
if (!MixAll.isLmq(group)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,13 @@

package org.apache.rocketmq.broker.offset;

import com.alibaba.fastjson.JSON;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;

import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.subscription.LmqSubscriptionGroupManager;
import org.apache.rocketmq.broker.topic.LmqTopicConfigManager;
Expand All @@ -30,6 +35,7 @@
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Spy;

Expand Down Expand Up @@ -99,6 +105,136 @@ public void testOffsetManage1() {
assertThat(lmqConsumerOffsetManager1.getLmqOffsetTable().size()).isEqualTo(2);
}

@Test
public void testUpgradeCompatible() throws IOException, InterruptedException {
//load old consumerOffset
ConsumerOffsetManager consumerOffsetManager = new ConsumerOffsetManager(brokerController);
consumerOffsetManager.commitOffset("127.0.0.1","GID_test2", "OldTopic",0, 11L);
String json = JSON.toJSONString(consumerOffsetManager);
String configFilePath = consumerOffsetManager.configFilePath();

persistOffsetFile(configFilePath, json);
ConcurrentMap<String, ConcurrentMap<Integer, Long>> oldOffsetTable = consumerOffsetManager.getOffsetTable();
String oldOffset = JSON.toJSONString(oldOffsetTable);

//[UPGRADE]init lmq consumer offset manager
LmqConsumerOffsetManager lmqConsumerOffsetManager = new LmqConsumerOffsetManager(brokerController);
lmqConsumerOffsetManager.load();
ConcurrentMap<String, ConcurrentMap<Integer, Long>> offsetTable = lmqConsumerOffsetManager.getOffsetTable();
String newOffsetJson = JSON.toJSONString(offsetTable);
Assert.assertEquals(oldOffset, newOffsetJson);
}

@Test
public void testRollBack() throws IOException, InterruptedException {
//generate lmq offset
LmqConsumerOffsetManager lmqConsumerOffsetManager = new LmqConsumerOffsetManager(brokerController);
lmqConsumerOffsetManager.commitOffset("127.0.0.1", "GID_test1", "OldTopic", 1, 12L);
String json = JSON.toJSONString(lmqConsumerOffsetManager);
String configFilePath = lmqConsumerOffsetManager.configFilePath();
persistOffsetFile(configFilePath, json);
Thread.sleep(1000);

//init consumerOffsetManager
ConsumerOffsetManager consumerOffsetManager = new ConsumerOffsetManager(brokerController);
consumerOffsetManager.commitOffset("127.0.0.1", "GID_test2", "NewTopic", 0, 12L);
String offsetJson = JSON.toJSONString(consumerOffsetManager);
String filePath = consumerOffsetManager.configFilePath();
persistOffsetFile(filePath, offsetJson);
String offsetTableOld = JSON.toJSONString(consumerOffsetManager.getOffsetTable());

//clear old offset info
consumerOffsetManager.getOffsetTable().clear();
Assert.assertEquals(0, consumerOffsetManager.getOffsetTable().size());
consumerOffsetManager.load();
Assert.assertEquals(1, consumerOffsetManager.getOffsetTable().size());

String offsetTableNew = JSON.toJSONString(consumerOffsetManager.getOffsetTable());
Assert.assertEquals(offsetTableOld, offsetTableNew);
Thread.sleep(1000);

//roll back and lmqOffset is modified resently
lmqConsumerOffsetManager.commitOffset("127.0.0.1", "GID_test2", "OldTopic", 0, 10L);
String newLmqOffsetJson = JSON.toJSONString(lmqConsumerOffsetManager);
persistOffsetFile(configFilePath, newLmqOffsetJson);
String expectOffset = JSON.toJSONString(lmqConsumerOffsetManager.getOffsetTable());

//clear old offset info
consumerOffsetManager.getOffsetTable().clear();
Assert.assertEquals(0, consumerOffsetManager.getOffsetTable().size());
consumerOffsetManager.load();
Assert.assertEquals(2, consumerOffsetManager.getOffsetTable().size());
String actualOffset = JSON.toJSONString(consumerOffsetManager.getOffsetTable());
Assert.assertEquals(expectOffset, actualOffset);
}

@Test
public void testUpgradeWithOldAndNewFile() throws IOException {
//generate normal offset table
ConsumerOffsetManager consumerOffsetManager = new ConsumerOffsetManager(brokerController);
consumerOffsetManager.commitOffset("127.0.0.1", "GID_test3", "NormalTopic", 0, 10L);
String consumerOffsetManagerJson = JSON.toJSONString(consumerOffsetManager);
String consumerOffsetConfigPath = consumerOffsetManager.configFilePath();
persistOffsetFile(consumerOffsetConfigPath, consumerOffsetManagerJson);

//generate old lmq offset table
LmqConsumerOffsetManager lmqConsumerOffsetManager = new LmqConsumerOffsetManager(brokerController);
lmqConsumerOffsetManager.commitOffset("127.0.0.1", "GID_test1", "OldTopic", 0, 10L);
lmqConsumerOffsetManager.commitOffset("127.0.0.1", "GID_test2", "OldTopic", 0, 10L);
String expectLmqOffsetJson = JSON.toJSONString(lmqConsumerOffsetManager.getOffsetTable());
String lmqConfigPath = lmqConsumerOffsetManager.configFilePath();
String lmqManagerJson = JSON.toJSONString(lmqConsumerOffsetManager);
persistOffsetFile(lmqConfigPath, lmqManagerJson);

//reset lmq offset table
lmqConsumerOffsetManager.getOffsetTable().clear();
Assert.assertEquals(0, lmqConsumerOffsetManager.getOffsetTable().size());
lmqConsumerOffsetManager.load();
Assert.assertEquals(2, lmqConsumerOffsetManager.getOffsetTable().size());
String actualLmqOffsetJson = JSON.toJSONString(lmqConsumerOffsetManager.getOffsetTable());
Assert.assertEquals(expectLmqOffsetJson, actualLmqOffsetJson);
}

@Test
public void testLoadBakCompatible() throws IOException, InterruptedException {
//generate normal offset table
ConsumerOffsetManager consumerOffsetManager = new ConsumerOffsetManager(brokerController);
consumerOffsetManager.commitOffset("127.0.0.1", "GID_test3", "NormalTopic", 0, 10L);
String consumerOffsetManagerJson = JSON.toJSONString(consumerOffsetManager);
String consumerOffsetConfigPath = consumerOffsetManager.configFilePath() + ".bak";

//generate old lmq offset table
LmqConsumerOffsetManager lmqConsumerOffsetManager = new LmqConsumerOffsetManager(brokerController);
lmqConsumerOffsetManager.commitOffset("127.0.0.1", "GID_test1", "OldTopic", 0, 10L);
lmqConsumerOffsetManager.commitOffset("127.0.0.1", "GID_test2", "OldTopic", 0, 10L);
lmqConsumerOffsetManager.commitOffset("127.0.0.1", "%LMQ%GID_test222", "%LMQ%1222", 0, 10L);
String lmqConfigPath = lmqConsumerOffsetManager.configFilePath() + ".bak";
String lmqManagerJson = JSON.toJSONString(lmqConsumerOffsetManager);
persistOffsetFile(lmqConfigPath, lmqManagerJson);
Thread.sleep(1000);
persistOffsetFile(consumerOffsetConfigPath, consumerOffsetManagerJson);
String expectLmqOffsetJson = JSON.toJSONString(consumerOffsetManager.getOffsetTable());

//reset lmq offset table
lmqConsumerOffsetManager.getOffsetTable().clear();
lmqConsumerOffsetManager.getLmqOffsetTable().clear();
Assert.assertEquals(0, lmqConsumerOffsetManager.getOffsetTable().size());
lmqConsumerOffsetManager.loadBak();
Assert.assertEquals(1, lmqConsumerOffsetManager.getOffsetTable().size());
Assert.assertEquals(1, lmqConsumerOffsetManager.getLmqOffsetTable().size());
String actualLmqOffsetJson = JSON.toJSONString(lmqConsumerOffsetManager.getOffsetTable());
Assert.assertEquals(expectLmqOffsetJson, actualLmqOffsetJson);
}

public void persistOffsetFile(String filePath, String detail) throws IOException {
File file = new File(filePath);
boolean mkdirs = file.getParentFile().mkdirs();
FileWriter writer = new FileWriter(file);
writer.write(detail);
writer.close();
}


@After
public void destroy() {
UtilAll.deleteFile(new File(new MessageStoreConfig().getStorePathRootDir()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public boolean load() {
}
}

private boolean loadBak() {
protected boolean loadBak() {
String fileName = null;
try {
fileName = this.configFilePath();
Expand Down
26 changes: 26 additions & 0 deletions common/src/main/java/org/apache/rocketmq/common/MixAll.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@
import java.net.URL;
import java.net.URLConnection;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
Expand Down Expand Up @@ -518,4 +521,27 @@ public static boolean isSysConsumerGroupForNoColdReadLimit(String consumerGroup)
}
return false;
}

public static String loadLatestFile(String newConfigPath, String oldConfigPath) {
String targetConfigPath =
getFileModificationTime(newConfigPath) > getFileModificationTime(oldConfigPath)
? newConfigPath
: oldConfigPath;
log.info("load target file path {}", targetConfigPath);
return targetConfigPath;
}

public static long getFileModificationTime(String filePath) {
Path path = FileSystems.getDefault().getPath(filePath);

try {
BasicFileAttributes attributes = Files.readAttributes(path, BasicFileAttributes.class);
long modificationTime = attributes.lastModifiedTime().toMillis();
log.info("{} modification time is {}", filePath, modificationTime);
return modificationTime;
} catch (IOException e) {
log.warn("get file modification time failed", e);
}
return -1;
}
}
Loading