Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize: optimize transaction metrics #7001

Merged
merged 3 commits into from
Nov 17, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/en-us/develop.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Add changes here for all PR submitted to the develop branch.
- [[#6044](https://github.com/seata/seata/pull/6044)] optimize derivative product check base on mysql
- [[#6361](https://github.com/seata/seata/pull/6361)] optimize 401 issues for some links
- [[#6903](https://github.com/apache/incubator-seata/pull/6903)] optimize `tableMeta` cache scheduled refresh issue
- [[#7001](https://github.com/seata/seata/pull/7001)] optimize transaction metrics

### security:
- [[#PR_NO](https://github.com/seata/seata/pull/PR_NO)] A brief and accurate description of PR
Expand Down
1 change: 1 addition & 0 deletions changes/zh-cn/develop.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
- [[#6044](https://github.com/seata/seata/pull/6044)] 优化MySQL衍生数据库判断逻辑
- [[#6361](https://github.com/seata/seata/pull/6361)] 优化部分链接 401 的问题
- [[#6903](https://github.com/apache/incubator-seata/pull/6903)] 优化`tableMeta`缓存定时刷新问题
- [[#7001](https://github.com/apache/incubator-seata/pull/7001)] 优化 metrics 指标

### security:
- [[#PR_NO](https://github.com/seata/seata/pull/PR_NO)] 准确简要的PR描述
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public interface MeterIdConstants {
.withTag(IdConstants.METER_KEY, IdConstants.METER_VALUE_TIMER)
.withTag(IdConstants.STATUS_KEY, IdConstants.STATUS_VALUE_COMMITTED);

Id TIMER_ROLLBACK = new Id(IdConstants.SEATA_TRANSACTION)
Id TIMER_ROLLBACKED = new Id(IdConstants.SEATA_TRANSACTION)
.withTag(IdConstants.ROLE_KEY, IdConstants.ROLE_VALUE_TC)
.withTag(IdConstants.METER_KEY, IdConstants.METER_VALUE_TIMER)
.withTag(IdConstants.STATUS_KEY, IdConstants.STATUS_VALUE_ROLLBACKED);
Expand Down
160 changes: 73 additions & 87 deletions server/src/main/java/io/seata/server/metrics/MetricsSubscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.eventbus.Subscribe;
import io.seata.core.event.GlobalTransactionEvent;
import io.seata.core.model.GlobalStatus;
import io.seata.metrics.Id;
import io.seata.metrics.registry.Registry;
import io.seata.server.event.EventBusManager;
import org.slf4j.Logger;
Expand All @@ -47,21 +48,47 @@ public class MetricsSubscriber {

public MetricsSubscriber(Registry registry) {
this.registry = registry;
consumers = new HashMap<>();
consumers.put(GlobalStatus.Begin.name(), this::processGlobalStatusBegin);
consumers.put(GlobalStatus.Committed.name(), this::processGlobalStatusCommitted);
consumers.put(GlobalStatus.Rollbacked.name(), this::processGlobalStatusRollbacked);
this.consumers = initializeConsumers();
}

private Map<String, Consumer<GlobalTransactionEvent>> initializeConsumers() {
Map<String, Consumer<GlobalTransactionEvent>> consumerMap = new HashMap<>();
consumerMap.put(GlobalStatus.Begin.name(), this::processGlobalStatusBegin);
consumerMap.put(GlobalStatus.Committed.name(), this::processGlobalStatusCommitted);
consumerMap.put(GlobalStatus.Rollbacked.name(), this::processGlobalStatusRollbacked);

consumerMap.put(GlobalStatus.CommitFailed.name(), this::processGlobalStatusCommitFailed);
consumerMap.put(GlobalStatus.RollbackFailed.name(), this::processGlobalStatusRollbackFailed);
consumerMap.put(GlobalStatus.TimeoutRollbacked.name(), this::processGlobalStatusTimeoutRollbacked);
consumerMap.put(GlobalStatus.TimeoutRollbackFailed.name(), this::processGlobalStatusTimeoutRollbackFailed);

consumers.put(GlobalStatus.CommitFailed.name(), this::processGlobalStatusCommitFailed);
consumers.put(GlobalStatus.RollbackFailed.name(), this::processGlobalStatusRollbackFailed);
consumers.put(GlobalStatus.TimeoutRollbacked.name(), this::processGlobalStatusTimeoutRollbacked);
consumers.put(GlobalStatus.TimeoutRollbackFailed.name(), this::processGlobalStatusTimeoutRollbackFailed);
consumerMap.put(GlobalStatus.CommitRetryTimeout.name(), this::processGlobalStatusCommitRetryTimeout);
consumerMap.put(GlobalStatus.RollbackRetryTimeout.name(), this::processGlobalStatusTimeoutRollbackRetryTimeout);

consumers.put(GlobalStatus.CommitRetryTimeout.name(), this::processGlobalStatusCommitRetryTimeout);
consumers.put(GlobalStatus.RollbackRetryTimeout.name(), this::processGlobalStatusTimeoutRollbackRetryTimeout);
consumerMap.put(STATUS_VALUE_AFTER_COMMITTED_KEY, this::processAfterGlobalCommitted);
consumerMap.put(STATUS_VALUE_AFTER_ROLLBACKED_KEY, this::processAfterGlobalRollbacked);
return consumerMap;
}

private void increaseCounter(Id counterId, GlobalTransactionEvent event) {
registry.getCounter(
counterId.withTag(APP_ID_KEY, event.getApplicationId()).withTag(GROUP_KEY, event.getGroup())).increase(1);
}

consumers.put(STATUS_VALUE_AFTER_COMMITTED_KEY, this::processAfterGlobalCommitted);
consumers.put(STATUS_VALUE_AFTER_ROLLBACKED_KEY, this::processAfterGlobalRollbacked);
private void decreaseCounter(Id counterId, GlobalTransactionEvent event) {
registry.getCounter(
counterId.withTag(APP_ID_KEY, event.getApplicationId()).withTag(GROUP_KEY, event.getGroup())).decrease(1);
}

private void increaseSummary(Id summaryId, GlobalTransactionEvent event, long value) {
registry.getSummary(
summaryId.withTag(APP_ID_KEY, event.getApplicationId()).withTag(GROUP_KEY, event.getGroup())).increase(
value);
}

private void increaseTimer(Id timerId, GlobalTransactionEvent event) {
registry.getTimer(timerId.withTag(APP_ID_KEY, event.getApplicationId()).withTag(GROUP_KEY, event.getGroup()))
.record(event.getEndTime() - event.getBeginTime(), TimeUnit.MILLISECONDS);
}

private void processGlobalStatusBegin(GlobalTransactionEvent event) {
Expand All @@ -71,128 +98,86 @@ private void processGlobalStatusBegin(GlobalTransactionEvent event) {
LOGGER.debug("subscribe:{},threadName:{}", object.toString(), Thread.currentThread().getName());
}
}
registry.getCounter(MeterIdConstants.COUNTER_ACTIVE.withTag(APP_ID_KEY, event.getApplicationId())
.withTag(GROUP_KEY, event.getGroup())).increase(1);
increaseCounter(MeterIdConstants.COUNTER_ACTIVE, event);
}

private void processGlobalStatusCommitted(GlobalTransactionEvent event) {
if (event.isRetryGlobal()) {
return;
}
decreaseActive(event);
registry.getCounter(MeterIdConstants.COUNTER_COMMITTED
.withTag(APP_ID_KEY, event.getApplicationId())
.withTag(GROUP_KEY, event.getGroup())).increase(1);
registry.getSummary(MeterIdConstants.SUMMARY_COMMITTED
.withTag(APP_ID_KEY, event.getApplicationId())
.withTag(GROUP_KEY, event.getGroup())).increase(1);
registry.getTimer(MeterIdConstants.TIMER_COMMITTED
.withTag(APP_ID_KEY, event.getApplicationId())
.withTag(GROUP_KEY, event.getGroup()))
.record(event.getEndTime() - event.getBeginTime(), TimeUnit.MILLISECONDS);
decreaseCounter(MeterIdConstants.COUNTER_ACTIVE, event);
increaseCounter(MeterIdConstants.COUNTER_COMMITTED, event);
increaseSummary(MeterIdConstants.SUMMARY_COMMITTED, event, 1);
increaseTimer(MeterIdConstants.TIMER_COMMITTED, event);
}

private void processGlobalStatusRollbacked(GlobalTransactionEvent event) {
if (event.isRetryGlobal()) {
return;
}
decreaseActive(event);
registry.getCounter(MeterIdConstants.COUNTER_ROLLBACKED
.withTag(APP_ID_KEY, event.getApplicationId())
.withTag(GROUP_KEY, event.getGroup())).increase(1);
registry.getSummary(MeterIdConstants.SUMMARY_ROLLBACKED
.withTag(APP_ID_KEY, event.getApplicationId())
.withTag(GROUP_KEY, event.getGroup())).increase(1);
registry.getTimer(MeterIdConstants.TIMER_ROLLBACK
.withTag(APP_ID_KEY, event.getApplicationId())
.withTag(GROUP_KEY, event.getGroup()))
.record(event.getEndTime() - event.getBeginTime(), TimeUnit.MILLISECONDS);
decreaseCounter(MeterIdConstants.COUNTER_ACTIVE, event);
increaseCounter(MeterIdConstants.COUNTER_ROLLBACKED, event);
increaseSummary(MeterIdConstants.SUMMARY_ROLLBACKED, event, 1);
increaseTimer(MeterIdConstants.TIMER_ROLLBACKED, event);
}

private void processAfterGlobalRollbacked(GlobalTransactionEvent event) {
if (event.isRetryGlobal() && event.isRetryBranch()) {
decreaseActive(event);
decreaseCounter(MeterIdConstants.COUNTER_ACTIVE, event);
}
registry.getCounter(MeterIdConstants.COUNTER_AFTER_ROLLBACKED
.withTag(APP_ID_KEY, event.getApplicationId())
.withTag(GROUP_KEY, event.getGroup())).increase(1);
registry.getSummary(MeterIdConstants.SUMMARY_AFTER_ROLLBACKED
.withTag(APP_ID_KEY, event.getApplicationId())
.withTag(GROUP_KEY, event.getGroup())).increase(1);
registry.getTimer(MeterIdConstants.TIMER_AFTER_ROLLBACKED
.withTag(APP_ID_KEY, event.getApplicationId())
.withTag(GROUP_KEY, event.getGroup()))
.record(event.getEndTime() - event.getBeginTime(), TimeUnit.MILLISECONDS);
increaseCounter(MeterIdConstants.COUNTER_AFTER_ROLLBACKED, event);
increaseSummary(MeterIdConstants.SUMMARY_AFTER_ROLLBACKED, event, 1);
increaseTimer(MeterIdConstants.TIMER_AFTER_ROLLBACKED, event);
}

private void processAfterGlobalCommitted(GlobalTransactionEvent event) {
if (event.isRetryGlobal() && event.isRetryBranch()) {
decreaseActive(event);
decreaseCounter(MeterIdConstants.COUNTER_ACTIVE, event);
}
registry.getCounter(MeterIdConstants.COUNTER_AFTER_COMMITTED
.withTag(APP_ID_KEY, event.getApplicationId())
.withTag(GROUP_KEY, event.getGroup())).increase(1);
registry.getSummary(MeterIdConstants.SUMMARY_AFTER_COMMITTED
.withTag(APP_ID_KEY, event.getApplicationId())
.withTag(GROUP_KEY, event.getGroup())).increase(1);
registry.getTimer(MeterIdConstants.TIMER_AFTER_COMMITTED
.withTag(APP_ID_KEY, event.getApplicationId())
.withTag(GROUP_KEY, event.getGroup()))
.record(event.getEndTime() - event.getBeginTime(), TimeUnit.MILLISECONDS);
increaseCounter(MeterIdConstants.COUNTER_AFTER_COMMITTED, event);
increaseSummary(MeterIdConstants.SUMMARY_AFTER_COMMITTED, event, 1);
increaseTimer(MeterIdConstants.TIMER_AFTER_COMMITTED, event);
}

private void processGlobalStatusCommitFailed(GlobalTransactionEvent event) {
decreaseActive(event);
decreaseCounter(MeterIdConstants.COUNTER_ACTIVE, event);
reportFailed(event);
}

private void processGlobalStatusRollbackFailed(GlobalTransactionEvent event) {
decreaseActive(event);
decreaseCounter(MeterIdConstants.COUNTER_ACTIVE, event);
reportFailed(event);
}

private void processGlobalStatusTimeoutRollbacked(GlobalTransactionEvent event) {
decreaseActive(event);
decreaseCounter(MeterIdConstants.COUNTER_ACTIVE, event);
}

private void processGlobalStatusTimeoutRollbackFailed(GlobalTransactionEvent event) {
decreaseActive(event);
reportTwoPhaseTimeout(event);
decreaseCounter(MeterIdConstants.COUNTER_ACTIVE, event);
increaseSummary(MeterIdConstants.SUMMARY_TWO_PHASE_TIMEOUT, event, 1);
reportFailed(event);
}

private void processGlobalStatusCommitRetryTimeout(GlobalTransactionEvent event) {
decreaseActive(event);
reportTwoPhaseTimeout(event);
decreaseCounter(MeterIdConstants.COUNTER_ACTIVE, event);
increaseSummary(MeterIdConstants.SUMMARY_TWO_PHASE_TIMEOUT, event, 1);
//The phase 2 retry timeout state should be considered a transaction failed
reportFailed(event);
}

private void processGlobalStatusTimeoutRollbackRetryTimeout(GlobalTransactionEvent event) {
decreaseActive(event);
}

private void decreaseActive(GlobalTransactionEvent event) {
registry.getCounter(MeterIdConstants.COUNTER_ACTIVE
.withTag(APP_ID_KEY, event.getApplicationId())
.withTag(GROUP_KEY, event.getGroup())).decrease(1);
decreaseCounter(MeterIdConstants.COUNTER_ACTIVE, event);
increaseSummary(MeterIdConstants.SUMMARY_TWO_PHASE_TIMEOUT, event, 1);
//The phase 2 retry timeout state should be considered a transaction failed
reportFailed(event);
}

private void reportFailed(GlobalTransactionEvent event) {
registry.getSummary(MeterIdConstants.SUMMARY_FAILED
.withTag(APP_ID_KEY, event.getApplicationId())
.withTag(GROUP_KEY, event.getGroup())).increase(1);
registry.getTimer(MeterIdConstants.TIMER_FAILED
.withTag(APP_ID_KEY, event.getApplicationId())
.withTag(GROUP_KEY, event.getGroup()))
.record(event.getEndTime() - event.getBeginTime(), TimeUnit.MILLISECONDS);
}

private void reportTwoPhaseTimeout(GlobalTransactionEvent event) {
registry.getSummary(MeterIdConstants.SUMMARY_TWO_PHASE_TIMEOUT
.withTag(APP_ID_KEY, event.getApplicationId())
.withTag(GROUP_KEY, event.getGroup())).increase(1);
increaseSummary(MeterIdConstants.SUMMARY_FAILED, event, 1);
increaseTimer(MeterIdConstants.TIMER_FAILED, event);
}



@Subscribe
public void recordGlobalTransactionEventForMetrics(GlobalTransactionEvent event) {
if (registry != null && consumers.containsKey(event.getStatus())) {
Expand All @@ -208,6 +193,7 @@ public boolean equals(Object obj) {
/**
* PMD check
* SuppressWarnings("checkstyle:EqualsHashCode")
*
* @return the hash code
*/
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ public void queueToRetryCommit() throws TransactionException {
public void queueToRetryRollback() throws TransactionException {
this.addSessionLifecycleListener(SessionHolder.getRetryRollbackingSessionManager());
GlobalStatus currentStatus = this.getStatus();
if (SessionStatusValidator.isTimeoutGlobalStatus(currentStatus)) {
if (GlobalStatus.TimeoutRollbacking == currentStatus) {
this.setStatus(GlobalStatus.TimeoutRollbackRetrying);
} else {
this.setStatus(GlobalStatus.RollbackRetrying);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,8 @@ public static void endRollbacked(GlobalSession globalSession, boolean retryGloba
}
boolean retryBranch =
currentStatus == GlobalStatus.TimeoutRollbackRetrying || currentStatus == GlobalStatus.RollbackRetrying;
if (SessionStatusValidator.isTimeoutGlobalStatus(currentStatus)) {
if (!currentStatus.equals(GlobalStatus.TimeoutRollbacked)
&& SessionStatusValidator.isTimeoutRollbacking(currentStatus)) {
globalSession.changeGlobalStatus(GlobalStatus.TimeoutRollbacked);
} else {
globalSession.changeGlobalStatus(GlobalStatus.Rollbacked);
Expand Down Expand Up @@ -236,7 +237,7 @@ public static void endRollbackFailed(GlobalSession globalSession, boolean retryG
GlobalStatus currentStatus = globalSession.getStatus();
if (isRetryTimeout) {
globalSession.changeGlobalStatus(GlobalStatus.RollbackRetryTimeout);
} else if (SessionStatusValidator.isTimeoutGlobalStatus(currentStatus)) {
} else if (SessionStatusValidator.isTimeoutRollbacking(currentStatus)) {
globalSession.changeGlobalStatus(GlobalStatus.TimeoutRollbackFailed);
} else {
globalSession.changeGlobalStatus(GlobalStatus.RollbackFailed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ public static boolean isTimeoutGlobalStatus(GlobalStatus status) {
|| status == GlobalStatus.TimeoutRollbackRetrying;
}

public static boolean isTimeoutRollbacking(GlobalStatus status) {
return status == GlobalStatus.TimeoutRollbacking
|| status == GlobalStatus.TimeoutRollbackRetrying;
}

/**
* is rollback global status
*
Expand Down
Loading