From e152f8bae455120130366923a630adb200cccae5 Mon Sep 17 00:00:00 2001 From: Yongqiang YANG Date: Wed, 26 Jun 2024 17:53:11 +0800 Subject: [PATCH] [fix](oom) avoid oom when a lot of tablets fail on load --- be/src/agent/heartbeat_server.cpp | 7 +- .../transaction/DatabaseTransactionMgr.java | 65 ++++++++++++++----- 2 files changed, 55 insertions(+), 17 deletions(-) diff --git a/be/src/agent/heartbeat_server.cpp b/be/src/agent/heartbeat_server.cpp index d27145c3b0eaea..17282910167b8f 100644 --- a/be/src/agent/heartbeat_server.cpp +++ b/be/src/agent/heartbeat_server.cpp @@ -93,7 +93,12 @@ void HeartbeatServer::heartbeat(THeartbeatResult& heartbeat_result, } watch.stop(); if (watch.elapsed_time() > 1000L * 1000L * 1000L) { - LOG(WARNING) << "heartbeat consume too much time. time=" << watch.elapsed_time(); + LOG(WARNING) << "heartbeat consume too much time. time=" << watch.elapsed_time() + << ", host:" << master_info.network_address.hostname + << ", port:" << master_info.network_address.port + << ", cluster id:" << master_info.cluster_id + << ", frontend_info:" << PrintFrontendInfos(master_info.frontend_infos) + << ", counter:" << google::COUNTER << ", BE start time: " << _be_epoch; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index 2042003d5ccc5b..3850400c1b7900 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -1334,7 +1334,7 @@ private PublishResult finishCheckQuorumReplicas(TransactionState transactionStat List tabletSuccReplicas = Lists.newArrayList(); List tabletWriteFailedReplicas = Lists.newArrayList(); List tabletVersionFailedReplicas = Lists.newArrayList(); - List logs = Lists.newArrayList(); + TabletsPublishResultLogs logs = new TabletsPublishResultLogs(); Map> publishTasks = transactionState.getPublishVersionTasks(); PublishResult publishResult = PublishResult.QUORUM_SUCC; @@ -1396,10 +1396,7 @@ private PublishResult finishCheckQuorumReplicas(TransactionState transactionStat || now - transactionState.getLastPublishLogTime() > Config.publish_fail_log_interval_second * 1000L; if (needLog) { transactionState.setLastPublishLogTime(now); - for (String log : logs) { - LOG.info("{}. publish times {}, whole txn publish result {}", - log, transactionState.getPublishCount(), publishResult.name()); - } + logs.log(); } return publishResult; @@ -2607,7 +2604,7 @@ private PublishResult finishCheckQuorumReplicas(TransactionState transactionStat && now >= firstPublishVersionTime + Config.publish_wait_time_second * 1000L) { allowPublishOneSucc = true; } - List logs = Lists.newArrayList(); + TabletsPublishResultLogs logs = new TabletsPublishResultLogs(); Map> publishTasks = transactionState.getPublishVersionTasks(); PublishResult publishResult = PublishResult.QUORUM_SUCC; @@ -2733,19 +2730,55 @@ private PublishResult finishCheckQuorumReplicas(TransactionState transactionStat || LOG.isDebugEnabled(); if (needLog) { transactionState.setLastPublishLogTime(now); - for (String log : logs) { - LOG.info("{}. publish times {}, whole txn publish result {}", - log, transactionState.getPublishCount(), publishResult.name()); - } + logs.log(); } return publishResult; } + private class TabletsPublishResultLogs { + public List quorumSuccLogs = Lists.newArrayList(); + public List timeoutSuccLogs = Lists.newArrayList(); + public List failedLogs = Lists.newArrayList(); + + public void addQuorumSuccLog(String log) { + if (quorumSuccLogs.size() < 16) { + quorumSuccLogs.add(log); + } + } + + public void addTimeoutSuccLog(String log) { + if (timeoutSuccLogs.size() < 16) { + timeoutSuccLogs.add(log); + } + } + + public void addFailedLog(String log) { + if (failedLogs.size() < 16) { + failedLogs.add(log); + } + } + + public void log() { + // log failed logs + for (String log : failedLogs) { + LOG.info(log); + } + // log timeout succ logs + for (String log : timeoutSuccLogs) { + LOG.info(log); + } + // log quorum succ logs + for (String log : quorumSuccLogs) { + LOG.info(log); + } + } + } + private PublishResult checkQuorumReplicas(TransactionState transactionState, long tableId, Partition partition, Tablet tablet, int loadRequiredReplicaNum, boolean allowPublishOneSucc, long newVersion, List tabletSuccReplicas, List tabletWriteFailedReplicas, - List tabletVersionFailedReplicas, PublishResult publishResult, List logs) { + List tabletVersionFailedReplicas, PublishResult publishResult, TabletsPublishResultLogs logs) { long partitionId = partition.getId(); int healthReplicaNum = tabletSuccReplicas.size(); if (healthReplicaNum >= loadRequiredReplicaNum) { @@ -2753,11 +2786,11 @@ private PublishResult checkQuorumReplicas(TransactionState transactionState, lon if (hasFailedReplica) { String writeDetail = getTabletWriteDetail(tabletSuccReplicas, tabletWriteFailedReplicas, tabletVersionFailedReplicas); - logs.add(String.format("publish version quorum succ for transaction %s on tablet %s" + logs.addQuorumSuccLog(String.format("publish version quorum succ for transaction %s on tablet %s" + " with version %s, and has failed replicas, load require replica num %s. " + "table %s, partition: [ id=%s, commit version=%s ], tablet detail: %s", - transactionState, tablet.getId(), newVersion, loadRequiredReplicaNum, tableId, partitionId, - partition.getCommittedVersion(), writeDetail)); + transactionState, tablet.getId(), newVersion, loadRequiredReplicaNum, tableId, + partitionId, partition.getCommittedVersion(), writeDetail)); } return publishResult; } @@ -2777,7 +2810,7 @@ private PublishResult checkQuorumReplicas(TransactionState transactionState, lon // that are being publised exists on a few replicas we should go // ahead, otherwise data may be lost and thre // publish task hangs forever. - logs.add(String.format("publish version timeout succ for transaction %s on tablet %s " + logs.addTimeoutSuccLog(String.format("publish version timeout succ for transaction %s on tablet %s " + "with version %s, and has failed replicas, load require replica num %s. " + "table %s, partition %s, tablet detail: %s", transactionState, tablet.getId(), newVersion, loadRequiredReplicaNum, tableId, partitionId, writeDetail)); @@ -2788,7 +2821,7 @@ private PublishResult checkQuorumReplicas(TransactionState transactionState, lon + " table: %d, partition: %d, publish version: %d", tablet.getId(), healthReplicaNum, loadRequiredReplicaNum, tableId, partitionId, newVersion); transactionState.setErrorMsg(errMsg); - logs.add(String.format("publish version failed for transaction %s on tablet %s with version" + logs.addFailedLog(String.format("publish version failed for transaction %s on tablet %s with version" + " %s, and has failed replicas, load required replica num %s. table %s, " + "partition %s, tablet detail: %s", transactionState, tablet.getId(), newVersion, loadRequiredReplicaNum, tableId, partitionId, writeDetail));