Skip to content

Commit

Permalink
optimize: splitting MergedWarpMessage enhances the server parallel pr…
Browse files Browse the repository at this point in the history
…ocessing capability (apache#6807)
  • Loading branch information
funky-eyes authored Oct 31, 2024
1 parent bf0d11a commit 5d2b139
Show file tree
Hide file tree
Showing 12 changed files with 239 additions and 50 deletions.
1 change: 1 addition & 0 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6898](https://github.com/apache/incubator-seata/pull/6898)] upgrade npmjs version in saga module
- [[#6879](https://github.com/apache/incubator-seata/pull/6879)] fix log argument mismatch issue
- [[#6902](https://github.com/apache/incubator-seata/pull/6900)] optimize readme docs
- [[#6807](https://github.com/apache/incubator-seata/pull/6807)] splitting MergedWarpMessage enhances the server parallel processing capability
- [[#6905](https://github.com/apache/incubator-seata/pull/6905)] remove incompatible licenses at build time
- [[#6906](https://github.com/apache/incubator-seata/pull/6906)] h2 dependency adds test scope
- [[#6911](https://github.com/apache/incubator-seata/pull/6911)] fix some typos in project
Expand Down
1 change: 1 addition & 0 deletions changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
- [[#6879](https://github.com/apache/incubator-seata/pull/6879)] 修复日志参数不匹配问题
- [[#6898](https://github.com/apache/incubator-seata/pull/6898)] 升级 saga 模块 npmjs 版本
- [[#6902](https://github.com/apache/incubator-seata/pull/6900)] 优化 readme 文档
- [[#6807](https://github.com/apache/incubator-seata/pull/6807)] 分离merge消息使其能完全并行处理
- [[#6905](https://github.com/apache/incubator-seata/pull/6905)] 移除构建期不兼容的 license
- [[#6906](https://github.com/apache/incubator-seata/pull/6906)] h2依赖添加test scope
- [[#6911](https://github.com/apache/incubator-seata/pull/6911)] 修正项目中的部分拼写错误
Expand Down
19 changes: 13 additions & 6 deletions core/src/main/java/org/apache/seata/core/protocol/Version.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class Version {
private static final String CURRENT = VersionInfo.VERSION;
private static final String VERSION_0_7_1 = "0.7.1";
private static final String VERSION_1_5_0 = "1.5.0";
private static final String VERSION_2_3_0 = "2.3.0";
private static final int MAX_VERSION_DOT = 3;

/**
Expand Down Expand Up @@ -86,15 +87,21 @@ public static String getChannelVersion(Channel c) {
* @return true: client version is above or equal version 1.5.0, false: on the contrary
*/
public static boolean isAboveOrEqualVersion150(String version) {
boolean isAboveOrEqualVersion150 = false;
return isAboveOrEqualVersion(version, VERSION_1_5_0);
}

public static boolean isAboveOrEqualVersion230(String version) {
return isAboveOrEqualVersion(version, VERSION_2_3_0);
}

public static boolean isAboveOrEqualVersion(String clientVersion, String divideVersion) {
boolean isAboveOrEqualVersion = false;
try {
long clientVersion = convertVersion(version);
long divideVersion = convertVersion(VERSION_1_5_0);
isAboveOrEqualVersion150 = clientVersion >= divideVersion;
isAboveOrEqualVersion = convertVersion(clientVersion) >= convertVersion(divideVersion);
} catch (Exception e) {
LOGGER.error("convert version error, clientVersion:{}", version, e);
LOGGER.error("convert version error, clientVersion:{}", clientVersion, e);
}
return isAboveOrEqualVersion150;
return isAboveOrEqualVersion;
}

public static long convertVersion(String version) throws IncompatibleVersionException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting
*/
protected final Map<Integer, MergeMessage> mergeMsgMap = new ConcurrentHashMap<>();

protected final Map<Integer, Integer> childToParentMap = new ConcurrentHashMap<>();

/**
* When batch sending is enabled, the message will be stored to basketMap
* Send via asynchronous thread {@link AbstractNettyRemotingClient.MergedSendRunnable}
Expand Down Expand Up @@ -203,8 +205,15 @@ public void sendAsyncRequest(Channel channel, Object msg) {
RpcMessage rpcMessage = buildRequestMessage(msg, msg instanceof HeartbeatMessage
? ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST
: ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY);
if (rpcMessage.getBody() instanceof MergeMessage) {
mergeMsgMap.put(rpcMessage.getId(), (MergeMessage) rpcMessage.getBody());
Object body = rpcMessage.getBody();
if (body instanceof MergeMessage) {
Integer parentId = rpcMessage.getId();
mergeMsgMap.put(parentId, (MergeMessage)rpcMessage.getBody());
if (body instanceof MergedWarpMessage) {
for (Integer msgId : ((MergedWarpMessage)rpcMessage.getBody()).msgIds) {
childToParentMap.put(msgId, parentId);
}
}
}
super.sendAsync(channel, rpcMessage);
}
Expand Down Expand Up @@ -370,6 +379,10 @@ public void run() {
// fast fail
for (Integer msgId : mergeMessage.msgIds) {
MessageFuture messageFuture = futures.remove(msgId);
Integer parentId = childToParentMap.remove(msgId);
if (parentId != null) {
mergeMsgMap.remove(parentId);
}
if (messageFuture != null) {
messageFuture.setResultMessage(
new RuntimeException(String.format("%s is unreachable", address), e));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,13 @@
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import org.apache.seata.common.util.NetUtil;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.core.protocol.AbstractMessage;
import org.apache.seata.core.protocol.HeartbeatMessage;
import org.apache.seata.core.protocol.MergedWarpMessage;
import org.apache.seata.core.protocol.ProtocolConstants;
import org.apache.seata.core.protocol.RpcMessage;
import org.apache.seata.core.protocol.Version;
import org.apache.seata.core.rpc.RemotingServer;
import org.apache.seata.core.rpc.RpcContext;
import org.apache.seata.core.rpc.processor.Pair;
Expand Down Expand Up @@ -270,4 +274,32 @@ public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Excep
}

}

@Override
protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
Object body = rpcMessage.getBody();
RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
// If the client is not version 2.3.0 or higher, splitting MergedWarpMessage will result in the client’s mergeMsgMap not being cleared
if (body instanceof MergedWarpMessage && (StringUtils.isNotBlank(rpcContext.getVersion())
&& Version.isAboveOrEqualVersion230(rpcContext.getVersion()))) {
MergedWarpMessage mergedWarpMessage = (MergedWarpMessage)body;
for (int i = 0; i < mergedWarpMessage.msgs.size(); i++) {
RpcMessage rpcMsg =
buildRequestMessage(mergedWarpMessage.msgs.get(i), rpcMessage, mergedWarpMessage.msgIds.get(i));
super.processMessage(ctx, rpcMsg);
}
} else {
super.processMessage(ctx, rpcMessage);
}
}

private RpcMessage buildRequestMessage(AbstractMessage msg, RpcMessage rpcMessage,int id) {
RpcMessage rpcMsg = new RpcMessage();
rpcMsg.setId(id);
rpcMsg.setCodec(rpcMessage.getCodec());
rpcMsg.setCompressor(rpcMessage.getCompressor());
rpcMsg.setBody(msg);
return rpcMsg;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class NettyRemotingServer extends AbstractNettyRemotingServer {

private final AtomicBoolean initialized = new AtomicBoolean(false);

private ThreadPoolExecutor branchResultMessageExecutor = new ThreadPoolExecutor(NettyServerConfig.getMinBranchResultPoolSize(),
private final ThreadPoolExecutor branchResultMessageExecutor = new ThreadPoolExecutor(NettyServerConfig.getMinBranchResultPoolSize(),
NettyServerConfig.getMaxBranchResultPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,
new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
new NamedThreadFactory("BranchResultHandlerThread", NettyServerConfig.getMaxBranchResultPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ private void registerProcessor() {
super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG, rmUndoLogProcessor, messageExecutor);
// 4.registry TC response processor
ClientOnResponseProcessor onResponseProcessor =
new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), childToParentMap, getTransactionMessageHandler());
super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT, onResponseProcessor, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.seata.common.util.StringUtils.isNotBlank;

/**
* The rm netty client.
*
Expand Down Expand Up @@ -187,7 +189,7 @@ public void init() {
registerProcessor();
if (initialized.compareAndSet(false, true)) {
super.init();
if (org.apache.seata.common.util.StringUtils.isNotBlank(transactionServiceGroup)) {
if (isNotBlank(transactionServiceGroup)) {
initConnection();
}
}
Expand Down Expand Up @@ -247,7 +249,7 @@ protected Function<String, NettyPoolKey> getPoolKeyFunction() {
private void registerProcessor() {
// 1.registry TC response processor
ClientOnResponseProcessor onResponseProcessor =
new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), childToParentMap, getTransactionMessageHandler());
super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT_RESULT, onResponseProcessor, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ public class ClientOnResponseProcessor implements RemotingProcessor {
/**
* The Merge msg map from org.apache.seata.core.rpc.netty.AbstractNettyRemotingClient#mergeMsgMap.
*/
private Map<Integer, MergeMessage> mergeMsgMap;
private final Map<Integer, MergeMessage> mergeMsgMap;

private final Map<Integer, Integer> childToParentMap;

/**
* The Futures from org.apache.seata.core.rpc.netty.AbstractNettyRemoting#futures
Expand All @@ -82,9 +84,10 @@ public class ClientOnResponseProcessor implements RemotingProcessor {
private final TransactionMessageHandler transactionMessageHandler;

public ClientOnResponseProcessor(Map<Integer, MergeMessage> mergeMsgMap,
ConcurrentHashMap<Integer, MessageFuture> futures,
ConcurrentHashMap<Integer, MessageFuture> futures, Map<Integer,Integer> childToParentMap,
TransactionMessageHandler transactionMessageHandler) {
this.mergeMsgMap = mergeMsgMap;
this.childToParentMap = childToParentMap;
this.futures = futures;
this.transactionMessageHandler = transactionMessageHandler;
}
Expand All @@ -97,40 +100,52 @@ public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exc
for (int i = 0; i < mergeMessage.msgs.size(); i++) {
int msgId = mergeMessage.msgIds.get(i);
MessageFuture future = futures.remove(msgId);
// The old version of the server will return MergeResultMessage, so it is necessary to remove the msgId from the childToParentMap.
childToParentMap.remove(msgId);
if (future == null) {
LOGGER.error("msg: {} is not found in futures, result message: {}", msgId,results.getMsgs()[i]);
} else {
future.setResultMessage(results.getMsgs()[i]);
}
}
} else if (rpcMessage.getBody() instanceof BatchResultMessage) {
try {
BatchResultMessage batchResultMessage = (BatchResultMessage) rpcMessage.getBody();
for (int i = 0; i < batchResultMessage.getMsgIds().size(); i++) {
int msgId = batchResultMessage.getMsgIds().get(i);
MessageFuture future = futures.remove(msgId);
if (future == null) {
LOGGER.error("msg: {} is not found in futures, result message: {}", msgId, batchResultMessage.getResultMessages().get(i));
} else {
future.setResultMessage(batchResultMessage.getResultMessages().get(i));
}
BatchResultMessage batchResultMessage = (BatchResultMessage)rpcMessage.getBody();
for (int i = 0; i < batchResultMessage.getMsgIds().size(); i++) {
int msgId = batchResultMessage.getMsgIds().get(i);
MessageFuture future = futures.remove(msgId);
// The old version of the server will return BatchResultMessage, so it is necessary to remove the msgId
// from the childToParentMap.
Integer parentId = childToParentMap.remove(msgId);
if (parentId != null) {
mergeMsgMap.remove(parentId);
}
if (future == null) {
LOGGER.error("msg: {} is not found in futures, result message: {}", msgId,
batchResultMessage.getResultMessages().get(i));
} else {
future.setResultMessage(batchResultMessage.getResultMessages().get(i));
}
} finally {
// In order to be compatible with the old version, in the batch sending of version 1.5.0,
// batch messages will also be placed in the local cache of mergeMsgMap,
// but version 1.5.0 no longer needs to obtain batch messages from mergeMsgMap
mergeMsgMap.clear();
}
} else {
MessageFuture messageFuture = futures.remove(rpcMessage.getId());
if (messageFuture != null) {
messageFuture.setResultMessage(rpcMessage.getBody());
} else {
if (rpcMessage.getBody() instanceof AbstractResultMessage) {
if (transactionMessageHandler != null) {
transactionMessageHandler.onResponse((AbstractResultMessage) rpcMessage.getBody(), null);
Integer id = rpcMessage.getId();
try {
MessageFuture messageFuture = futures.remove(id);
if (messageFuture != null) {
messageFuture.setResultMessage(rpcMessage.getBody());
} else {
if (rpcMessage.getBody() instanceof AbstractResultMessage) {
if (transactionMessageHandler != null) {
transactionMessageHandler.onResponse((AbstractResultMessage)rpcMessage.getBody(), null);
}
}
}
} finally {
// In version 2.3.0, the server does not return MergeResultMessage and BatchResultMessage
// so it is necessary to clear childToParentMap and mergeMsgMap here.
Integer parentId = childToParentMap.remove(id);
if (parentId != null) {
mergeMsgMap.remove(parentId);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.Map;


@EnabledIfSystemProperty(named = "redisCaseEnabled", matches = "true")
@SpringBootTest
public class RedisVGroupMappingStoreManagerTest {
private RedisVGroupMappingStoreManager redisVGroupMappingStoreManager;
Expand Down
Loading

0 comments on commit 5d2b139

Please sign in to comment.