Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide saga functionality (based on interceptors) #1681

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions all/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,11 @@
<artifactId>seata-tcc</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-saga</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-tm</artifactId>
Expand Down Expand Up @@ -524,6 +529,7 @@
<include>io.seata:seata-sofa-rpc</include>
<include>io.seata:seata-spring</include>
<include>io.seata:seata-tcc</include>
<include>io.seata:seata-saga</include>
<include>io.seata:seata-tm</include>
<include>io.seata:seata-codec-seata</include>
<include>io.seata:seata-codec-protobuf</include>
Expand Down
21 changes: 20 additions & 1 deletion common/src/main/java/io/seata/common/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,21 +84,40 @@ public class Constants {
*/
public final static String TCC_METHOD_RESULT = "result";

/**
* The constant SAGA_METHOD_RESULT.
*/
public final static String SAGA_METHOD_RESULT = "result";

/**
* The constant TCC_METHOD_ARGUMENTS.
*/
public final static String TCC_METHOD_ARGUMENTS = "arguments";

/**
*/
public final static String SAGA_METHOD_ARGUMENTS = "arguments";

/**
* transaction context
*/
public final static String TCC_ACTIVITY_CONTEXT = "activityContext";
public final static String TCC_ACTIVITY_CONTEXT = "activityContext";

/**
* transaction context
*/
public final static String SAGA_ACTIVITY_CONTEXT = "activityContext";

/**
* branch context
*/
public final static String TCC_ACTION_CONTEXT = "actionContext";

/**
* branch context
*/
public final static String SAGA_ACTION_CONTEXT = "actionContext";

/**
* default charset name
*/
Expand Down
55 changes: 46 additions & 9 deletions core/src/main/java/io/seata/core/context/RootContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.seata.common.exception.ShouldNeverHappenException;

import io.seata.common.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -32,21 +33,32 @@ public class RootContext {
/**
* The constant KEY_XID.
*/
public static final String KEY_XID = "TX_XID";

public static final String KEY_GLOBAL_LOCK_FLAG = "TX_LOCK";

private static ContextCore CONTEXT_HOLDER = ContextCoreLoader.load();

public static final String KEY_XID = "TX_XID";

public static final String KEY_XID_TYPE = "TX_XID_TYPE";

public static final String KEY_GLOBAL_LOCK_FLAG = "TX_LOCK";

private static ContextCore CONTEXT_HOLDER = ContextCoreLoader.load();

/**
* Gets xid.
*
* @return the xid
*/
public static String getXID() {
return CONTEXT_HOLDER.get(KEY_XID);
}
String xid = CONTEXT_HOLDER.get(KEY_XID);
if(StringUtils.isNotBlank(xid)){
return xid;
}

String xidType = CONTEXT_HOLDER.get(KEY_XID_TYPE);
if(StringUtils.isNotBlank(xidType)){
return xidType.split("_")[0];
}
return null;
}

/**
* Bind.
*
Expand All @@ -59,6 +71,18 @@ public static void bind(String xid) {
CONTEXT_HOLDER.put(KEY_XID, xid);
}

/**
* Bind type
*
* @param xidType
*/
public static void bindType(String xidType) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("bind type " + xidType);
}
CONTEXT_HOLDER.put(KEY_XID_TYPE, xidType);
}

/**
* declare local transactions will use global lock check for update/delete/insert/selectForUpdate SQL
*/
Expand All @@ -84,7 +108,20 @@ public static String unbind() {
}
return xid;
}


/**
* Unbind temporary string
*
* @return the string
*/
public static String unbindType() {
String xidType = CONTEXT_HOLDER.remove(KEY_XID_TYPE);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("unbind type " + xidType);
}
return xidType;
}

