diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index 9f5c0cec22e..6e97f629775 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -24,7 +24,11 @@ Add changes here for all PR submitted to the 2.x branch. - [[#6947](https://github.com/apache/incubator-seata/pull/6947)] fix npe for nacos registry when look up address - [[#6984](https://github.com/apache/incubator-seata/pull/6984)] support building docker image on openjdk23 - [[#6994](https://github.com/apache/incubator-seata/pull/6994)] fix the problem of building undoLog exception when update join does not update data - +- [[#7005](https://github.com/apache/incubator-seata/pull/7005)] fix the Raft NPE issue caused by two-phase concurrency +- [[#7010](https://github.com/apache/incubator-seata/pull/7010)] fix error while the "context" is key word in DM8 when delete undolog +- [[#7022](https://github.com/apache/incubator-seata/pull/7022)] fix `store.mode` property in `application.raft.example.yml` +- [[#7025](https://github.com/apache/incubator-seata/pull/7025)] fix vGroupMappingManager is NOT init +- ### optimize: - [[#6826](https://github.com/apache/incubator-seata/pull/6826)] remove the branch registration operation of the XA read-only transaction - [[#6874](https://github.com/apache/incubator-seata/pull/6874)] modify the version to 2.3.0-SNAPSHOT @@ -46,13 +50,15 @@ Add changes here for all PR submitted to the 2.x branch. - [[#6991](https://github.com/apache/incubator-seata/pull/6991)] gRPC serialization default to Protobuf - [[#6993](https://github.com/apache/incubator-seata/pull/6993)] optimize transaction metrics - [[#6995](https://github.com/apache/incubator-seata/pull/6995)] upgrade outdate npmjs dependencies - +- [[#6996](https://github.com/apache/incubator-seata/pull/6996)] optimize lock release logic in AT transaction mode +- [[#7023](https://github.com/apache/incubator-seata/pull/7023)] optimize fail fast, when all server not available ### refactor: ### test: - [[#6927](https://github.com/apache/incubator-seata/pull/6927)] Add unit tests for the `seata-rocketmq` module +- [[#7018](https://github.com/apache/incubator-seata/pull/7018)] Add unit tests for the `seata-tm` module Thanks to these contributors for their code commits. Please report an unintended omission. @@ -71,6 +77,9 @@ Thanks to these contributors for their code commits. Please report an unintended - [xingfudeshi](https://github.com/xingfudeshi) - [o-jimin](https://github.com/o-jimin) - [lixingjia77](https://github.com/lixingjia77) +- [whaon](https://github.com/whaon) +- [YvCeung](https://github.com/YvCeung) +- [jsbxyyx](https://github.com/jsbxyyx) diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index 0c2069749c6..b6b2262c574 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -24,7 +24,10 @@ - [[#6947](https://github.com/apache/incubator-seata/pull/6947)] 修复nacos注册中心查询可用地址时的空指针问题 - [[#6984](https://github.com/apache/incubator-seata/pull/6984)] 修复 openjdk23 版本下无法构建 docker 镜像的问题 - [[#6994](https://github.com/apache/incubator-seata/pull/6994)] 修复updateJoin语句未更新到数据时prepareUndoLog异常 - +- [[#7005](https://github.com/apache/incubator-seata/pull/7005)] 修复Raft模式下两阶段并发可能导致NPE的问题 +- [[#7010](https://github.com/apache/incubator-seata/pull/7010)] 修复使用达梦数据库时删除undolog发生SQL语法错误 +- [[#7022](https://github.com/apache/incubator-seata/pull/7022)] 修复 `application.raft.example.yml`的 `store.mode`属性 +- [[#7025](https://github.com/apache/incubator-seata/pull/7025)] 修复vGroupMappingManager未初始化的问题 ### optimize: - [[#6826](https://github.com/apache/incubator-seata/pull/6826)] 移除只读XA事务的分支注册操作 @@ -47,7 +50,11 @@ - [[#6950](https://github.com/apache/incubator-seata/pull/6950)] 移除JVM参数app.id - [[#6959](https://github.com/apache/incubator-seata/pull/6959)] 修正 `seata-http-jakarta`的模块命名和描述 - [[#6991](https://github.com/apache/incubator-seata/pull/6991)] gRPC协议序列化默认值为protobuf +- [[#6996](https://github.com/apache/incubator-seata/pull/6996)] 优化 AT 事务模式锁释放逻辑 - [[#6993](https://github.com/apache/incubator-seata/pull/6993)] 优化 metrics 指标 +- [[#6995](https://github.com/apache/incubator-seata/pull/6995)] 升级过时的 npmjs 依赖 +- [[#6996](https://github.com/apache/incubator-seata/pull/6996)] 优化 AT 事务模式锁释放逻辑 +- [[#7023](https://github.com/apache/incubator-seata/pull/7023)] 优化快速失败 ### refactor: @@ -56,6 +63,8 @@ ### test: - [[#6927](https://github.com/apache/incubator-seata/pull/6927)] 增加`seata-rocketmq`模块的测试用例 +- [[#7018](https://github.com/apache/incubator-seata/pull/7018)] 增加 `seata-tm` 模块的测试用例 + 非常感谢以下 contributors 的代码贡献。若有无意遗漏,请报告。 @@ -74,6 +83,9 @@ - [xingfudeshi](https://github.com/xingfudeshi) - [o-jimin](https://github.com/o-jimin) - [lixingjia77](https://github.com/lixingjia77) +- [whaon](https://github.com/whaon) +- [YvCeung](https://github.com/YvCeung) +- [jsbxyyx](https://github.com/jsbxyyx) 同时,我们收到了社区反馈的很多有价值的issue和建议,非常感谢大家。 diff --git a/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java b/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java index ff8436b6dc9..ef7304b1623 100644 --- a/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java +++ b/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java @@ -504,9 +504,15 @@ public interface ConfigurationKeys { /** * The constant ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE. + * This configuration is deprecated, please use {@link #ROLLBACK_FAILED_UNLOCK_ENABLE} instead. */ + @Deprecated String ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE = SERVER_PREFIX + "rollbackRetryTimeoutUnlockEnable"; + /** + * The constant ROLLBACK_FAILED_UNLOCK_ENABLE. + */ + String ROLLBACK_FAILED_UNLOCK_ENABLE = SERVER_PREFIX + "rollbackFailedUnlockEnable"; /** * the constant RETRY_DEAD_THRESHOLD */ diff --git a/common/src/main/java/org/apache/seata/common/DefaultValues.java b/common/src/main/java/org/apache/seata/common/DefaultValues.java index eb0d40bb308..68d06a76ff8 100644 --- a/common/src/main/java/org/apache/seata/common/DefaultValues.java +++ b/common/src/main/java/org/apache/seata/common/DefaultValues.java @@ -18,56 +18,160 @@ import java.time.Duration; +/** + * The interface Default values. + */ public interface DefaultValues { + /** + * The constant DEFAULT_CLIENT_LOCK_RETRY_INTERVAL. + */ int DEFAULT_CLIENT_LOCK_RETRY_INTERVAL = 10; + /** + * The constant DEFAULT_TM_DEGRADE_CHECK_ALLOW_TIMES. + */ int DEFAULT_TM_DEGRADE_CHECK_ALLOW_TIMES = 10; + /** + * The constant DEFAULT_CLIENT_LOCK_RETRY_TIMES. + */ int DEFAULT_CLIENT_LOCK_RETRY_TIMES = 30; + /** + * The constant DEFAULT_CLIENT_LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT. + */ boolean DEFAULT_CLIENT_LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT = true; + /** + * The constant DEFAULT_LOG_EXCEPTION_RATE. + */ int DEFAULT_LOG_EXCEPTION_RATE = 100; + /** + * The constant DEFAULT_CLIENT_ASYNC_COMMIT_BUFFER_LIMIT. + */ int DEFAULT_CLIENT_ASYNC_COMMIT_BUFFER_LIMIT = 10000; + /** + * The constant DEFAULT_TM_DEGRADE_CHECK_PERIOD. + */ int DEFAULT_TM_DEGRADE_CHECK_PERIOD = 2000; + /** + * The constant DEFAULT_CLIENT_REPORT_RETRY_COUNT. + */ int DEFAULT_CLIENT_REPORT_RETRY_COUNT = 5; + /** + * The constant DEFAULT_CLIENT_REPORT_SUCCESS_ENABLE. + */ boolean DEFAULT_CLIENT_REPORT_SUCCESS_ENABLE = false; + /** + * The constant DEFAULT_CLIENT_TABLE_META_CHECK_ENABLE. + */ boolean DEFAULT_CLIENT_TABLE_META_CHECK_ENABLE = true; + /** + * The constant DEFAULT_TABLE_META_CHECKER_INTERVAL. + */ long DEFAULT_TABLE_META_CHECKER_INTERVAL = 60000L; + /** + * The constant DEFAULT_TM_DEGRADE_CHECK. + */ boolean DEFAULT_TM_DEGRADE_CHECK = false; + /** + * The constant DEFAULT_CLIENT_SAGA_BRANCH_REGISTER_ENABLE. + */ boolean DEFAULT_CLIENT_SAGA_BRANCH_REGISTER_ENABLE = false; /** * The default session store dir */ String DEFAULT_SESSION_STORE_FILE_DIR = "sessionStore"; + /** + * The constant DEFAULT_CLIENT_SAGA_RETRY_PERSIST_MODE_UPDATE. + */ boolean DEFAULT_CLIENT_SAGA_RETRY_PERSIST_MODE_UPDATE = false; + /** + * The constant DEFAULT_CLIENT_SAGA_COMPENSATE_PERSIST_MODE_UPDATE. + */ boolean DEFAULT_CLIENT_SAGA_COMPENSATE_PERSIST_MODE_UPDATE = false; + /** + * The constant DEFAULT_RAFT_SERIALIZATION. + */ String DEFAULT_RAFT_SERIALIZATION = "jackson"; + /** + * The constant DEFAULT_RAFT_COMPRESSOR. + */ String DEFAULT_RAFT_COMPRESSOR = "none"; /** * Shutdown timeout default 3s */ int DEFAULT_SHUTDOWN_TIMEOUT_SEC = 13; + /** + * The constant DEFAULT_SELECTOR_THREAD_SIZE. + */ int DEFAULT_SELECTOR_THREAD_SIZE = 1; + /** + * The constant DEFAULT_BOSS_THREAD_SIZE. + */ int DEFAULT_BOSS_THREAD_SIZE = 1; - + /** + * The constant DEFAULT_SELECTOR_THREAD_PREFIX. + */ String DEFAULT_SELECTOR_THREAD_PREFIX = "NettyClientSelector"; + /** + * The constant DEFAULT_WORKER_THREAD_PREFIX. + */ String DEFAULT_WORKER_THREAD_PREFIX = "NettyClientWorkerThread"; + /** + * The constant DEFAULT_ENABLE_CLIENT_BATCH_SEND_REQUEST. + */ @Deprecated boolean DEFAULT_ENABLE_CLIENT_BATCH_SEND_REQUEST = true; + /** + * The constant DEFAULT_ENABLE_TM_CLIENT_BATCH_SEND_REQUEST. + */ boolean DEFAULT_ENABLE_TM_CLIENT_BATCH_SEND_REQUEST = false; + /** + * The constant DEFAULT_ENABLE_RM_CLIENT_BATCH_SEND_REQUEST. + */ boolean DEFAULT_ENABLE_RM_CLIENT_BATCH_SEND_REQUEST = true; + /** + * The constant DEFAULT_ENABLE_TC_SERVER_BATCH_SEND_RESPONSE. + */ boolean DEFAULT_ENABLE_TC_SERVER_BATCH_SEND_RESPONSE = false; + /** + * The constant DEFAULT_CLIENT_CHANNEL_CHECK_FAIL_FAST. + */ boolean DEFAULT_CLIENT_CHANNEL_CHECK_FAIL_FAST = true; + /** + * The constant DEFAULT_BOSS_THREAD_PREFIX. + */ String DEFAULT_BOSS_THREAD_PREFIX = "NettyBoss"; + /** + * The constant DEFAULT_NIO_WORKER_THREAD_PREFIX. + */ String DEFAULT_NIO_WORKER_THREAD_PREFIX = "NettyServerNIOWorker"; + /** + * The constant DEFAULT_EXECUTOR_THREAD_PREFIX. + */ String DEFAULT_EXECUTOR_THREAD_PREFIX = "NettyServerBizHandler"; + /** + * The constant DEFAULT_PROTOCOL. + */ String DEFAULT_PROTOCOL = "seata"; + /** + * The constant DEFAULT_TRANSPORT_HEARTBEAT. + */ boolean DEFAULT_TRANSPORT_HEARTBEAT = true; + /** + * The constant DEFAULT_TRANSACTION_UNDO_DATA_VALIDATION. + */ boolean DEFAULT_TRANSACTION_UNDO_DATA_VALIDATION = true; + /** + * The constant DEFAULT_TRANSACTION_UNDO_LOG_SERIALIZATION. + */ String DEFAULT_TRANSACTION_UNDO_LOG_SERIALIZATION = "jackson"; + /** + * The constant DEFAULT_ONLY_CARE_UPDATE_COLUMNS. + */ boolean DEFAULT_ONLY_CARE_UPDATE_COLUMNS = true; /** * The constant DEFAULT_TRANSACTION_UNDO_LOG_TABLE. @@ -93,40 +197,97 @@ public interface DefaultValues { */ String DEFAULT_DISTRIBUTED_LOCK_DB_TABLE = "distributed_lock"; + /** + * The constant DEFAULT_TM_COMMIT_RETRY_COUNT. + */ int DEFAULT_TM_COMMIT_RETRY_COUNT = 5; + /** + * The constant DEFAULT_TM_ROLLBACK_RETRY_COUNT. + */ int DEFAULT_TM_ROLLBACK_RETRY_COUNT = 5; + /** + * The constant DEFAULT_GLOBAL_TRANSACTION_TIMEOUT. + */ int DEFAULT_GLOBAL_TRANSACTION_TIMEOUT = 60000; + /** + * The constant DEFAULT_TX_GROUP. + */ String DEFAULT_TX_GROUP = "default_tx_group"; + /** + * The constant DEFAULT_TX_GROUP_OLD. + */ @Deprecated String DEFAULT_TX_GROUP_OLD = "my_test_tx_group"; + /** + * The constant DEFAULT_TC_CLUSTER. + */ String DEFAULT_TC_CLUSTER = "default"; + /** + * The constant DEFAULT_GROUPLIST. + */ String DEFAULT_GROUPLIST = "127.0.0.1:8091"; + /** + * The constant DEFAULT_DATA_SOURCE_PROXY_MODE. + */ String DEFAULT_DATA_SOURCE_PROXY_MODE = "AT"; + /** + * The constant DEFAULT_DISABLE_GLOBAL_TRANSACTION. + */ boolean DEFAULT_DISABLE_GLOBAL_TRANSACTION = false; + /** + * The constant SERVICE_DEFAULT_PORT. + */ //currently not use and will be delete in the next version @Deprecated int SERVICE_DEFAULT_PORT = 8091; + /** + * The constant SERVICE_OFFSET_SPRING_BOOT. + */ int SERVICE_OFFSET_SPRING_BOOT = 1000; + /** + * The constant SERVER_PORT. + */ String SERVER_PORT = "seata.server.port"; + /** + * The constant SERVER_DEFAULT_STORE_MODE. + */ String SERVER_DEFAULT_STORE_MODE = "file"; + /** + * The constant DEFAULT_SAGA_JSON_PARSER. + */ String DEFAULT_SAGA_JSON_PARSER = "fastjson"; + /** + * The constant DEFAULT_TCC_BUSINESS_ACTION_CONTEXT_JSON_PARSER. + */ // default tcc business action context json parser String DEFAULT_TCC_BUSINESS_ACTION_CONTEXT_JSON_PARSER = "fastjson"; + /** + * The constant DEFAULT_SERVER_ENABLE_CHECK_AUTH. + */ boolean DEFAULT_SERVER_ENABLE_CHECK_AUTH = true; + /** + * The constant DEFAULT_LOAD_BALANCE. + */ String DEFAULT_LOAD_BALANCE = "XID"; + /** + * The constant VIRTUAL_NODES_DEFAULT. + */ int VIRTUAL_NODES_DEFAULT = 10; + /** + * The constant DEFAULT_SEATA_GROUP. + */ String DEFAULT_SEATA_GROUP = "default"; /** @@ -144,7 +305,6 @@ public interface DefaultValues { */ String DEFAULT_CLIENT_UNDO_COMPRESS_THRESHOLD = "64k"; - /** * the constant DEFAULT_RETRY_DEAD_THRESHOLD */ @@ -283,9 +443,9 @@ public interface DefaultValues { long DEFAULT_MAX_ROLLBACK_RETRY_TIMEOUT = -1L; /** - * the const DEFAULT_ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE + * The constant DEFAULT_ROLLBACK_FAILED_UNLOCK_ENABLE. */ - boolean DEFAULT_ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE = false; + boolean DEFAULT_ROLLBACK_FAILED_UNLOCK_ENABLE = false; /** * DEFAULT_DISTRIBUTED_LOCK_EXPIRE_TIME @@ -297,16 +457,34 @@ public interface DefaultValues { */ boolean DEFAULT_ENABLE_BRANCH_ASYNC_REMOVE = false; + /** + * The constant DEFAULT_DB_MAX_CONN. + */ int DEFAULT_DB_MAX_CONN = 100; + /** + * The constant DEFAULT_DB_MIN_CONN. + */ int DEFAULT_DB_MIN_CONN = 10; + /** + * The constant DEFAULT_REDIS_MAX_IDLE. + */ int DEFAULT_REDIS_MAX_IDLE = 100; + /** + * The constant DEFAULT_REDIS_MAX_TOTAL. + */ int DEFAULT_REDIS_MAX_TOTAL = 100; + /** + * The constant DEFAULT_REDIS_MIN_IDLE. + */ int DEFAULT_REDIS_MIN_IDLE = 10; + /** + * The constant DEFAULT_QUERY_LIMIT. + */ int DEFAULT_QUERY_LIMIT = 1000; /** @@ -314,5 +492,8 @@ public interface DefaultValues { */ String DRUID_LOCATION = "lib/sqlparser/druid.jar"; + /** + * The constant DEFAULT_ROCKET_MQ_MSG_TIMEOUT. + */ int DEFAULT_ROCKET_MQ_MSG_TIMEOUT = 60 * 1000; } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientChannelManager.java b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientChannelManager.java index 7be0de2e729..1ebd36fe453 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientChannelManager.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientChannelManager.java @@ -247,6 +247,8 @@ void doReconnect(List availList, String transactionServiceGroup) { FrameworkErrorCode.NetConnect.getErrCode(), key, value.getMessage(), value); }); } + } + if (availList.size() == failedMap.size()) { String invalidAddress = StringUtils.join(failedMap.keySet().iterator(), ", "); throw new FrameworkException("can not connect to [" + invalidAddress + "]"); } diff --git a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/dm/DmUndoLogManager.java b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/dm/DmUndoLogManager.java index e267e832131..0ff038fa5a7 100644 --- a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/dm/DmUndoLogManager.java +++ b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/dm/DmUndoLogManager.java @@ -18,9 +18,11 @@ import org.apache.seata.common.loader.LoadLevel; +import org.apache.seata.common.util.CollectionUtils; import org.apache.seata.core.compressor.CompressorType; import org.apache.seata.core.constants.ClientTableColumnsName; import org.apache.seata.rm.datasource.undo.AbstractUndoLogManager; +import org.apache.seata.rm.datasource.undo.UndoLogConstants; import org.apache.seata.rm.datasource.undo.UndoLogParser; import org.apache.seata.sqlparser.util.JdbcConstants; import org.slf4j.Logger; @@ -30,6 +32,7 @@ import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.Date; +import java.util.Set; @LoadLevel(name = JdbcConstants.DM) @@ -37,6 +40,8 @@ public class DmUndoLogManager extends AbstractUndoLogManager { private static final Logger LOGGER = LoggerFactory.getLogger(DmUndoLogManager.class); + protected static final String DELETE_SUB_UNDO_LOG_SQL = "DELETE FROM " + UNDO_LOG_TABLE_NAME + " WHERE \"" + + ClientTableColumnsName.UNDO_LOG_CONTEXT.toUpperCase() + "\" = ? AND " + ClientTableColumnsName.UNDO_LOG_XID + " = ?"; private static final String INSERT_UNDO_LOG_SQL = "INSERT INTO " + UNDO_LOG_TABLE_NAME + " (" + ClientTableColumnsName.UNDO_LOG_BRANCH_XID + ", " @@ -48,6 +53,88 @@ public class DmUndoLogManager extends AbstractUndoLogManager { private static final String DELETE_UNDO_LOG_BY_CREATE_SQL = "DELETE FROM " + UNDO_LOG_TABLE_NAME + " WHERE " + ClientTableColumnsName.UNDO_LOG_LOG_CREATED + " <= ? and ROWNUM <= ?"; + /** + * Delete undo log. + * + * @param xid the xid + * @param branchId the branch id + * @param conn the conn + * @throws SQLException the sql exception + */ + @Override + public void deleteUndoLog(String xid, long branchId, Connection conn) throws SQLException { + try (PreparedStatement deletePST = conn.prepareStatement(DELETE_UNDO_LOG_SQL); + PreparedStatement deleteSubPST = conn.prepareStatement(DELETE_SUB_UNDO_LOG_SQL)) { + deletePST.setLong(1, branchId); + deletePST.setString(2, xid); + deletePST.executeUpdate(); + + deleteSubPST.setString(1, UndoLogConstants.BRANCH_ID_KEY + CollectionUtils.KV_SPLIT + branchId); + deleteSubPST.setString(2, xid); + deleteSubPST.executeUpdate(); + } catch (Exception e) { + if (!(e instanceof SQLException)) { + e = new SQLException(e); + } + throw (SQLException) e; + } + } + + /** + * batch Delete undo log. + * + * @param xids xid + * @param branchIds branch Id + * @param conn connection + */ + @Override + public void batchDeleteUndoLog(Set xids, Set branchIds, Connection conn) throws SQLException { + if (CollectionUtils.isEmpty(xids) || CollectionUtils.isEmpty(branchIds)) { + return; + } + int xidSize = xids.size(); + int branchIdSize = branchIds.size(); + String batchDeleteSql = toBatchDeleteUndoLogSql(xidSize, branchIdSize); + String batchDeleteSubSql = toBatchDeleteSubUndoLogSql(xidSize, branchIdSize); + try (PreparedStatement deletePST = conn.prepareStatement(batchDeleteSql); + PreparedStatement deleteSubPST = conn.prepareStatement(batchDeleteSubSql)) { + int paramsIndex = 1; + for (Long branchId : branchIds) { + deletePST.setLong(paramsIndex, branchId); + deleteSubPST.setString(paramsIndex, UndoLogConstants.BRANCH_ID_KEY + CollectionUtils.KV_SPLIT + branchId); + paramsIndex++; + } + for (String xid : xids) { + deletePST.setString(paramsIndex, xid); + deleteSubPST.setString(paramsIndex, xid); + paramsIndex++; + } + int deleteRows = deletePST.executeUpdate(); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("batch delete undo log size {}", deleteRows); + } + int deleteSubRows = deleteSubPST.executeUpdate(); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("batch delete sub undo log size {}", deleteSubRows); + } + } catch (Exception e) { + if (!(e instanceof SQLException)) { + e = new SQLException(e); + } + throw (SQLException) e; + } + } + + protected static String toBatchDeleteSubUndoLogSql(int xidSize, int branchIdSize) { + StringBuilder sqlBuilder = new StringBuilder(64); + sqlBuilder.append("DELETE FROM ").append(UNDO_LOG_TABLE_NAME).append(" WHERE \"").append( + ClientTableColumnsName.UNDO_LOG_CONTEXT.toUpperCase()).append("\" IN "); + appendInParam(branchIdSize, sqlBuilder); + sqlBuilder.append(" AND ").append(ClientTableColumnsName.UNDO_LOG_XID).append(" IN "); + appendInParam(xidSize, sqlBuilder); + return sqlBuilder.toString(); + } + @Override public int deleteUndoLogByLogCreated(Date logCreated, int limitRows, Connection conn) throws SQLException { try (PreparedStatement deletePST = conn.prepareStatement(DELETE_UNDO_LOG_BY_CREATE_SQL)) { diff --git a/script/config-center/config.txt b/script/config-center/config.txt index 8cf986f3f94..af831bc5d23 100644 --- a/script/config-center/config.txt +++ b/script/config-center/config.txt @@ -140,7 +140,7 @@ server.recovery.rollbackingRetryPeriod=1000 server.recovery.timeoutRetryPeriod=1000 server.maxCommitRetryTimeout=-1 server.maxRollbackRetryTimeout=-1 -server.rollbackRetryTimeoutUnlockEnable=false +server.rollbackFailedUnlockEnable=false server.distributedLockExpireTime=10000 server.session.branchAsyncQueueSize=5000 server.session.enableBranchAsyncRemove=false diff --git a/server/src/main/java/org/apache/seata/server/cluster/raft/execute/branch/AddBranchSessionExecute.java b/server/src/main/java/org/apache/seata/server/cluster/raft/execute/branch/AddBranchSessionExecute.java index 0bf2f122215..9d3086ad7ae 100644 --- a/server/src/main/java/org/apache/seata/server/cluster/raft/execute/branch/AddBranchSessionExecute.java +++ b/server/src/main/java/org/apache/seata/server/cluster/raft/execute/branch/AddBranchSessionExecute.java @@ -35,7 +35,16 @@ public Boolean execute(RaftBaseMsg syncMsg) throws Throwable { RaftBranchSessionSyncMsg sessionSyncMsg = (RaftBranchSessionSyncMsg)syncMsg; RaftSessionManager raftSessionManager = (RaftSessionManager) SessionHolder.getRootSessionManager(sessionSyncMsg.getGroup()); BranchTransactionDTO branchTransactionDTO = sessionSyncMsg.getBranchSession(); - GlobalSession globalSession = raftSessionManager.findGlobalSession(branchTransactionDTO.getXid()); + String xid = branchTransactionDTO.getXid(); + GlobalSession globalSession = raftSessionManager.findGlobalSession(xid); + if (globalSession == null) { + if (logger.isWarnEnabled()) { + logger.warn( + "The transaction corresponding to the XID: {} does not exist, which may cause a two-phase concurrency issue, msg type: {}", + xid, syncMsg.getMsgType()); + } + return false; + } BranchSession branchSession = SessionConverter.convertBranchSession(branchTransactionDTO); branchSession.lock(); globalSession.add(branchSession); diff --git a/server/src/main/java/org/apache/seata/server/cluster/raft/execute/branch/UpdateBranchSessionExecute.java b/server/src/main/java/org/apache/seata/server/cluster/raft/execute/branch/UpdateBranchSessionExecute.java index c98f3820476..e096ea36cdc 100644 --- a/server/src/main/java/org/apache/seata/server/cluster/raft/execute/branch/UpdateBranchSessionExecute.java +++ b/server/src/main/java/org/apache/seata/server/cluster/raft/execute/branch/UpdateBranchSessionExecute.java @@ -33,8 +33,26 @@ public class UpdateBranchSessionExecute extends AbstractRaftMsgExecute { public Boolean execute(RaftBaseMsg syncMsg) throws Throwable { RaftBranchSessionSyncMsg sessionSyncMsg = (RaftBranchSessionSyncMsg)syncMsg; RaftSessionManager raftSessionManager = (RaftSessionManager) SessionHolder.getRootSessionManager(sessionSyncMsg.getGroup()); - GlobalSession globalSession = raftSessionManager.findGlobalSession(sessionSyncMsg.getBranchSession().getXid()); - BranchSession branchSession = globalSession.getBranch(sessionSyncMsg.getBranchSession().getBranchId()); + String xid = sessionSyncMsg.getBranchSession().getXid(); + GlobalSession globalSession = raftSessionManager.findGlobalSession(xid); + if (globalSession == null) { + if (logger.isWarnEnabled()) { + logger.warn( + "The transaction corresponding to the XID: {} does not exist, which may cause a two-phase concurrency issue, msg type: {}", + xid, syncMsg.getMsgType()); + } + return false; + } + long branchId = sessionSyncMsg.getBranchSession().getBranchId(); + BranchSession branchSession = globalSession.getBranch(branchId); + if (branchSession == null) { + if (logger.isWarnEnabled()) { + logger.warn( + "The branch session corresponding to the branchId: {} does not exist, which may cause a two-phase concurrency issue, msg type: {}", + sessionSyncMsg.getBranchSession().getBranchId(), syncMsg.getMsgType()); + } + return false; + } BranchStatus status = BranchStatus.get(sessionSyncMsg.getBranchSession().getStatus()); branchSession.setStatus(status); if (logger.isDebugEnabled()) { diff --git a/server/src/main/java/org/apache/seata/server/cluster/raft/execute/lock/BranchReleaseLockExecute.java b/server/src/main/java/org/apache/seata/server/cluster/raft/execute/lock/BranchReleaseLockExecute.java index 805ef1b7f7b..206fe77e3cd 100644 --- a/server/src/main/java/org/apache/seata/server/cluster/raft/execute/lock/BranchReleaseLockExecute.java +++ b/server/src/main/java/org/apache/seata/server/cluster/raft/execute/lock/BranchReleaseLockExecute.java @@ -30,8 +30,16 @@ public class BranchReleaseLockExecute extends AbstractRaftMsgExecute { @Override public Boolean execute(RaftBaseMsg syncMsg) throws Throwable { RaftBranchSessionSyncMsg sessionSyncMsg = (RaftBranchSessionSyncMsg)syncMsg; - GlobalSession globalSession = - SessionHolder.getRootSessionManager().findGlobalSession(sessionSyncMsg.getBranchSession().getXid()); + String xid = sessionSyncMsg.getBranchSession().getXid(); + GlobalSession globalSession = SessionHolder.getRootSessionManager().findGlobalSession(xid); + if (globalSession == null) { + if (logger.isWarnEnabled()) { + logger.warn( + "The transaction corresponding to the XID: {} does not exist, which may cause a two-phase concurrency issue, msg type: {}", + xid, syncMsg.getMsgType()); + } + return false; + } BranchSession branchSession = globalSession.getBranch(sessionSyncMsg.getBranchSession().getBranchId()); if (branchSession != null) { if (logger.isDebugEnabled()) { diff --git a/server/src/main/java/org/apache/seata/server/controller/VGroupMappingController.java b/server/src/main/java/org/apache/seata/server/controller/VGroupMappingController.java index 348a7502830..1d55855f5d8 100644 --- a/server/src/main/java/org/apache/seata/server/controller/VGroupMappingController.java +++ b/server/src/main/java/org/apache/seata/server/controller/VGroupMappingController.java @@ -18,7 +18,6 @@ import org.apache.seata.common.metadata.namingserver.Instance; import org.apache.seata.common.result.Result; -import org.apache.seata.common.util.StringUtils; import org.apache.seata.config.Configuration; import org.apache.seata.config.ConfigurationFactory; import org.apache.seata.core.store.MappingDO; @@ -29,13 +28,6 @@ import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; -import javax.annotation.PostConstruct; - -import static org.apache.seata.common.ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR; -import static org.apache.seata.common.ConfigurationKeys.FILE_ROOT_REGISTRY; -import static org.apache.seata.common.ConfigurationKeys.FILE_ROOT_TYPE; -import static org.apache.seata.common.ConfigurationKeys.NAMING_SERVER; - @RestController @RequestMapping("/vgroup/v1") public class VGroupMappingController { @@ -44,15 +36,6 @@ public class VGroupMappingController { protected static final Configuration CONFIG = ConfigurationFactory.getInstance(); - @PostConstruct - private void init() { - String type = - ConfigurationFactory.getInstance().getConfig(FILE_ROOT_REGISTRY + FILE_CONFIG_SPLIT_CHAR + FILE_ROOT_TYPE); - if (StringUtils.equals(type, NAMING_SERVER)) { - vGroupMappingStoreManager = SessionHolder.getRootVGroupMappingManager(); - } - } - /** * add vGroup in cluster * @@ -67,7 +50,7 @@ public Result addVGroup(@RequestParam String vGroup, @RequestParam String uni mappingDO.setCluster(Instance.getInstance().getClusterName()); mappingDO.setUnit(unit); mappingDO.setVGroup(vGroup); - boolean rst = vGroupMappingStoreManager.addVGroup(mappingDO); + boolean rst = SessionHolder.getRootVGroupMappingManager().addVGroup(mappingDO); Instance.getInstance().setTerm(System.currentTimeMillis()); if (!rst) { result.setCode("500"); @@ -85,7 +68,7 @@ public Result addVGroup(@RequestParam String vGroup, @RequestParam String uni @GetMapping("/removeVGroup") public Result removeVGroup(@RequestParam String vGroup) { Result result = new Result<>(); - boolean rst = vGroupMappingStoreManager.removeVGroup(vGroup); + boolean rst = SessionHolder.getRootVGroupMappingManager().removeVGroup(vGroup); Instance.getInstance().setTerm(System.currentTimeMillis()); if (!rst) { result.setCode("500"); diff --git a/server/src/main/java/org/apache/seata/server/coordinator/DefaultCoordinator.java b/server/src/main/java/org/apache/seata/server/coordinator/DefaultCoordinator.java index 9003fe268aa..ddfd5f35d66 100644 --- a/server/src/main/java/org/apache/seata/server/coordinator/DefaultCoordinator.java +++ b/server/src/main/java/org/apache/seata/server/coordinator/DefaultCoordinator.java @@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit; import io.netty.channel.Channel; +import org.apache.commons.lang.time.DateFormatUtils; import org.apache.seata.common.DefaultValues; import org.apache.seata.common.thread.NamedThreadFactory; import org.apache.seata.common.util.CollectionUtils; @@ -71,7 +72,6 @@ import org.apache.seata.server.session.SessionHelper; import org.apache.seata.server.session.SessionHolder; import org.apache.seata.server.store.StoreConfig; -import org.apache.commons.lang.time.DateFormatUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; @@ -90,7 +90,7 @@ import static org.apache.seata.common.DefaultValues.DEFAULT_MAX_COMMIT_RETRY_TIMEOUT; import static org.apache.seata.common.DefaultValues.DEFAULT_MAX_ROLLBACK_RETRY_TIMEOUT; import static org.apache.seata.common.DefaultValues.DEFAULT_ROLLBACKING_RETRY_PERIOD; -import static org.apache.seata.common.DefaultValues.DEFAULT_ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE; +import static org.apache.seata.common.DefaultValues.DEFAULT_ROLLBACK_FAILED_UNLOCK_ENABLE; import static org.apache.seata.common.DefaultValues.DEFAULT_TIMEOUT_RETRY_PERIOD; import static org.apache.seata.common.DefaultValues.DEFAULT_UNDO_LOG_DELETE_PERIOD; @@ -159,7 +159,10 @@ public class DefaultCoordinator extends AbstractTCInboundHandler implements Tran ConfigurationKeys.MAX_ROLLBACK_RETRY_TIMEOUT, DEFAULT_MAX_ROLLBACK_RETRY_TIMEOUT); private static final boolean ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE = ConfigurationFactory.getInstance().getBoolean( - ConfigurationKeys.ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE, DEFAULT_ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE); + ConfigurationKeys.ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE, DEFAULT_ROLLBACK_FAILED_UNLOCK_ENABLE); + + private static final boolean ROLLBACK_FAILED_UNLOCK_ENABLE = ConfigurationFactory.getInstance().getBoolean( + ConfigurationKeys.ROLLBACK_FAILED_UNLOCK_ENABLE, DEFAULT_ROLLBACK_FAILED_UNLOCK_ENABLE); private static final int RETRY_DEAD_THRESHOLD = ConfigurationFactory.getInstance() .getInt(org.apache.seata.common.ConfigurationKeys.RETRY_DEAD_THRESHOLD, DefaultValues.DEFAULT_RETRY_DEAD_THRESHOLD); @@ -341,15 +344,15 @@ protected void doLockCheck(GlobalLockQueryRequest request, GlobalLockQueryRespon protected void timeoutCheck() { SessionCondition sessionCondition = new SessionCondition(GlobalStatus.Begin); sessionCondition.setLazyLoadBranch(true); - Collection beginGlobalsessions = + Collection beginGlobalSessions = SessionHolder.getRootSessionManager().findGlobalSessions(sessionCondition); - if (CollectionUtils.isEmpty(beginGlobalsessions)) { + if (CollectionUtils.isEmpty(beginGlobalSessions)) { return; } - if (!beginGlobalsessions.isEmpty() && LOGGER.isDebugEnabled()) { - LOGGER.debug("Global transaction timeout check begin, size: {}", beginGlobalsessions.size()); + if (!beginGlobalSessions.isEmpty() && LOGGER.isDebugEnabled()) { + LOGGER.debug("Global transaction timeout check begin, size: {}", beginGlobalSessions.size()); } - SessionHelper.forEach(beginGlobalsessions, globalSession -> { + SessionHelper.forEach(beginGlobalSessions, globalSession -> { if (LOGGER.isDebugEnabled()) { LOGGER.debug( globalSession.getXid() + " " + globalSession.getStatus() + " " + globalSession.getBeginTime() + " " @@ -372,7 +375,7 @@ protected void timeoutCheck() { return true; }); }); - if (!beginGlobalsessions.isEmpty() && LOGGER.isDebugEnabled()) { + if (!beginGlobalSessions.isEmpty() && LOGGER.isDebugEnabled()) { LOGGER.debug("Global transaction timeout check end. "); } @@ -394,7 +397,7 @@ protected void handleRetryRollbacking() { SessionHelper.forEach(rollbackingSessions, rollbackingSession -> { try { if (isRetryTimeout(now, MAX_ROLLBACK_RETRY_TIMEOUT, rollbackingSession.getBeginTime())) { - if (ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE) { + if (ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE || ROLLBACK_FAILED_UNLOCK_ENABLE) { rollbackingSession.clean(); } @@ -520,7 +523,7 @@ protected void handleRollbackingByScheduled() { SessionHelper.forEach(needDoRollbackingSessions, rollbackingSession -> { try { if (isRetryTimeout(now, MAX_ROLLBACK_RETRY_TIMEOUT, rollbackingSession.getBeginTime())) { - if (ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE) { + if (ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE || ROLLBACK_FAILED_UNLOCK_ENABLE) { rollbackingSession.clean(); } diff --git a/server/src/main/java/org/apache/seata/server/coordinator/DefaultCore.java b/server/src/main/java/org/apache/seata/server/coordinator/DefaultCore.java index d6c93532fa7..5acbc8988de 100644 --- a/server/src/main/java/org/apache/seata/server/coordinator/DefaultCore.java +++ b/server/src/main/java/org/apache/seata/server/coordinator/DefaultCore.java @@ -57,7 +57,7 @@ public class DefaultCore implements Core { private static final int RETRY_XAER_NOTA_TIMEOUT = ConfigurationFactory.getInstance().getInt(XAER_NOTA_RETRY_TIMEOUT, DefaultValues.DEFAULT_XAER_NOTA_RETRY_TIMEOUT); - private static Map coreMap = new ConcurrentHashMap<>(); + private static final Map CORE_MAP = new ConcurrentHashMap<>(); private static final boolean PARALLEL_HANDLE_BRANCH = ConfigurationFactory.getInstance().getBoolean(ENABLE_PARALLEL_HANDLE_BRANCH_KEY, false); @@ -72,7 +72,7 @@ public DefaultCore(RemotingServer remotingServer) { new Class[] {RemotingServer.class}, new Object[] {remotingServer}); if (CollectionUtils.isNotEmpty(allCore)) { for (AbstractCore core : allCore) { - coreMap.put(core.getHandleBranchType(), core); + CORE_MAP.put(core.getHandleBranchType(), core); } } } @@ -84,7 +84,7 @@ public DefaultCore(RemotingServer remotingServer) { * @return the core */ public AbstractCore getCore(BranchType branchType) { - AbstractCore core = coreMap.get(branchType); + AbstractCore core = CORE_MAP.get(branchType); if (core == null) { throw new NotSupportYetException("unsupported type:" + branchType.name()); } @@ -98,7 +98,7 @@ public AbstractCore getCore(BranchType branchType) { * @param core the core */ public void mockCore(BranchType branchType, AbstractCore core) { - coreMap.put(branchType, core); + CORE_MAP.put(branchType, core); } @Override diff --git a/server/src/main/java/org/apache/seata/server/session/AbstractSessionManager.java b/server/src/main/java/org/apache/seata/server/session/AbstractSessionManager.java index 611993b1594..8b2e7095936 100644 --- a/server/src/main/java/org/apache/seata/server/session/AbstractSessionManager.java +++ b/server/src/main/java/org/apache/seata/server/session/AbstractSessionManager.java @@ -16,6 +16,8 @@ */ package org.apache.seata.server.session; +import org.apache.seata.config.ConfigurationFactory; +import org.apache.seata.core.constants.ConfigurationKeys; import org.apache.seata.core.exception.BranchTransactionException; import org.apache.seata.core.exception.GlobalTransactionException; import org.apache.seata.core.exception.TransactionException; @@ -29,10 +31,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.seata.common.DefaultValues.DEFAULT_ROLLBACK_FAILED_UNLOCK_ENABLE; + /** * The type Abstract session manager. */ public abstract class AbstractSessionManager implements SessionManager { + boolean rollbackFailedUnlockEnable = ConfigurationFactory.getInstance().getBoolean( + ConfigurationKeys.ROLLBACK_FAILED_UNLOCK_ENABLE, DEFAULT_ROLLBACK_FAILED_UNLOCK_ENABLE); /** * The constant LOGGER. @@ -157,6 +163,11 @@ public void onSuccessEnd(GlobalSession globalSession) throws TransactionExceptio @Override public void onFailEnd(GlobalSession globalSession) throws TransactionException { + if (rollbackFailedUnlockEnable) { + globalSession.clean(); + LOGGER.info("xid:{} fail end and remove lock, transaction:{}", globalSession.getXid(), globalSession); + return; + } LOGGER.info("xid:{} fail end, transaction:{}", globalSession.getXid(), globalSession); } diff --git a/server/src/main/java/org/apache/seata/server/session/SessionStatusValidator.java b/server/src/main/java/org/apache/seata/server/session/SessionStatusValidator.java index 654af466245..f6fa7f074aa 100644 --- a/server/src/main/java/org/apache/seata/server/session/SessionStatusValidator.java +++ b/server/src/main/java/org/apache/seata/server/session/SessionStatusValidator.java @@ -54,6 +54,11 @@ public static boolean isRollbackGlobalStatus(GlobalStatus status) { || status == GlobalStatus.RollbackRetryTimeout; } + public static boolean isEndGlobalStatus(GlobalStatus status) { + return status == GlobalStatus.Rollbacked || status == GlobalStatus.TimeoutRollbacked + || status == GlobalStatus.Committed || status == GlobalStatus.Finished; + } + /** * is commit global status * diff --git a/server/src/main/resources/application.example.yml b/server/src/main/resources/application.example.yml index 5d92d069284..059312ae856 100644 --- a/server/src/main/resources/application.example.yml +++ b/server/src/main/resources/application.example.yml @@ -139,7 +139,7 @@ seata: service-port: 8091 #If not configured, the default is '${server.port} + 1000' max-commit-retry-timeout: -1 max-rollback-retry-timeout: -1 - rollback-retry-timeout-unlock-enable: false + rollback-failed-unlock-enable: false enable-check-auth: true enable-parallel-request-handle: true enable-parallel-handle-branch: false diff --git a/server/src/main/resources/application.raft.example.yml b/server/src/main/resources/application.raft.example.yml index 6c96b9e9fac..61c644749a5 100644 --- a/server/src/main/resources/application.raft.example.yml +++ b/server/src/main/resources/application.raft.example.yml @@ -116,7 +116,7 @@ seata: enable-branch-async-remove: false #enable to asynchronous remove branchSession store: # support: file - mode: file + mode: raft file: dir: sessionStore max-branch-session-size: 16384 diff --git a/tm/src/test/java/org/apache/seata/tm/DefaultTransactionManagerTest.java b/tm/src/test/java/org/apache/seata/tm/DefaultTransactionManagerTest.java new file mode 100644 index 00000000000..1841e56b44e --- /dev/null +++ b/tm/src/test/java/org/apache/seata/tm/DefaultTransactionManagerTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.tm; + +import org.apache.seata.core.exception.TransactionException; +import org.apache.seata.core.model.GlobalStatus; +import org.apache.seata.core.protocol.ResultCode; +import org.apache.seata.core.protocol.transaction.AbstractTransactionRequest; +import org.apache.seata.core.protocol.transaction.GlobalBeginRequest; +import org.apache.seata.core.protocol.transaction.GlobalBeginResponse; +import org.apache.seata.core.protocol.transaction.GlobalCommitRequest; +import org.apache.seata.core.protocol.transaction.GlobalCommitResponse; +import org.apache.seata.core.protocol.transaction.GlobalReportRequest; +import org.apache.seata.core.protocol.transaction.GlobalReportResponse; +import org.apache.seata.core.protocol.transaction.GlobalRollbackRequest; +import org.apache.seata.core.protocol.transaction.GlobalRollbackResponse; +import org.apache.seata.core.protocol.transaction.GlobalStatusRequest; +import org.apache.seata.core.protocol.transaction.GlobalStatusResponse; +import org.apache.seata.core.rpc.netty.TmNettyRemotingClient; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +import java.util.concurrent.TimeoutException; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +/** + * the type DefaultTransactionManager + */ +public class DefaultTransactionManagerTest { + + private final static String DEFAULT_XID = "1234567890"; + + private DefaultTransactionManager defaultTransactionManager; + + private MockedStatic tmNettyRemotingClientMockedStatic; + + @Mock + private TmNettyRemotingClient tmNettyRemotingClient; + + + @BeforeEach + void init() { + MockitoAnnotations.openMocks(this); + tmNettyRemotingClientMockedStatic = Mockito.mockStatic(TmNettyRemotingClient.class); + tmNettyRemotingClientMockedStatic.when(TmNettyRemotingClient::getInstance).thenReturn(tmNettyRemotingClient); + defaultTransactionManager = new DefaultTransactionManager(); + } + + @AfterEach + void destory(){ + tmNettyRemotingClientMockedStatic.close(); + } + + @Test + void testBeginSuccess() throws Exception { + GlobalBeginResponse mockResponse = new GlobalBeginResponse(); + mockResponse.setResultCode(ResultCode.Success); + mockResponse.setXid(DEFAULT_XID); + + when(tmNettyRemotingClient.sendSyncRequest(any(GlobalBeginRequest.class))).thenReturn(mockResponse); + + String xid = defaultTransactionManager.begin("appId", "txGroup", "testName", 1000); + + Assertions.assertEquals("1234567890", xid); + Mockito.verify(tmNettyRemotingClient).sendSyncRequest(any(GlobalBeginRequest.class)); + } + + @Test + void testBeginFailure() throws Exception { + GlobalBeginResponse mockResponse = new GlobalBeginResponse(); + mockResponse.setResultCode(ResultCode.Failed); + mockResponse.setMsg("Failed to begin transaction"); + + when(tmNettyRemotingClient.sendSyncRequest(any(GlobalBeginRequest.class))).thenReturn(mockResponse); + + TransactionException exception = Assertions.assertThrows(TransactionException.class, + () -> defaultTransactionManager.begin("appId", "txGroup", "testName", 1000)); + + Assertions.assertTrue(exception.getMessage().contains("Failed to begin transaction")); + Mockito.verify(tmNettyRemotingClient).sendSyncRequest(any(GlobalBeginRequest.class)); + } + + @Test + void testCommitSuccess() throws Exception { + GlobalCommitResponse mockResponse = new GlobalCommitResponse(); + mockResponse.setGlobalStatus(GlobalStatus.Committed); + + when(tmNettyRemotingClient.sendSyncRequest(any(GlobalCommitRequest.class))).thenReturn(mockResponse); + + GlobalStatus status = defaultTransactionManager.commit(DEFAULT_XID); + + Assertions.assertEquals(GlobalStatus.Committed, status); + Mockito.verify(tmNettyRemotingClient).sendSyncRequest(any(GlobalCommitRequest.class)); + } + + @Test + void testRollbackSuccess() throws Exception { + GlobalRollbackResponse mockResponse = new GlobalRollbackResponse(); + mockResponse.setGlobalStatus(GlobalStatus.Rollbacked); + + when(tmNettyRemotingClient.sendSyncRequest(any(GlobalRollbackRequest.class))).thenReturn(mockResponse); + + GlobalStatus status = defaultTransactionManager.rollback(DEFAULT_XID); + + Assertions.assertEquals(GlobalStatus.Rollbacked, status); + Mockito.verify(tmNettyRemotingClient).sendSyncRequest(any(GlobalRollbackRequest.class)); + } + + @Test + void testGetStatusSuccess() throws Exception { + GlobalStatusResponse mockResponse = new GlobalStatusResponse(); + mockResponse.setGlobalStatus(GlobalStatus.Committing); + + when(tmNettyRemotingClient.sendSyncRequest(any(GlobalStatusRequest.class))).thenReturn(mockResponse); + + GlobalStatus status = defaultTransactionManager.getStatus(DEFAULT_XID); + + Assertions.assertEquals(GlobalStatus.Committing, status); + Mockito.verify(tmNettyRemotingClient).sendSyncRequest(any(GlobalStatusRequest.class)); + } + + @Test + void testGlobalReportSuccess() throws Exception { + GlobalReportResponse mockResponse = new GlobalReportResponse(); + mockResponse.setGlobalStatus(GlobalStatus.Committed); + + when(tmNettyRemotingClient.sendSyncRequest(any(GlobalReportRequest.class))).thenReturn(mockResponse); + + GlobalStatus status = defaultTransactionManager.globalReport(DEFAULT_XID, GlobalStatus.Committed); + + Assertions.assertEquals(GlobalStatus.Committed, status); + Mockito.verify(tmNettyRemotingClient).sendSyncRequest(any(GlobalReportRequest.class)); + } + + @Test + void testSyncCallTimeout() throws Exception { + when(tmNettyRemotingClient.sendSyncRequest(any(AbstractTransactionRequest.class))) + .thenThrow(new TimeoutException("Timeout occurred")); + + TransactionException exception = Assertions.assertThrows(TransactionException.class, + () -> defaultTransactionManager.getStatus(DEFAULT_XID)); + + Assertions.assertTrue(exception.getMessage().contains("RPC timeout")); + Mockito.verify(tmNettyRemotingClient).sendSyncRequest(any(AbstractTransactionRequest.class)); + } +} diff --git a/tm/src/test/java/org/apache/seata/tm/TransactionManagerHolderTest.java b/tm/src/test/java/org/apache/seata/tm/TransactionManagerHolderTest.java index 2b673e1dac8..9ee20b30738 100644 --- a/tm/src/test/java/org/apache/seata/tm/TransactionManagerHolderTest.java +++ b/tm/src/test/java/org/apache/seata/tm/TransactionManagerHolderTest.java @@ -18,10 +18,11 @@ import org.apache.seata.common.exception.ShouldNeverHappenException; +import org.apache.seata.core.model.TransactionManager; +import org.apache.seata.tm.api.transaction.MockTransactionManager; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; - class TransactionManagerHolderTest { @@ -31,4 +32,13 @@ void getTest() { TransactionManagerHolder.get();}); } + + @Test + void getInstanceTest() { + MockTransactionManager mockTransactionManager = new MockTransactionManager(); + TransactionManagerHolder.set(mockTransactionManager); + TransactionManager transactionManager = TransactionManagerHolder.get(); + Assertions.assertTrue(transactionManager instanceof MockTransactionManager); + } + } diff --git a/tm/src/test/java/org/apache/seata/tm/api/FailureHandlerHolderTest.java b/tm/src/test/java/org/apache/seata/tm/api/FailureHandlerHolderTest.java new file mode 100644 index 00000000000..0d28c5277fe --- /dev/null +++ b/tm/src/test/java/org/apache/seata/tm/api/FailureHandlerHolderTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.tm.api; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +/** + * the type FailureHandlerHolder + */ +public class FailureHandlerHolderTest { + + @Test + void testSetFailureHandlerWithCustomHandler() { + MockFailureHandlerImpl mockFailureHandlerImpl = new MockFailureHandlerImpl(); + + FailureHandlerHolder.setFailureHandler(mockFailureHandlerImpl); + + Assertions.assertEquals(mockFailureHandlerImpl, FailureHandlerHolder.getFailureHandler()); + } +} diff --git a/tm/src/test/java/org/apache/seata/tm/api/MockFailureHandlerImpl.java b/tm/src/test/java/org/apache/seata/tm/api/MockFailureHandlerImpl.java new file mode 100644 index 00000000000..a99173ce2c8 --- /dev/null +++ b/tm/src/test/java/org/apache/seata/tm/api/MockFailureHandlerImpl.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.tm.api; + +public class MockFailureHandlerImpl implements FailureHandler{ + @Override + public void onBeginFailure(BaseTransaction tx, Throwable cause) { + + } + + @Override + public void onCommitFailure(BaseTransaction tx, Throwable cause) { + + } + + @Override + public void onRollbackFailure(BaseTransaction tx, Throwable originalException) { + + } + + @Override + public void onRollbacking(BaseTransaction tx, Throwable originalException) { + + } +} diff --git a/tm/src/test/java/org/apache/seata/tm/api/transaction/MockTransactionManager.java b/tm/src/test/java/org/apache/seata/tm/api/transaction/MockTransactionManager.java new file mode 100644 index 00000000000..2d2d41b820f --- /dev/null +++ b/tm/src/test/java/org/apache/seata/tm/api/transaction/MockTransactionManager.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.tm.api.transaction; + + +import org.apache.seata.core.exception.TransactionException; +import org.apache.seata.core.model.GlobalStatus; +import org.apache.seata.core.model.TransactionManager; + +public class MockTransactionManager implements TransactionManager { + @Override + public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException { + return null; + } + + @Override + public GlobalStatus commit(String xid) throws TransactionException { + return null; + } + + @Override + public GlobalStatus rollback(String xid) throws TransactionException { + return null; + } + + @Override + public GlobalStatus getStatus(String xid) throws TransactionException { + return null; + } + + @Override + public GlobalStatus globalReport(String xid, GlobalStatus globalStatus) throws TransactionException { + return null; + } +} diff --git a/tm/src/test/java/org/apache/seata/tm/api/transaction/SuspendedResourcesHolderTest.java b/tm/src/test/java/org/apache/seata/tm/api/transaction/SuspendedResourcesHolderTest.java new file mode 100644 index 00000000000..00ad2c9b999 --- /dev/null +++ b/tm/src/test/java/org/apache/seata/tm/api/transaction/SuspendedResourcesHolderTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.tm.api.transaction; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +/** + * the type SuspendedResourcesHolder + */ +public class SuspendedResourcesHolderTest { + + private final static String DEFAULT_XID = "1234567890"; + + @Test + void testIllegalArgumentException() { + Assertions.assertThrows(IllegalArgumentException.class, () -> { + new SuspendedResourcesHolder(null); + }); + } + + @Test + void getXidTest() { + SuspendedResourcesHolder suspendedResourcesHolder = new SuspendedResourcesHolder(DEFAULT_XID); + Assertions.assertEquals(DEFAULT_XID, suspendedResourcesHolder.getXid()); + } +}