Skip to content

Commit

Permalink
end if msg null
Browse files Browse the repository at this point in the history
  • Loading branch information
Bughue committed Dec 3, 2024
1 parent 34990eb commit 4b57746
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public interface TCCRocketMQ {
* @throws InterruptedException
*/
boolean commit(BusinessActionContext context)
throws UnknownHostException, MQBrokerException, RemotingException, InterruptedException, TransactionException, TimeoutException;
throws UnknownHostException, MQBrokerException, RemotingException, InterruptedException, TransactionException;

/**
* RocketMQ half send rollback
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.seata.integration.rocketmq;

import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.core.exception.TransactionException;
import org.apache.seata.rm.tcc.api.BusinessActionContext;
import org.apache.seata.rm.tcc.api.BusinessActionContextUtil;
Expand Down Expand Up @@ -70,10 +71,10 @@ public SendResult prepare(Message message, long timeout) throws MQClientExceptio

@Override
public boolean commit(BusinessActionContext context)
throws UnknownHostException, MQBrokerException, RemotingException, InterruptedException, TimeoutException, TransactionException {
throws UnknownHostException, MQBrokerException, RemotingException, InterruptedException, TransactionException {
Message message = context.getActionContext(ROCKET_MSG_KEY, Message.class);
SendResult sendResult = context.getActionContext(ROCKET_SEND_RESULT_KEY, SendResult.class);
if (message == null || sendResult == null) {
if (checkMqStatus(message, sendResult)) {
throw new TransactionException("TCCRocketMQ commit but cannot find message and sendResult");
}
this.producerImpl.endTransaction(message, sendResult, LocalTransactionState.COMMIT_MESSAGE, null);
Expand All @@ -86,11 +87,17 @@ public boolean rollback(BusinessActionContext context)
throws UnknownHostException, MQBrokerException, RemotingException, InterruptedException, TransactionException {
Message message = context.getActionContext(ROCKET_MSG_KEY, Message.class);
SendResult sendResult = context.getActionContext(ROCKET_SEND_RESULT_KEY, SendResult.class);
if (message == null || sendResult == null) {
if (checkMqStatus(message, sendResult)) {
LOGGER.error("TCCRocketMQ rollback but cannot find message and sendResult");
return true;
}
this.producerImpl.endTransaction(message, sendResult, LocalTransactionState.ROLLBACK_MESSAGE, null);
LOGGER.info("RocketMQ message send rollback, xid = {}, branchId = {}", context.getXid(), context.getBranchId());
return true;
}

private static boolean checkMqStatus(Message message, SendResult sendResult) {
return message == null || sendResult == null ||
(StringUtils.isBlank(sendResult.getOffsetMsgId()) && StringUtils.isBlank(sendResult.getMsgId()));
}
}

0 comments on commit 4b57746

Please sign in to comment.