Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
dataroaring committed Jan 2, 2024
1 parent c623f36 commit 64c2f40
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 25 deletions.
2 changes: 1 addition & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ DEFINE_mInt32(ordered_data_compaction_min_segment_size, "10485760");
// This config can be set to limit thread number in compaction thread pool.
DEFINE_mInt32(max_base_compaction_threads, "-1");
DEFINE_mInt32(max_cumu_compaction_threads, "-1");
DEFINE_mInt32(max_single_replica_compaction_threads, "10");
DEFINE_mInt32(max_single_replica_compaction_threads, "-1");

DEFINE_Bool(enable_base_compaction_idle_sched, "true");
DEFINE_mInt64(base_compaction_min_rowset_num, "5");
Expand Down
55 changes: 35 additions & 20 deletions be/src/olap/olap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,15 @@ static int32_t get_base_compaction_threads_num(size_t data_dirs_num) {
return threads_num;
}

static int32_t get_single_replica_compaction_threads_num(size_t data_dirs_num) {
int32_t threads_num = config::max_single_replica_compaction_threads;
if (threads_num == -1) {
threads_num = data_dirs_num;
}
threads_num = threads_num <= 0 ? 1 : threads_num;
return threads_num;
}

Status StorageEngine::start_bg_threads() {
RETURN_IF_ERROR(Thread::create(
"StorageEngine", "unused_rowset_monitor_thread",
Expand Down Expand Up @@ -138,6 +147,8 @@ Status StorageEngine::start_bg_threads() {

auto base_compaction_threads = get_base_compaction_threads_num(data_dirs.size());
auto cumu_compaction_threads = get_cumu_compaction_threads_num(data_dirs.size());
auto single_replica_compaction_threads = get_single_replica_compaction_threads_num(
data_dirs.size());

RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool")
.set_min_threads(base_compaction_threads)
Expand All @@ -148,8 +159,8 @@ Status StorageEngine::start_bg_threads() {
.set_max_threads(cumu_compaction_threads)
.build(&_cumu_compaction_thread_pool));
RETURN_IF_ERROR(ThreadPoolBuilder("SingleReplicaCompactionTaskThreadPool")
.set_min_threads(config::max_single_replica_compaction_threads)
.set_max_threads(config::max_single_replica_compaction_threads)
.set_min_threads(single_replica_compaction_threads)
.set_max_threads(single_replica_compaction_threads)
.build(&_single_replica_compaction_thread_pool));

if (config::enable_segcompaction) {
Expand Down Expand Up @@ -479,64 +490,68 @@ void StorageEngine::_tablet_path_check_callback() {
}

void StorageEngine::_adjust_compaction_thread_num() {
if (_base_compaction_thread_pool->max_threads() != config::max_base_compaction_threads) {
auto base_compaction_threads_num = get_base_compaction_threads_num(_store_map.size());
if (_base_compaction_thread_pool->max_threads() != base_compaction_threads_num) {
int old_max_threads = _base_compaction_thread_pool->max_threads();
Status status =
_base_compaction_thread_pool->set_max_threads(config::max_base_compaction_threads);
_base_compaction_thread_pool->set_max_threads(base_compaction_threads_num);
if (status.ok()) {
VLOG_NOTICE << "update base compaction thread pool max_threads from " << old_max_threads
<< " to " << config::max_base_compaction_threads;
<< " to " << base_compaction_threads_num;
}
}
if (_base_compaction_thread_pool->min_threads() != config::max_base_compaction_threads) {
if (_base_compaction_thread_pool->min_threads() != base_compaction_threads_num) {
int old_min_threads = _base_compaction_thread_pool->min_threads();
Status status =
_base_compaction_thread_pool->set_min_threads(config::max_base_compaction_threads);
_base_compaction_thread_pool->set_min_threads(base_compaction_threads_num);
if (status.ok()) {
VLOG_NOTICE << "update base compaction thread pool min_threads from " << old_min_threads
<< " to " << config::max_base_compaction_threads;
<< " to " << base_compaction_threads_num;
}
}

if (_cumu_compaction_thread_pool->max_threads() != config::max_cumu_compaction_threads) {
auto cumu_compaction_threads_num = get_cumu_compaction_threads_num(_store_map.size());
if (_cumu_compaction_thread_pool->max_threads() != cumu_compaction_threads_num) {
int old_max_threads = _cumu_compaction_thread_pool->max_threads();
Status status =
_cumu_compaction_thread_pool->set_max_threads(config::max_cumu_compaction_threads);
_cumu_compaction_thread_pool->set_max_threads(cumu_compaction_threads_num);
if (status.ok()) {
VLOG_NOTICE << "update cumu compaction thread pool max_threads from " << old_max_threads
<< " to " << config::max_cumu_compaction_threads;
<< " to " << cumu_compaction_threads_num;
}
}
if (_cumu_compaction_thread_pool->min_threads() != config::max_cumu_compaction_threads) {
if (_cumu_compaction_thread_pool->min_threads() != cumu_compaction_threads_num) {
int old_min_threads = _cumu_compaction_thread_pool->min_threads();
Status status =
_cumu_compaction_thread_pool->set_min_threads(config::max_cumu_compaction_threads);
_cumu_compaction_thread_pool->set_min_threads(cumu_compaction_threads_num);
if (status.ok()) {
VLOG_NOTICE << "update cumu compaction thread pool min_threads from " << old_min_threads
<< " to " << config::max_cumu_compaction_threads;
<< " to " << cumu_compaction_threads_num;
}
}

auto single_replica_compaction_threads_num =
get_single_replica_compaction_threads_num(_store_map.size());
if (_single_replica_compaction_thread_pool->max_threads() !=
config::max_single_replica_compaction_threads) {
single_replica_compaction_threads_num) {
int old_max_threads = _single_replica_compaction_thread_pool->max_threads();
Status status = _single_replica_compaction_thread_pool->set_max_threads(
config::max_single_replica_compaction_threads);
single_replica_compaction_threads_num);
if (status.ok()) {
VLOG_NOTICE << "update single replica compaction thread pool max_threads from "
<< old_max_threads << " to "
<< config::max_single_replica_compaction_threads;
<< single_replica_compaction_threads_num;
}
}
if (_single_replica_compaction_thread_pool->min_threads() !=
config::max_single_replica_compaction_threads) {
single_replica_compaction_threads_num) {
int old_min_threads = _single_replica_compaction_thread_pool->min_threads();
Status status = _single_replica_compaction_thread_pool->set_min_threads(
config::max_single_replica_compaction_threads);
single_replica_compaction_threads_num);
if (status.ok()) {
VLOG_NOTICE << "update single replica compaction thread pool min_threads from "
<< old_min_threads << " to "
<< config::max_single_replica_compaction_threads;
<< single_replica_compaction_threads_num;
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions docs/en/docs/admin-manual/config/be-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -686,8 +686,8 @@ BaseCompaction:546859:
#### `max_single_replica_compaction_threads`

* Type: int32
* Description: The maximum of thread number in single replica compaction thread pool.
* Default value: 10
* Description: The maximum of thread number in single replica compaction thread pool. -1 means one thread per disk.
* Default value: -1

#### `update_replica_infos_interval_seconds`

Expand Down
4 changes: 2 additions & 2 deletions docs/zh-CN/docs/admin-manual/config/be-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -712,8 +712,8 @@ BaseCompaction:546859:
#### `max_single_replica_compaction_threads`

* 类型:int32
* 描述:Single Replica Compaction 线程池中线程数量的最大值。
* 默认值:10
* 描述:Single Replica Compaction 线程池中线程数量的最大值, -1 表示每个磁盘一个线程
* 默认值:-1

#### `update_replica_infos_interval_seconds`

Expand Down

0 comments on commit 64c2f40

Please sign in to comment.