Skip to content

Commit

Permalink
Merge branch '2.x' into 2.x_24_11_11_02
Browse files Browse the repository at this point in the history
  • Loading branch information
funky-eyes authored Nov 15, 2024
2 parents 2aa31b8 + f195eb9 commit bb9972a
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 90 deletions.
1 change: 1 addition & 0 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6950](https://github.com/apache/incubator-seata/pull/6950)] Remove JVM parameter app.id
- [[#6959](https://github.com/apache/incubator-seata/pull/6959)] update the naming and description for the `seata-http-jakarta` module
- [[#6991](https://github.com/apache/incubator-seata/pull/6991)] gRPC serialization default to Protobuf
- [[#6993](https://github.com/apache/incubator-seata/pull/6993)] optimize transaction metrics
- [[#6995](https://github.com/apache/incubator-seata/pull/6995)] upgrade outdate npmjs dependencies
- [[#6996](https://github.com/apache/incubator-seata/pull/6996)] optimize lock release logic in AT transaction mode

Expand Down
2 changes: 2 additions & 0 deletions changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
- [[#6959](https://github.com/apache/incubator-seata/pull/6959)] 修正 `seata-http-jakarta`的模块命名和描述
- [[#6991](https://github.com/apache/incubator-seata/pull/6991)] gRPC协议序列化默认值为protobuf
- [[#6996](https://github.com/apache/incubator-seata/pull/6996)] 优化 AT 事务模式锁释放逻辑
- [[#6993](https://github.com/apache/incubator-seata/pull/6993)] 优化 metrics 指标


### refactor:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public interface MeterIdConstants {
.withTag(IdConstants.METER_KEY, IdConstants.METER_VALUE_TIMER)
.withTag(IdConstants.STATUS_KEY, IdConstants.STATUS_VALUE_COMMITTED);

Id TIMER_ROLLBACK = new Id(IdConstants.SEATA_TRANSACTION)
Id TIMER_ROLLBACKED = new Id(IdConstants.SEATA_TRANSACTION)
.withTag(IdConstants.ROLE_KEY, IdConstants.ROLE_VALUE_TC)
.withTag(IdConstants.METER_KEY, IdConstants.METER_VALUE_TIMER)
.withTag(IdConstants.STATUS_KEY, IdConstants.STATUS_VALUE_ROLLBACKED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.seata.core.event.ExceptionEvent;
import org.apache.seata.core.event.GlobalTransactionEvent;
import org.apache.seata.core.model.GlobalStatus;
import org.apache.seata.metrics.Id;
import org.apache.seata.metrics.registry.Registry;
import org.apache.seata.server.event.EventBusManager;
import org.slf4j.Logger;
Expand All @@ -48,21 +49,45 @@ public class MetricsSubscriber {

public MetricsSubscriber(Registry registry) {
this.registry = registry;
consumers = new HashMap<>();
consumers.put(GlobalStatus.Begin.name(), this::processGlobalStatusBegin);
consumers.put(GlobalStatus.Committed.name(), this::processGlobalStatusCommitted);
consumers.put(GlobalStatus.Rollbacked.name(), this::processGlobalStatusRollbacked);
this.consumers = initializeConsumers();
}

private Map<String, Consumer<GlobalTransactionEvent>> initializeConsumers() {
Map<String, Consumer<GlobalTransactionEvent>> consumerMap = new HashMap<>();
consumerMap.put(GlobalStatus.Begin.name(), this::processGlobalStatusBegin);
consumerMap.put(GlobalStatus.Committed.name(), this::processGlobalStatusCommitted);
consumerMap.put(GlobalStatus.Rollbacked.name(), this::processGlobalStatusRollbacked);

consumerMap.put(GlobalStatus.CommitFailed.name(), this::processGlobalStatusCommitFailed);
consumerMap.put(GlobalStatus.RollbackFailed.name(), this::processGlobalStatusRollbackFailed);
consumerMap.put(GlobalStatus.TimeoutRollbacked.name(), this::processGlobalStatusTimeoutRollbacked);
consumerMap.put(GlobalStatus.TimeoutRollbackFailed.name(), this::processGlobalStatusTimeoutRollbackFailed);

consumerMap.put(GlobalStatus.CommitRetryTimeout.name(), this::processGlobalStatusCommitRetryTimeout);
consumerMap.put(GlobalStatus.RollbackRetryTimeout.name(), this::processGlobalStatusTimeoutRollbackRetryTimeout);

consumerMap.put(STATUS_VALUE_AFTER_COMMITTED_KEY, this::processAfterGlobalCommitted);
consumerMap.put(STATUS_VALUE_AFTER_ROLLBACKED_KEY, this::processAfterGlobalRollbacked);
return consumerMap;
}

consumers.put(GlobalStatus.CommitFailed.name(), this::processGlobalStatusCommitFailed);
consumers.put(GlobalStatus.RollbackFailed.name(), this::processGlobalStatusRollbackFailed);
consumers.put(GlobalStatus.TimeoutRollbacked.name(), this::processGlobalStatusTimeoutRollbacked);
consumers.put(GlobalStatus.TimeoutRollbackFailed.name(), this::processGlobalStatusTimeoutRollbackFailed);
private void increaseCounter(Id counterId, GlobalTransactionEvent event) {
registry.getCounter(counterId.withTag(APP_ID_KEY, event.getApplicationId())
.withTag(GROUP_KEY, event.getGroup())).increase(1);
}
private void decreaseCounter(Id counterId, GlobalTransactionEvent event) {
registry.getCounter(counterId.withTag(APP_ID_KEY, event.getApplicationId())
.withTag(GROUP_KEY, event.getGroup())).decrease(1);
}

consumers.put(GlobalStatus.CommitRetryTimeout.name(), this::processGlobalStatusCommitRetryTimeout);
consumers.put(GlobalStatus.RollbackRetryTimeout.name(), this::processGlobalStatusTimeoutRollbackRetryTimeout);
private void increaseSummary(Id summaryId, GlobalTransactionEvent event, long value) {
registry.getSummary(
summaryId.withTag(APP_ID_KEY, event.getApplicationId()).withTag(GROUP_KEY, event.getGroup())).increase(value);
}

consumers.put(STATUS_VALUE_AFTER_COMMITTED_KEY, this::processAfterGlobalCommitted);
consumers.put(STATUS_VALUE_AFTER_ROLLBACKED_KEY, this::processAfterGlobalRollbacked);
private void increaseTimer(Id timerId, GlobalTransactionEvent event) {
registry.getTimer(
timerId.withTag(APP_ID_KEY, event.getApplicationId()).withTag(GROUP_KEY, event.getGroup())).record(event.getEndTime() - event.getBeginTime(), TimeUnit.MILLISECONDS);
}

private void processGlobalStatusBegin(GlobalTransactionEvent event) {
Expand All @@ -72,124 +97,84 @@ private void processGlobalStatusBegin(GlobalTransactionEvent event) {
LOGGER.debug("subscribe:{},threadName:{}", object.toString(), Thread.currentThread().getName());
}
}
registry.getCounter(MeterIdConstants.COUNTER_ACTIVE.withTag(APP_ID_KEY, event.getApplicationId())
.withTag(GROUP_KEY, event.getGroup())).increase(1);
increaseCounter(MeterIdConstants.COUNTER_ACTIVE, event);
}

private void processGlobalStatusCommitted(GlobalTransactionEvent event) {
if (event.isRetryGlobal()) {
return;
}
decreaseActive(event);
registry.getCounter(MeterIdConstants.COUNTER_COMMITTED
.withTag(APP_ID_KEY, event.getApplicationId())
.withTag(GROUP_KEY, event.getGroup())).increase(1);
registry.getSummary(MeterIdConstants.SUMMARY_COMMITTED
.withTag(APP_ID_KEY, event.getApplicationId())
.withTag(GROUP_KEY, event.getGroup())).increase(1);
registry.getTimer(MeterIdConstants.TIMER_COMMITTED
.withTag(APP_ID_KEY, event.getApplicationId())
.withTag(GROUP_KEY, event.getGroup()))
.record(event.getEndTime() - event.getBeginTime(), TimeUnit.MILLISECONDS);
decreaseCounter(MeterIdConstants.COUNTER_ACTIVE, event);
increaseCounter(MeterIdConstants.COUNTER_COMMITTED, event);
increaseSummary(MeterIdConstants.SUMMARY_COMMITTED, event, 1);
increaseTimer(MeterIdConstants.TIMER_COMMITTED, event);
}

private void processGlobalStatusRollbacked(GlobalTransactionEvent event) {
if (event.isRetryGlobal()) {
return;
}
decreaseActive(event);
registry.getCounter(MeterIdConstants.COUNTER_ROLLBACKED
.withTag(APP_ID_KEY, event.getApplicationId())
.withTag(GROUP_KEY, event.getGroup())).increase(1);
registry.getSummary(MeterIdConstants.SUMMARY_ROLLBACKED
.withTag(APP_ID_KEY, event.getApplicationId())
.withTag(GROUP_KEY, event.getGroup())).increase(1);
registry.getTimer(MeterIdConstants.TIMER_ROLLBACK
.withTag(APP_ID_KEY, event.getApplicationId())
.withTag(GROUP_KEY, event.getGroup()))
.record(event.getEndTime() - event.getBeginTime(), TimeUnit.MILLISECONDS);
decreaseCounter(MeterIdConstants.COUNTER_ACTIVE, event);
increaseCounter(MeterIdConstants.COUNTER_ROLLBACKED, event);
increaseSummary(MeterIdConstants.SUMMARY_ROLLBACKED, event, 1);
increaseTimer(MeterIdConstants.TIMER_ROLLBACKED, event);
}

private void processAfterGlobalRollbacked(GlobalTransactionEvent event) {
if (event.isRetryGlobal() && event.isRetryBranch()) {
decreaseActive(event);
decreaseCounter(MeterIdConstants.COUNTER_ACTIVE, event);
}
registry.getCounter(MeterIdConstants.COUNTER_AFTER_ROLLBACKED
.withTag(APP_ID_KEY, event.getApplicationId())
.withTag(GROUP_KEY, event.getGroup())).increase(1);
registry.getSummary(MeterIdConstants.SUMMARY_AFTER_ROLLBACKED
.withTag(APP_ID_KEY, event.getApplicationId())
.withTag(GROUP_KEY, event.getGroup())).increase(1);
registry.getTimer(MeterIdConstants.TIMER_AFTER_ROLLBACKED
.withTag(APP_ID_KEY, event.getApplicationId())
.withTag(GROUP_KEY, event.getGroup()))
.record(event.getEndTime() - event.getBeginTime(), TimeUnit.MILLISECONDS);
increaseCounter(MeterIdConstants.COUNTER_AFTER_ROLLBACKED, event);
increaseSummary(MeterIdConstants.SUMMARY_AFTER_ROLLBACKED, event, 1);
increaseTimer(MeterIdConstants.TIMER_AFTER_ROLLBACKED, event);
}

private void processAfterGlobalCommitted(GlobalTransactionEvent event) {
if (event.isRetryGlobal() && event.isRetryBranch()) {
decreaseActive(event);
decreaseCounter(MeterIdConstants.COUNTER_ACTIVE, event);
}
registry.getCounter(MeterIdConstants.COUNTER_AFTER_COMMITTED
.withTag(APP_ID_KEY, event.getApplicationId())
.withTag(GROUP_KEY, event.getGroup())).increase(1);
registry.getSummary(MeterIdConstants.SUMMARY_AFTER_COMMITTED
.withTag(APP_ID_KEY, event.getApplicationId())
.withTag(GROUP_KEY, event.getGroup())).increase(1);
registry.getTimer(MeterIdConstants.TIMER_AFTER_COMMITTED
.withTag(APP_ID_KEY, event.getApplicationId())
.withTag(GROUP_KEY, event.getGroup()))
.record(event.getEndTime() - event.getBeginTime(), TimeUnit.MILLISECONDS);
increaseCounter(MeterIdConstants.COUNTER_AFTER_COMMITTED, event);
increaseSummary(MeterIdConstants.SUMMARY_AFTER_COMMITTED, event, 1);
increaseTimer(MeterIdConstants.TIMER_AFTER_COMMITTED, event);
}

private void processGlobalStatusCommitFailed(GlobalTransactionEvent event) {
decreaseActive(event);
decreaseCounter(MeterIdConstants.COUNTER_ACTIVE, event);
reportFailed(event);
}

private void processGlobalStatusRollbackFailed(GlobalTransactionEvent event) {
decreaseActive(event);
decreaseCounter(MeterIdConstants.COUNTER_ACTIVE, event);
reportFailed(event);
}

private void processGlobalStatusTimeoutRollbacked(GlobalTransactionEvent event) {
decreaseActive(event);
decreaseCounter(MeterIdConstants.COUNTER_ACTIVE, event);
}

private void processGlobalStatusTimeoutRollbackFailed(GlobalTransactionEvent event) {
decreaseActive(event);
reportTwoPhaseTimeout(event);
decreaseCounter(MeterIdConstants.COUNTER_ACTIVE, event);
increaseSummary(MeterIdConstants.SUMMARY_TWO_PHASE_TIMEOUT, event, 1);
reportFailed(event);
}

private void processGlobalStatusCommitRetryTimeout(GlobalTransactionEvent event) {
decreaseActive(event);
reportTwoPhaseTimeout(event);
decreaseCounter(MeterIdConstants.COUNTER_ACTIVE, event);
increaseSummary(MeterIdConstants.SUMMARY_TWO_PHASE_TIMEOUT, event, 1);
//The phase 2 retry timeout state should be considered a transaction failed
reportFailed(event);
}

private void processGlobalStatusTimeoutRollbackRetryTimeout(GlobalTransactionEvent event) {
decreaseActive(event);
}

private void decreaseActive(GlobalTransactionEvent event) {
registry.getCounter(MeterIdConstants.COUNTER_ACTIVE
.withTag(APP_ID_KEY, event.getApplicationId())
.withTag(GROUP_KEY, event.getGroup())).decrease(1);
decreaseCounter(MeterIdConstants.COUNTER_ACTIVE, event);
increaseSummary(MeterIdConstants.SUMMARY_TWO_PHASE_TIMEOUT, event, 1);
//The phase 2 retry timeout state should be considered a transaction failed
reportFailed(event);
}

private void reportFailed(GlobalTransactionEvent event) {
registry.getSummary(MeterIdConstants.SUMMARY_FAILED
.withTag(APP_ID_KEY, event.getApplicationId())
.withTag(GROUP_KEY, event.getGroup())).increase(1);
registry.getTimer(MeterIdConstants.TIMER_FAILED
.withTag(APP_ID_KEY, event.getApplicationId())
.withTag(GROUP_KEY, event.getGroup()))
.record(event.getEndTime() - event.getBeginTime(), TimeUnit.MILLISECONDS);
}

private void reportTwoPhaseTimeout(GlobalTransactionEvent event) {
registry.getSummary(MeterIdConstants.SUMMARY_TWO_PHASE_TIMEOUT
.withTag(APP_ID_KEY, event.getApplicationId())
.withTag(GROUP_KEY, event.getGroup())).increase(1);
increaseSummary(MeterIdConstants.SUMMARY_FAILED, event, 1);
increaseTimer(MeterIdConstants.TIMER_FAILED, event);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.seata.common.XID;
import org.apache.seata.common.util.BufferUtils;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.common.util.UUIDGenerator;
import org.apache.seata.config.ConfigurationFactory;
import org.apache.seata.core.exception.GlobalTransactionException;
import org.apache.seata.core.exception.TransactionException;
Expand All @@ -41,7 +42,6 @@
import org.apache.seata.core.model.BranchType;
import org.apache.seata.core.model.GlobalStatus;
import org.apache.seata.core.model.LockStatus;
import org.apache.seata.common.util.UUIDGenerator;
import org.apache.seata.server.cluster.raft.RaftServerManager;
import org.apache.seata.server.lock.LockerManagerFactory;
import org.apache.seata.server.store.SessionStorable;
Expand Down Expand Up @@ -793,7 +793,7 @@ public void queueToRetryCommit() throws TransactionException {
public void queueToRetryRollback() throws TransactionException {
GlobalStatus currentStatus = this.getStatus();
GlobalStatus newStatus;
if (SessionStatusValidator.isTimeoutGlobalStatus(currentStatus)) {
if (GlobalStatus.TimeoutRollbacking == currentStatus) {
newStatus = GlobalStatus.TimeoutRollbackRetrying;
} else {
newStatus = GlobalStatus.RollbackRetrying;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ public static void endRollbacked(GlobalSession globalSession, boolean retryGloba
boolean retryBranch =
currentStatus == GlobalStatus.TimeoutRollbackRetrying || currentStatus == GlobalStatus.RollbackRetrying;
if (!currentStatus.equals(GlobalStatus.TimeoutRollbacked)
&& SessionStatusValidator.isTimeoutGlobalStatus(currentStatus)) {
&& SessionStatusValidator.isTimeoutRollbacking(currentStatus)) {
globalSession.changeGlobalStatus(GlobalStatus.TimeoutRollbacked);
} else if (!globalSession.getStatus().equals(GlobalStatus.Rollbacked)) {
globalSession.changeGlobalStatus(GlobalStatus.Rollbacked);
Expand Down Expand Up @@ -255,7 +255,7 @@ public static void endRollbackFailed(GlobalSession globalSession, boolean retryG
GlobalStatus currentStatus = globalSession.getStatus();
if (isRetryTimeout) {
globalSession.changeGlobalStatus(GlobalStatus.RollbackRetryTimeout);
} else if (SessionStatusValidator.isTimeoutGlobalStatus(currentStatus)) {
} else if (SessionStatusValidator.isTimeoutRollbacking(currentStatus)) {
globalSession.changeGlobalStatus(GlobalStatus.TimeoutRollbackFailed);
} else {
globalSession.changeGlobalStatus(GlobalStatus.RollbackFailed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ public static boolean isTimeoutGlobalStatus(GlobalStatus status) {
|| status == GlobalStatus.TimeoutRollbackRetrying;
}

public static boolean isTimeoutRollbacking(GlobalStatus status) {
return status == GlobalStatus.TimeoutRollbacking
|| status == GlobalStatus.TimeoutRollbackRetrying;
}

/**
* is rollback global status
*
Expand Down

0 comments on commit bb9972a

Please sign in to comment.