public static void unbindGlobalLockFlag() {
String lockFlag = CONTEXT_HOLDER.remove(KEY_GLOBAL_LOCK_FLAG);
if (LOGGER.isDebugEnabled() && lockFlag != null) {
Expand Down
8 changes: 7 additions & 1 deletion core/src/main/java/io/seata/core/model/BranchType.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,13 @@ public enum BranchType {
*/
// AT Branch
AT,


/**
* The At.
*/
// AT Branch
SAGA,

/**
* The TCC.
*/
Expand Down
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
<module>server</module>
<module>spring</module>
<module>tcc</module>
<module>saga</module>
<module>test</module>
<module>tm</module>
<module>metrics</module>
Expand Down
69 changes: 69 additions & 0 deletions saga/README.MD
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
### TCC


#### TCC 简介

在2PC(两阶段提交)协议中,事务管理器分两阶段协调资源管理,资源管理器对外提供了3个操作,分别是一阶段的准备操作,二阶段的提交操作和回滚操作;

TCC服务作为一种事务资源,遵循两阶段提交协议,由业务层面自定义,需要用户根据业务逻辑编码实现;其包含Try、Confirm 和 Cancel 3个操作,其中Try操作对应分布式事务一阶段的准备,Confirm操作对应分布式事务二阶段提交,Cancel对应分布式事务二阶段回滚:

- Try:

资源的检查和预留;

- Comfirm:

使用预留的资源,完成真正的业务操作;要求Try成功Confirm 一定要能成功;

- Cancel:

释放预留资源;


TCC的3个方法均由用户根据业务场景编码实现,并对外发布成微服务,供事务管理器调用;事务管理器在一阶段调用TCC的Try方法,在二阶段提交时调用Confirm方法,在二阶段回滚时调用Cancel方法。


#### TCC实现

##### 1、TCC微服务

TCC服务由用户编码实现并对外发布成微服务,目前支持3种形式的TCC微服务,分别是:

- SofaRpc 服务

用户将实现的TCC操作对外发布成 SofaRpc 服务,事务管理器通过订阅SofaRpc服务,来协调TCC资源;

- Dubbo 服务

将TCC发布成dubbo服务,事务管理器订阅dubbo服务,来协调TCC资源;

- Local TCC

本地普通的TCC Bean,非远程服务;事务管理器通过本地方法调用,来协调TCC 资源;

##### 2、TCC资源动态代理

对TCC服务进行动态代理,GlobalTransactionScanner中当扫描到TCC 服务的 ‘reference’时,会对其进行动态代理;

TCC 动态代理的主要功能是:生成TCC运行时上下文、透传业务参数、注册分支事务记录;

#### 模块说明

seata-tcc 包含TCC主要代码:

- interceptor:TCC 动态代理;

- remoting:TCC微服务扫描、RPC协议解析、TCC资源注册;

- TccResourceManager、RMHandlerTCC: TCC资源管理器;




#### 其他说明:

本此为了区分AT、TCC 2种资源类型,在客户端和服务端通信的接口中,均添加了 BranchType 参数;




45 changes: 45 additions & 0 deletions saga/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 1999-2019 Seata.io Group.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.seata</groupId>
<artifactId>seata-parent</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>seata-saga</artifactId>
<packaging>jar</packaging>
<name>seata-saga ${project.version}</name>

<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>seata-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>seata-rm</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>


</project>
51 changes: 51 additions & 0 deletions saga/src/main/java/io/seata/rm/saga/RMHandlerSAGA.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.rm.saga;

import io.seata.core.model.BranchType;
import io.seata.core.model.ResourceManager;
import io.seata.core.protocol.transaction.UndoLogDeleteRequest;
import io.seata.rm.AbstractRMHandler;
import io.seata.rm.DefaultResourceManager;

/**
* The type Rm handler saga.
*
* @author zhangsen
*/
public class RMHandlerSAGA extends AbstractRMHandler {

@Override
public void handle(UndoLogDeleteRequest request) {
//DO nothing
}

/**
* get TCC resource manager
*
* @return
*/
@Override
protected ResourceManager getResourceManager() {
return DefaultResourceManager.get().getResourceManager(BranchType.SAGA);
}

@Override
public BranchType getBranchType() {
return BranchType.SAGA;
}

}
Loading