Skip to content

Commit

Permalink
[branch-3.0][feat](lock)add deadlock detection tool and monitored loc…
Browse files Browse the repository at this point in the history
…k implementations (apache#39015) #
  • Loading branch information
CalvinKirs committed Oct 11, 2024
1 parent a1f8383 commit b55f080
Show file tree
Hide file tree
Showing 24 changed files with 490 additions and 124 deletions.
13 changes: 13 additions & 0 deletions fe/fe-common/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -3093,4 +3093,17 @@ public static int metaServiceRpcRetryTimes() {
@ConfField(mutable = true, description = {
"设置为 true,如果查询无法选择到健康副本时,会打印出该tablet所有副本的详细信息,"})
public static boolean sql_block_rule_ignore_admin = false;
//==========================================================================
// start of lock config
@ConfField(description = {"是否开启死锁检测",
"Whether to enable deadlock detection"})
public static boolean enable_deadlock_detection = false;

@ConfField(description = {"死锁检测间隔时间,单位分钟",
"Deadlock detection interval time, unit minute"})
public static long deadlock_detection_interval_minute = 5;

@ConfField(mutable = true, description = {"表示最大锁持有时间,超过该时间会打印告警日志,单位秒",
"Maximum lock hold time; logs a warning if exceeded"})
public static long max_lock_hold_threshold_seconds = 10;
}
11 changes: 10 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/DorisFE.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.doris.common.LogUtils;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.Version;
import org.apache.doris.common.lock.DeadlockMonitor;
import org.apache.doris.common.util.JdkUtils;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.httpv2.HttpServer;
Expand Down Expand Up @@ -60,6 +61,7 @@
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.TimeUnit;

public class DorisFE {
private static final Logger LOG = LogManager.getLogger(DorisFE.class);
Expand Down Expand Up @@ -95,6 +97,13 @@ public static void main(String[] args) {
start(DORIS_HOME_DIR, PID_DIR, args, options);
}

private static void startMonitor() {
if (Config.enable_deadlock_detection) {
DeadlockMonitor deadlockMonitor = new DeadlockMonitor();
deadlockMonitor.startMonitoring(Config.deadlock_detection_interval_minute, TimeUnit.MINUTES);
}
}

// entrance for doris frontend
public static void start(String dorisHomeDir, String pidDir, String[] args, StartupOptions options) {
if (System.getenv("DORIS_LOG_TO_STDERR") != null) {
Expand Down Expand Up @@ -214,7 +223,7 @@ public static void start(String dorisHomeDir, String pidDir, String[] args, Star
}

ThreadPoolManager.registerAllThreadPoolMetric();

startMonitor();
while (true) {
Thread.sleep(2000);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock;
import org.apache.doris.common.util.DynamicPartitionUtil;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.persist.ColocatePersistInfo;
Expand Down Expand Up @@ -57,7 +58,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -183,7 +183,7 @@ public static boolean isGlobalGroupName(String groupName) {
// save some error msg of the group for show. no need to persist
private Map<GroupId, String> group2ErrMsgs = Maps.newHashMap();

private transient ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private transient MonitoredReentrantReadWriteLock lock = new MonitoredReentrantReadWriteLock();

public ColocateTableIndex() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.QueryableReentrantReadWriteLock;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.persist.CreateTableInfo;
Expand Down Expand Up @@ -87,7 +87,7 @@ public class Database extends MetaObject implements Writable, DatabaseIf<Table>,
@SerializedName(value = "fullQualifiedName")
private volatile String fullQualifiedName;

private QueryableReentrantReadWriteLock rwLock;
private MonitoredReentrantReadWriteLock rwLock;

// table family group map
private final Map<Long, Table> idToTable;
Expand Down Expand Up @@ -138,7 +138,7 @@ public Database(long id, String name) {
if (this.fullQualifiedName == null) {
this.fullQualifiedName = "";
}
this.rwLock = new QueryableReentrantReadWriteLock(true);
this.rwLock = new MonitoredReentrantReadWriteLock(true);
this.idToTable = Maps.newConcurrentMap();
this.nameToTable = Maps.newConcurrentMap();
this.lowerCaseToTableName = Maps.newConcurrentMap();
Expand Down
6 changes: 3 additions & 3 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.CountingDataOutputStream;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.lock.MonitoredReentrantLock;
import org.apache.doris.common.publish.TopicPublisher;
import org.apache.doris.common.publish.TopicPublisherThread;
import org.apache.doris.common.publish.WorkloadGroupPublisher;
Expand All @@ -124,7 +125,6 @@
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.QueryableReentrantLock;
import org.apache.doris.common.util.SmallFileMgr;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
Expand Down Expand Up @@ -369,7 +369,7 @@ public class Env {
// We use fair ReentrantLock to avoid starvation. Do not use this lock in critical code pass
// because fair lock has poor performance.
// Using QueryableReentrantLock to print owner thread in debug mode.
private QueryableReentrantLock lock;
private MonitoredReentrantLock lock;

private CatalogMgr catalogMgr;
private GlobalFunctionMgr globalFunctionMgr;
Expand Down Expand Up @@ -693,7 +693,7 @@ public Env(boolean isCheckpointCatalog) {
this.syncJobManager = new SyncJobManager();
this.alter = new Alter();
this.consistencyChecker = new ConsistencyChecker();
this.lock = new QueryableReentrantLock(true);
this.lock = new MonitoredReentrantLock(true);
this.backupHandler = new BackupHandler(this);
this.metaDir = Config.meta_dir;
this.publishVersionDaemon = new PublishVersionDaemon();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.proto.InternalService.PAlterVaultSyncRequest;
import org.apache.doris.rpc.BackendServiceProxy;
Expand All @@ -43,16 +44,17 @@
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class StorageVaultMgr {
private static final Logger LOG = LogManager.getLogger(StorageVaultMgr.class);
private static final ExecutorService ALTER_BE_SYNC_THREAD_POOL = Executors.newFixedThreadPool(1);
private final SystemInfoService systemInfoService;
// <VaultName, VaultId>
private Pair<String, String> defaultVaultInfo;

private Map<String, String> vaultNameToVaultId = new HashMap<>();
private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();

private MonitoredReentrantReadWriteLock rwLock = new MonitoredReentrantReadWriteLock();

public StorageVaultMgr(SystemInfoService systemInfoService) {
this.systemInfoService = systemInfoService;
Expand Down
16 changes: 8 additions & 8 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
import org.apache.doris.common.Pair;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.QueryableReentrantReadWriteLock;
import org.apache.doris.common.lock.MonitoredReentrantLock;
import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock;
import org.apache.doris.common.util.SqlUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.persist.gson.GsonPostProcessable;
Expand Down Expand Up @@ -58,7 +59,6 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

/**
Expand All @@ -83,11 +83,11 @@ public abstract class Table extends MetaObject implements Writable, TableIf, Gso
protected TableType type;
@SerializedName(value = "createTime")
protected long createTime;
protected QueryableReentrantReadWriteLock rwLock;
protected MonitoredReentrantReadWriteLock rwLock;
// Used for queuing commit transactifon tasks to avoid fdb transaction conflicts,
// especially to reduce conflicts when obtaining delete bitmap update locks for
// MoW table
protected ReentrantLock commitLock;
protected MonitoredReentrantLock commitLock;

/*
* fullSchema and nameToColumn should contains all columns, both visible and shadow.
Expand Down Expand Up @@ -133,11 +133,11 @@ public Table(TableType type) {
this.type = type;
this.fullSchema = Lists.newArrayList();
this.nameToColumn = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
this.rwLock = new QueryableReentrantReadWriteLock(true);
this.rwLock = new MonitoredReentrantReadWriteLock(true);
if (Config.check_table_lock_leaky) {
this.readLockThreads = Maps.newConcurrentMap();
}
this.commitLock = new ReentrantLock(true);
this.commitLock = new MonitoredReentrantLock(true);
}

public Table(long id, String tableName, TableType type, List<Column> fullSchema) {
Expand All @@ -157,12 +157,12 @@ public Table(long id, String tableName, TableType type, List<Column> fullSchema)
// Only view in with-clause have null base
Preconditions.checkArgument(type == TableType.VIEW, "Table has no columns");
}
this.rwLock = new QueryableReentrantReadWriteLock(true);
this.rwLock = new MonitoredReentrantReadWriteLock(true);
this.createTime = Instant.now().getEpochSecond();
if (Config.check_table_lock_leaky) {
this.readLockThreads = Maps.newConcurrentMap();
}
this.commitLock = new ReentrantLock(true);
this.commitLock = new MonitoredReentrantLock(true);
}

public void markDropped() {
Expand Down
4 changes: 2 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.resource.Tag;
import org.apache.doris.system.Backend;
Expand All @@ -51,7 +52,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

Expand Down Expand Up @@ -128,7 +128,7 @@ public TabletHealth() {
private long cooldownReplicaId = -1;
@SerializedName(value = "ctm", alternate = {"cooldownTerm"})
private long cooldownTerm = -1;
private ReentrantReadWriteLock cooldownConfLock = new ReentrantReadWriteLock();
private MonitoredReentrantReadWriteLock cooldownConfLock = new MonitoredReentrantReadWriteLock();

// last time that the tablet checker checks this tablet.
// no need to persist
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.common.lock;

import org.apache.doris.common.Config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Abstract base class for a monitored lock that tracks lock acquisition,
* release, and attempt times. It provides mechanisms for monitoring the
* duration for which a lock is held and logging any instances where locks
* are held longer than a specified timeout or fail to be acquired within
* a specified timeout.
*/
public abstract class AbstractMonitoredLock {

private static final Logger LOG = LoggerFactory.getLogger(AbstractMonitoredLock.class);

// Thread-local variable to store the lock start time
private final ThreadLocal<Long> lockStartTime = new ThreadLocal<>();


/**
* Method to be called after successfully acquiring the lock.
* Sets the start time for the lock.
*/
protected void afterLock() {
lockStartTime.set(System.nanoTime());
}

/**
* Method to be called after releasing the lock.
* Calculates the lock hold time and logs a warning if it exceeds the hold timeout.
*/
protected void afterUnlock() {
Long startTime = lockStartTime.get();
if (startTime != null) {
long lockHoldTimeNanos = System.nanoTime() - startTime;
long lockHoldTimeMs = lockHoldTimeNanos >> 20;
if (lockHoldTimeMs > Config.max_lock_hold_threshold_seconds * 1000) {
Thread currentThread = Thread.currentThread();
String stackTrace = getThreadStackTrace(currentThread.getStackTrace());
LOG.warn("Thread ID: {}, Thread Name: {} - Lock held for {} ms, exceeding hold timeout of {} ms "
+ "Thread stack trace:{}",
currentThread.getId(), currentThread.getName(), lockHoldTimeMs, lockHoldTimeMs, stackTrace);
}
lockStartTime.remove();
}
}

/**
* Method to be called after attempting to acquire the lock using tryLock.
* Logs a warning if the lock was not acquired within a reasonable time.
*
* @param acquired Whether the lock was successfully acquired
* @param startTime The start time of the lock attempt
*/
protected void afterTryLock(boolean acquired, long startTime) {
if (acquired) {
afterLock();
return;
}
if (LOG.isDebugEnabled()) {
long elapsedTime = (System.nanoTime() - startTime) >> 20;
Thread currentThread = Thread.currentThread();
String stackTrace = getThreadStackTrace(currentThread.getStackTrace());
LOG.debug("Thread ID: {}, Thread Name: {} - Failed to acquire the lock within {} ms"
+ "\nThread blocking info:\n{}",
currentThread.getId(), currentThread.getName(), elapsedTime, stackTrace);
}
}

/**
* Utility method to format the stack trace of a thread.
*
* @param stackTrace The stack trace elements of the thread
* @return A formatted string of the stack trace
*/
private String getThreadStackTrace(StackTraceElement[] stackTrace) {
StringBuilder sb = new StringBuilder();
for (StackTraceElement element : stackTrace) {
sb.append("\tat ").append(element).append("\n");
}
return sb.toString().replace("\n", "\\n");
}
}


Loading

0 comments on commit b55f080

Please sign in to comment.