Skip to content

Commit

Permalink
[fix](oom) avoid oom when a lot of tablets fail on load
Browse files Browse the repository at this point in the history
  • Loading branch information
dataroaring committed Jun 26, 2024
1 parent f0b7422 commit e152f8b
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 17 deletions.
7 changes: 6 additions & 1 deletion be/src/agent/heartbeat_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,12 @@ void HeartbeatServer::heartbeat(THeartbeatResult& heartbeat_result,
}
watch.stop();
if (watch.elapsed_time() > 1000L * 1000L * 1000L) {
LOG(WARNING) << "heartbeat consume too much time. time=" << watch.elapsed_time();
LOG(WARNING) << "heartbeat consume too much time. time=" << watch.elapsed_time()
<< ", host:" << master_info.network_address.hostname
<< ", port:" << master_info.network_address.port
<< ", cluster id:" << master_info.cluster_id
<< ", frontend_info:" << PrintFrontendInfos(master_info.frontend_infos)
<< ", counter:" << google::COUNTER << ", BE start time: " << _be_epoch;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1334,7 +1334,7 @@ private PublishResult finishCheckQuorumReplicas(TransactionState transactionStat
List<Replica> tabletSuccReplicas = Lists.newArrayList();
List<Replica> tabletWriteFailedReplicas = Lists.newArrayList();
List<Replica> tabletVersionFailedReplicas = Lists.newArrayList();
List<String> logs = Lists.newArrayList();
TabletsPublishResultLogs logs = new TabletsPublishResultLogs();

Map<Long, List<PublishVersionTask>> publishTasks = transactionState.getPublishVersionTasks();
PublishResult publishResult = PublishResult.QUORUM_SUCC;
Expand Down Expand Up @@ -1396,10 +1396,7 @@ private PublishResult finishCheckQuorumReplicas(TransactionState transactionStat
|| now - transactionState.getLastPublishLogTime() > Config.publish_fail_log_interval_second * 1000L;
if (needLog) {
transactionState.setLastPublishLogTime(now);
for (String log : logs) {
LOG.info("{}. publish times {}, whole txn publish result {}",
log, transactionState.getPublishCount(), publishResult.name());
}
logs.log();
}

return publishResult;
Expand Down Expand Up @@ -2607,7 +2604,7 @@ private PublishResult finishCheckQuorumReplicas(TransactionState transactionStat
&& now >= firstPublishVersionTime + Config.publish_wait_time_second * 1000L) {
allowPublishOneSucc = true;
}
List<String> logs = Lists.newArrayList();
TabletsPublishResultLogs logs = new TabletsPublishResultLogs();

Map<Long, List<PublishVersionTask>> publishTasks = transactionState.getPublishVersionTasks();
PublishResult publishResult = PublishResult.QUORUM_SUCC;
Expand Down Expand Up @@ -2733,31 +2730,67 @@ private PublishResult finishCheckQuorumReplicas(TransactionState transactionStat
|| LOG.isDebugEnabled();
if (needLog) {
transactionState.setLastPublishLogTime(now);
for (String log : logs) {
LOG.info("{}. publish times {}, whole txn publish result {}",
log, transactionState.getPublishCount(), publishResult.name());
}
logs.log();
}

return publishResult;
}

private class TabletsPublishResultLogs {
public List<String> quorumSuccLogs = Lists.newArrayList();
public List<String> timeoutSuccLogs = Lists.newArrayList();
public List<String> failedLogs = Lists.newArrayList();

public void addQuorumSuccLog(String log) {
if (quorumSuccLogs.size() < 16) {
quorumSuccLogs.add(log);
}
}

public void addTimeoutSuccLog(String log) {
if (timeoutSuccLogs.size() < 16) {
timeoutSuccLogs.add(log);
}
}

public void addFailedLog(String log) {
if (failedLogs.size() < 16) {
failedLogs.add(log);
}
}

public void log() {
// log failed logs
for (String log : failedLogs) {
LOG.info(log);
}
// log timeout succ logs
for (String log : timeoutSuccLogs) {
LOG.info(log);
}
// log quorum succ logs
for (String log : quorumSuccLogs) {
LOG.info(log);
}
}
}

private PublishResult checkQuorumReplicas(TransactionState transactionState, long tableId, Partition partition,
Tablet tablet, int loadRequiredReplicaNum, boolean allowPublishOneSucc, long newVersion,
List<Replica> tabletSuccReplicas, List<Replica> tabletWriteFailedReplicas,
List<Replica> tabletVersionFailedReplicas, PublishResult publishResult, List<String> logs) {
List<Replica> tabletVersionFailedReplicas, PublishResult publishResult, TabletsPublishResultLogs logs) {
long partitionId = partition.getId();
int healthReplicaNum = tabletSuccReplicas.size();
if (healthReplicaNum >= loadRequiredReplicaNum) {
boolean hasFailedReplica = !tabletWriteFailedReplicas.isEmpty() || !tabletVersionFailedReplicas.isEmpty();
if (hasFailedReplica) {
String writeDetail = getTabletWriteDetail(tabletSuccReplicas, tabletWriteFailedReplicas,
tabletVersionFailedReplicas);
logs.add(String.format("publish version quorum succ for transaction %s on tablet %s"
logs.addQuorumSuccLog(String.format("publish version quorum succ for transaction %s on tablet %s"
+ " with version %s, and has failed replicas, load require replica num %s. "
+ "table %s, partition: [ id=%s, commit version=%s ], tablet detail: %s",
transactionState, tablet.getId(), newVersion, loadRequiredReplicaNum, tableId, partitionId,
partition.getCommittedVersion(), writeDetail));
transactionState, tablet.getId(), newVersion, loadRequiredReplicaNum, tableId,
partitionId, partition.getCommittedVersion(), writeDetail));
}
return publishResult;
}
Expand All @@ -2777,7 +2810,7 @@ private PublishResult checkQuorumReplicas(TransactionState transactionState, lon
// that are being publised exists on a few replicas we should go
// ahead, otherwise data may be lost and thre
// publish task hangs forever.
logs.add(String.format("publish version timeout succ for transaction %s on tablet %s "
logs.addTimeoutSuccLog(String.format("publish version timeout succ for transaction %s on tablet %s "
+ "with version %s, and has failed replicas, load require replica num %s. "
+ "table %s, partition %s, tablet detail: %s", transactionState, tablet.getId(), newVersion,
loadRequiredReplicaNum, tableId, partitionId, writeDetail));
Expand All @@ -2788,7 +2821,7 @@ private PublishResult checkQuorumReplicas(TransactionState transactionState, lon
+ " table: %d, partition: %d, publish version: %d", tablet.getId(), healthReplicaNum,
loadRequiredReplicaNum, tableId, partitionId, newVersion);
transactionState.setErrorMsg(errMsg);
logs.add(String.format("publish version failed for transaction %s on tablet %s with version"
logs.addFailedLog(String.format("publish version failed for transaction %s on tablet %s with version"
+ " %s, and has failed replicas, load required replica num %s. table %s, "
+ "partition %s, tablet detail: %s", transactionState, tablet.getId(), newVersion,
loadRequiredReplicaNum, tableId, partitionId, writeDetail));
Expand Down

0 comments on commit e152f8b

Please sign in to comment.