diff --git a/changes/en-us/develop.md b/changes/en-us/develop.md index abe5dcc99a2..d720bf2c269 100644 --- a/changes/en-us/develop.md +++ b/changes/en-us/develop.md @@ -23,6 +23,7 @@ Add changes here for all PR submitted to the develop branch. - [[#6044](https://github.com/seata/seata/pull/6044)] optimize derivative product check base on mysql - [[#6361](https://github.com/seata/seata/pull/6361)] optimize 401 issues for some links - [[#6903](https://github.com/apache/incubator-seata/pull/6903)] optimize `tableMeta` cache scheduled refresh issue +- [[#7001](https://github.com/seata/seata/pull/7001)] optimize transaction metrics - [[#7002](https://github.com/apache/incubator-seata/pull/7002)] optimize lock release logic in AT transaction mode ### security: diff --git a/changes/zh-cn/develop.md b/changes/zh-cn/develop.md index b18916ec8d9..ea9eb7fa0f7 100644 --- a/changes/zh-cn/develop.md +++ b/changes/zh-cn/develop.md @@ -23,8 +23,10 @@ - [[#6044](https://github.com/seata/seata/pull/6044)] 优化MySQL衍生数据库判断逻辑 - [[#6361](https://github.com/seata/seata/pull/6361)] 优化部分链接 401 的问题 - [[#6903](https://github.com/apache/incubator-seata/pull/6903)] 优化`tableMeta`缓存定时刷新问题 +- [[#7001](https://github.com/apache/incubator-seata/pull/7001)] 优化 metrics 指标 - [[#7002](https://github.com/apache/incubator-seata/pull/7002)] 优化 AT 事务模式锁释放逻辑 + ### security: - [[#PR_NO](https://github.com/seata/seata/pull/PR_NO)] 准确简要的PR描述 diff --git a/server/src/main/java/io/seata/server/metrics/MeterIdConstants.java b/server/src/main/java/io/seata/server/metrics/MeterIdConstants.java index 29d55af24cc..da19451550c 100644 --- a/server/src/main/java/io/seata/server/metrics/MeterIdConstants.java +++ b/server/src/main/java/io/seata/server/metrics/MeterIdConstants.java @@ -85,7 +85,7 @@ public interface MeterIdConstants { .withTag(IdConstants.METER_KEY, IdConstants.METER_VALUE_TIMER) .withTag(IdConstants.STATUS_KEY, IdConstants.STATUS_VALUE_COMMITTED); - Id TIMER_ROLLBACK = new Id(IdConstants.SEATA_TRANSACTION) + Id TIMER_ROLLBACKED = new Id(IdConstants.SEATA_TRANSACTION) .withTag(IdConstants.ROLE_KEY, IdConstants.ROLE_VALUE_TC) .withTag(IdConstants.METER_KEY, IdConstants.METER_VALUE_TIMER) .withTag(IdConstants.STATUS_KEY, IdConstants.STATUS_VALUE_ROLLBACKED); diff --git a/server/src/main/java/io/seata/server/metrics/MetricsSubscriber.java b/server/src/main/java/io/seata/server/metrics/MetricsSubscriber.java index 4c48f802f11..9e477da596b 100644 --- a/server/src/main/java/io/seata/server/metrics/MetricsSubscriber.java +++ b/server/src/main/java/io/seata/server/metrics/MetricsSubscriber.java @@ -23,6 +23,7 @@ import com.google.common.eventbus.Subscribe; import io.seata.core.event.GlobalTransactionEvent; import io.seata.core.model.GlobalStatus; +import io.seata.metrics.Id; import io.seata.metrics.registry.Registry; import io.seata.server.event.EventBusManager; import org.slf4j.Logger; @@ -47,21 +48,47 @@ public class MetricsSubscriber { public MetricsSubscriber(Registry registry) { this.registry = registry; - consumers = new HashMap<>(); - consumers.put(GlobalStatus.Begin.name(), this::processGlobalStatusBegin); - consumers.put(GlobalStatus.Committed.name(), this::processGlobalStatusCommitted); - consumers.put(GlobalStatus.Rollbacked.name(), this::processGlobalStatusRollbacked); + this.consumers = initializeConsumers(); + } + + private Map> initializeConsumers() { + Map> consumerMap = new HashMap<>(); + consumerMap.put(GlobalStatus.Begin.name(), this::processGlobalStatusBegin); + consumerMap.put(GlobalStatus.Committed.name(), this::processGlobalStatusCommitted); + consumerMap.put(GlobalStatus.Rollbacked.name(), this::processGlobalStatusRollbacked); + + consumerMap.put(GlobalStatus.CommitFailed.name(), this::processGlobalStatusCommitFailed); + consumerMap.put(GlobalStatus.RollbackFailed.name(), this::processGlobalStatusRollbackFailed); + consumerMap.put(GlobalStatus.TimeoutRollbacked.name(), this::processGlobalStatusTimeoutRollbacked); + consumerMap.put(GlobalStatus.TimeoutRollbackFailed.name(), this::processGlobalStatusTimeoutRollbackFailed); - consumers.put(GlobalStatus.CommitFailed.name(), this::processGlobalStatusCommitFailed); - consumers.put(GlobalStatus.RollbackFailed.name(), this::processGlobalStatusRollbackFailed); - consumers.put(GlobalStatus.TimeoutRollbacked.name(), this::processGlobalStatusTimeoutRollbacked); - consumers.put(GlobalStatus.TimeoutRollbackFailed.name(), this::processGlobalStatusTimeoutRollbackFailed); + consumerMap.put(GlobalStatus.CommitRetryTimeout.name(), this::processGlobalStatusCommitRetryTimeout); + consumerMap.put(GlobalStatus.RollbackRetryTimeout.name(), this::processGlobalStatusTimeoutRollbackRetryTimeout); - consumers.put(GlobalStatus.CommitRetryTimeout.name(), this::processGlobalStatusCommitRetryTimeout); - consumers.put(GlobalStatus.RollbackRetryTimeout.name(), this::processGlobalStatusTimeoutRollbackRetryTimeout); + consumerMap.put(STATUS_VALUE_AFTER_COMMITTED_KEY, this::processAfterGlobalCommitted); + consumerMap.put(STATUS_VALUE_AFTER_ROLLBACKED_KEY, this::processAfterGlobalRollbacked); + return consumerMap; + } + + private void increaseCounter(Id counterId, GlobalTransactionEvent event) { + registry.getCounter( + counterId.withTag(APP_ID_KEY, event.getApplicationId()).withTag(GROUP_KEY, event.getGroup())).increase(1); + } - consumers.put(STATUS_VALUE_AFTER_COMMITTED_KEY, this::processAfterGlobalCommitted); - consumers.put(STATUS_VALUE_AFTER_ROLLBACKED_KEY, this::processAfterGlobalRollbacked); + private void decreaseCounter(Id counterId, GlobalTransactionEvent event) { + registry.getCounter( + counterId.withTag(APP_ID_KEY, event.getApplicationId()).withTag(GROUP_KEY, event.getGroup())).decrease(1); + } + + private void increaseSummary(Id summaryId, GlobalTransactionEvent event, long value) { + registry.getSummary( + summaryId.withTag(APP_ID_KEY, event.getApplicationId()).withTag(GROUP_KEY, event.getGroup())).increase( + value); + } + + private void increaseTimer(Id timerId, GlobalTransactionEvent event) { + registry.getTimer(timerId.withTag(APP_ID_KEY, event.getApplicationId()).withTag(GROUP_KEY, event.getGroup())) + .record(event.getEndTime() - event.getBeginTime(), TimeUnit.MILLISECONDS); } private void processGlobalStatusBegin(GlobalTransactionEvent event) { @@ -71,128 +98,86 @@ private void processGlobalStatusBegin(GlobalTransactionEvent event) { LOGGER.debug("subscribe:{},threadName:{}", object.toString(), Thread.currentThread().getName()); } } - registry.getCounter(MeterIdConstants.COUNTER_ACTIVE.withTag(APP_ID_KEY, event.getApplicationId()) - .withTag(GROUP_KEY, event.getGroup())).increase(1); + increaseCounter(MeterIdConstants.COUNTER_ACTIVE, event); } private void processGlobalStatusCommitted(GlobalTransactionEvent event) { if (event.isRetryGlobal()) { return; } - decreaseActive(event); - registry.getCounter(MeterIdConstants.COUNTER_COMMITTED - .withTag(APP_ID_KEY, event.getApplicationId()) - .withTag(GROUP_KEY, event.getGroup())).increase(1); - registry.getSummary(MeterIdConstants.SUMMARY_COMMITTED - .withTag(APP_ID_KEY, event.getApplicationId()) - .withTag(GROUP_KEY, event.getGroup())).increase(1); - registry.getTimer(MeterIdConstants.TIMER_COMMITTED - .withTag(APP_ID_KEY, event.getApplicationId()) - .withTag(GROUP_KEY, event.getGroup())) - .record(event.getEndTime() - event.getBeginTime(), TimeUnit.MILLISECONDS); + decreaseCounter(MeterIdConstants.COUNTER_ACTIVE, event); + increaseCounter(MeterIdConstants.COUNTER_COMMITTED, event); + increaseSummary(MeterIdConstants.SUMMARY_COMMITTED, event, 1); + increaseTimer(MeterIdConstants.TIMER_COMMITTED, event); } private void processGlobalStatusRollbacked(GlobalTransactionEvent event) { if (event.isRetryGlobal()) { return; } - decreaseActive(event); - registry.getCounter(MeterIdConstants.COUNTER_ROLLBACKED - .withTag(APP_ID_KEY, event.getApplicationId()) - .withTag(GROUP_KEY, event.getGroup())).increase(1); - registry.getSummary(MeterIdConstants.SUMMARY_ROLLBACKED - .withTag(APP_ID_KEY, event.getApplicationId()) - .withTag(GROUP_KEY, event.getGroup())).increase(1); - registry.getTimer(MeterIdConstants.TIMER_ROLLBACK - .withTag(APP_ID_KEY, event.getApplicationId()) - .withTag(GROUP_KEY, event.getGroup())) - .record(event.getEndTime() - event.getBeginTime(), TimeUnit.MILLISECONDS); + decreaseCounter(MeterIdConstants.COUNTER_ACTIVE, event); + increaseCounter(MeterIdConstants.COUNTER_ROLLBACKED, event); + increaseSummary(MeterIdConstants.SUMMARY_ROLLBACKED, event, 1); + increaseTimer(MeterIdConstants.TIMER_ROLLBACKED, event); } private void processAfterGlobalRollbacked(GlobalTransactionEvent event) { if (event.isRetryGlobal() && event.isRetryBranch()) { - decreaseActive(event); + decreaseCounter(MeterIdConstants.COUNTER_ACTIVE, event); } - registry.getCounter(MeterIdConstants.COUNTER_AFTER_ROLLBACKED - .withTag(APP_ID_KEY, event.getApplicationId()) - .withTag(GROUP_KEY, event.getGroup())).increase(1); - registry.getSummary(MeterIdConstants.SUMMARY_AFTER_ROLLBACKED - .withTag(APP_ID_KEY, event.getApplicationId()) - .withTag(GROUP_KEY, event.getGroup())).increase(1); - registry.getTimer(MeterIdConstants.TIMER_AFTER_ROLLBACKED - .withTag(APP_ID_KEY, event.getApplicationId()) - .withTag(GROUP_KEY, event.getGroup())) - .record(event.getEndTime() - event.getBeginTime(), TimeUnit.MILLISECONDS); + increaseCounter(MeterIdConstants.COUNTER_AFTER_ROLLBACKED, event); + increaseSummary(MeterIdConstants.SUMMARY_AFTER_ROLLBACKED, event, 1); + increaseTimer(MeterIdConstants.TIMER_AFTER_ROLLBACKED, event); } private void processAfterGlobalCommitted(GlobalTransactionEvent event) { if (event.isRetryGlobal() && event.isRetryBranch()) { - decreaseActive(event); + decreaseCounter(MeterIdConstants.COUNTER_ACTIVE, event); } - registry.getCounter(MeterIdConstants.COUNTER_AFTER_COMMITTED - .withTag(APP_ID_KEY, event.getApplicationId()) - .withTag(GROUP_KEY, event.getGroup())).increase(1); - registry.getSummary(MeterIdConstants.SUMMARY_AFTER_COMMITTED - .withTag(APP_ID_KEY, event.getApplicationId()) - .withTag(GROUP_KEY, event.getGroup())).increase(1); - registry.getTimer(MeterIdConstants.TIMER_AFTER_COMMITTED - .withTag(APP_ID_KEY, event.getApplicationId()) - .withTag(GROUP_KEY, event.getGroup())) - .record(event.getEndTime() - event.getBeginTime(), TimeUnit.MILLISECONDS); + increaseCounter(MeterIdConstants.COUNTER_AFTER_COMMITTED, event); + increaseSummary(MeterIdConstants.SUMMARY_AFTER_COMMITTED, event, 1); + increaseTimer(MeterIdConstants.TIMER_AFTER_COMMITTED, event); } private void processGlobalStatusCommitFailed(GlobalTransactionEvent event) { - decreaseActive(event); + decreaseCounter(MeterIdConstants.COUNTER_ACTIVE, event); reportFailed(event); } private void processGlobalStatusRollbackFailed(GlobalTransactionEvent event) { - decreaseActive(event); + decreaseCounter(MeterIdConstants.COUNTER_ACTIVE, event); reportFailed(event); } private void processGlobalStatusTimeoutRollbacked(GlobalTransactionEvent event) { - decreaseActive(event); + decreaseCounter(MeterIdConstants.COUNTER_ACTIVE, event); } private void processGlobalStatusTimeoutRollbackFailed(GlobalTransactionEvent event) { - decreaseActive(event); - reportTwoPhaseTimeout(event); + decreaseCounter(MeterIdConstants.COUNTER_ACTIVE, event); + increaseSummary(MeterIdConstants.SUMMARY_TWO_PHASE_TIMEOUT, event, 1); + reportFailed(event); } private void processGlobalStatusCommitRetryTimeout(GlobalTransactionEvent event) { - decreaseActive(event); - reportTwoPhaseTimeout(event); + decreaseCounter(MeterIdConstants.COUNTER_ACTIVE, event); + increaseSummary(MeterIdConstants.SUMMARY_TWO_PHASE_TIMEOUT, event, 1); + //The phase 2 retry timeout state should be considered a transaction failed + reportFailed(event); } private void processGlobalStatusTimeoutRollbackRetryTimeout(GlobalTransactionEvent event) { - decreaseActive(event); - } - - private void decreaseActive(GlobalTransactionEvent event) { - registry.getCounter(MeterIdConstants.COUNTER_ACTIVE - .withTag(APP_ID_KEY, event.getApplicationId()) - .withTag(GROUP_KEY, event.getGroup())).decrease(1); + decreaseCounter(MeterIdConstants.COUNTER_ACTIVE, event); + increaseSummary(MeterIdConstants.SUMMARY_TWO_PHASE_TIMEOUT, event, 1); + //The phase 2 retry timeout state should be considered a transaction failed + reportFailed(event); } private void reportFailed(GlobalTransactionEvent event) { - registry.getSummary(MeterIdConstants.SUMMARY_FAILED - .withTag(APP_ID_KEY, event.getApplicationId()) - .withTag(GROUP_KEY, event.getGroup())).increase(1); - registry.getTimer(MeterIdConstants.TIMER_FAILED - .withTag(APP_ID_KEY, event.getApplicationId()) - .withTag(GROUP_KEY, event.getGroup())) - .record(event.getEndTime() - event.getBeginTime(), TimeUnit.MILLISECONDS); - } - - private void reportTwoPhaseTimeout(GlobalTransactionEvent event) { - registry.getSummary(MeterIdConstants.SUMMARY_TWO_PHASE_TIMEOUT - .withTag(APP_ID_KEY, event.getApplicationId()) - .withTag(GROUP_KEY, event.getGroup())).increase(1); + increaseSummary(MeterIdConstants.SUMMARY_FAILED, event, 1); + increaseTimer(MeterIdConstants.TIMER_FAILED, event); } - - @Subscribe public void recordGlobalTransactionEventForMetrics(GlobalTransactionEvent event) { if (registry != null && consumers.containsKey(event.getStatus())) { @@ -208,6 +193,7 @@ public boolean equals(Object obj) { /** * PMD check * SuppressWarnings("checkstyle:EqualsHashCode") + * * @return the hash code */ @Override diff --git a/server/src/main/java/io/seata/server/session/GlobalSession.java b/server/src/main/java/io/seata/server/session/GlobalSession.java index 29b71e80832..df2805515e7 100644 --- a/server/src/main/java/io/seata/server/session/GlobalSession.java +++ b/server/src/main/java/io/seata/server/session/GlobalSession.java @@ -760,7 +760,7 @@ public void queueToRetryCommit() throws TransactionException { public void queueToRetryRollback() throws TransactionException { this.addSessionLifecycleListener(SessionHolder.getRetryRollbackingSessionManager()); GlobalStatus currentStatus = this.getStatus(); - if (SessionStatusValidator.isTimeoutGlobalStatus(currentStatus)) { + if (GlobalStatus.TimeoutRollbacking == currentStatus) { this.setStatus(GlobalStatus.TimeoutRollbackRetrying); } else { this.setStatus(GlobalStatus.RollbackRetrying); diff --git a/server/src/main/java/io/seata/server/session/SessionHelper.java b/server/src/main/java/io/seata/server/session/SessionHelper.java index cf4d91bf638..d3d49464c89 100644 --- a/server/src/main/java/io/seata/server/session/SessionHelper.java +++ b/server/src/main/java/io/seata/server/session/SessionHelper.java @@ -193,7 +193,8 @@ public static void endRollbacked(GlobalSession globalSession, boolean retryGloba } boolean retryBranch = currentStatus == GlobalStatus.TimeoutRollbackRetrying || currentStatus == GlobalStatus.RollbackRetrying; - if (SessionStatusValidator.isTimeoutGlobalStatus(currentStatus)) { + if (!currentStatus.equals(GlobalStatus.TimeoutRollbacked) + && SessionStatusValidator.isTimeoutRollbacking(currentStatus)) { globalSession.changeGlobalStatus(GlobalStatus.TimeoutRollbacked); } else { globalSession.changeGlobalStatus(GlobalStatus.Rollbacked); @@ -236,7 +237,7 @@ public static void endRollbackFailed(GlobalSession globalSession, boolean retryG GlobalStatus currentStatus = globalSession.getStatus(); if (isRetryTimeout) { globalSession.changeGlobalStatus(GlobalStatus.RollbackRetryTimeout); - } else if (SessionStatusValidator.isTimeoutGlobalStatus(currentStatus)) { + } else if (SessionStatusValidator.isTimeoutRollbacking(currentStatus)) { globalSession.changeGlobalStatus(GlobalStatus.TimeoutRollbackFailed); } else { globalSession.changeGlobalStatus(GlobalStatus.RollbackFailed); diff --git a/server/src/main/java/io/seata/server/session/SessionStatusValidator.java b/server/src/main/java/io/seata/server/session/SessionStatusValidator.java index 20323a7e7bf..860f313af12 100644 --- a/server/src/main/java/io/seata/server/session/SessionStatusValidator.java +++ b/server/src/main/java/io/seata/server/session/SessionStatusValidator.java @@ -36,6 +36,11 @@ public static boolean isTimeoutGlobalStatus(GlobalStatus status) { || status == GlobalStatus.TimeoutRollbackRetrying; } + public static boolean isTimeoutRollbacking(GlobalStatus status) { + return status == GlobalStatus.TimeoutRollbacking + || status == GlobalStatus.TimeoutRollbackRetrying; + } + /** * is rollback global status *