Skip to content

Commit

Permalink
(compaction) compaction threads is 1 for each disk
Browse files Browse the repository at this point in the history
We start 1 cumu and base compaction thread each disk.
  • Loading branch information
dataroaring committed Jan 2, 2024
1 parent f2fa62f commit c623f36
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 16 deletions.
3 changes: 2 additions & 1 deletion be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1528,7 +1528,8 @@ void PublishVersionWorkerPool::publish_version_callback(const TAgentTaskRequest&
if (!tablet->tablet_meta()->tablet_schema()->disable_auto_compaction()) {
tablet->published_count.fetch_add(1);
int64_t published_count = tablet->published_count.load();
if (published_count % 10 == 0) {
if (tablet->exceed_version_limit(config::max_tablet_version_num * 2 / 3) &&
published_count % 20 == 0) {
auto st = _engine.submit_compaction_task(
tablet, CompactionType::CUMULATIVE_COMPACTION, true);
if (!st.ok()) [[unlikely]] {
Expand Down
4 changes: 2 additions & 2 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,8 @@ DEFINE_mBool(enable_vertical_segment_writer, "true");
DEFINE_mInt32(ordered_data_compaction_min_segment_size, "10485760");

// This config can be set to limit thread number in compaction thread pool.
DEFINE_mInt32(max_base_compaction_threads, "4");
DEFINE_mInt32(max_cumu_compaction_threads, "10");
DEFINE_mInt32(max_base_compaction_threads, "-1");
DEFINE_mInt32(max_cumu_compaction_threads, "-1");
DEFINE_mInt32(max_single_replica_compaction_threads, "10");

DEFINE_Bool(enable_base_compaction_idle_sched, "true");
Expand Down
29 changes: 25 additions & 4 deletions be/src/olap/olap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,24 @@ volatile uint32_t g_schema_change_active_threads = 0;
static const uint64_t DEFAULT_SEED = 104729;
static const uint64_t MOD_PRIME = 7652413;

static int32_t get_cumu_compaction_threads_num(size_t data_dirs_num) {
int32_t threads_num = config::max_cumu_compaction_threads;
if (threads_num == -1) {
threads_num = data_dirs_num;
}
threads_num = threads_num <= 0 ? 1 : threads_num;
return threads_num;
}

static int32_t get_base_compaction_threads_num(size_t data_dirs_num) {
int32_t threads_num = config::max_base_compaction_threads;
if (threads_num == -1) {
threads_num = data_dirs_num;
}
threads_num = threads_num <= 0 ? 1 : threads_num;
return threads_num;
}

Status StorageEngine::start_bg_threads() {
RETURN_IF_ERROR(Thread::create(
"StorageEngine", "unused_rowset_monitor_thread",
Expand All @@ -118,13 +136,16 @@ Status StorageEngine::start_bg_threads() {
data_dirs.push_back(tmp_store.second);
}

auto base_compaction_threads = get_base_compaction_threads_num(data_dirs.size());
auto cumu_compaction_threads = get_cumu_compaction_threads_num(data_dirs.size());

RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool")
.set_min_threads(config::max_base_compaction_threads)
.set_max_threads(config::max_base_compaction_threads)
.set_min_threads(base_compaction_threads)
.set_max_threads(base_compaction_threads)
.build(&_base_compaction_thread_pool));
RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool")
.set_min_threads(config::max_cumu_compaction_threads)
.set_max_threads(config::max_cumu_compaction_threads)
.set_min_threads(cumu_compaction_threads)
.set_max_threads(cumu_compaction_threads)
.build(&_cumu_compaction_thread_pool));
RETURN_IF_ERROR(ThreadPoolBuilder("SingleReplicaCompactionTaskThreadPool")
.set_min_threads(config::max_single_replica_compaction_threads)
Expand Down
10 changes: 5 additions & 5 deletions docs/en/docs/admin-manual/config/be-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -482,10 +482,10 @@ There are two ways to configure BE configuration items:
* Default value: 10485760

#### `max_base_compaction_threads`

git
* Type: int32
* Description: The maximum of thread number in base compaction thread pool.
* Default value: 4
* Description: The maximum of thread number in base compaction thread pool, -1 means one thread per disk.
* Default value: -1

#### `generate_compaction_tasks_interval_ms`

Expand Down Expand Up @@ -625,8 +625,8 @@ BaseCompaction:546859:
#### `max_cumu_compaction_threads`

* Type: int32
* Description: The maximum of thread number in cumulative compaction thread pool.
* Default value: 10
* Description: The maximum of thread number in cumulative compaction thread pool, -1 means one thread per disk.
* Default value: -1

#### `enable_segcompaction`

Expand Down
8 changes: 4 additions & 4 deletions docs/zh-CN/docs/admin-manual/config/be-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -498,8 +498,8 @@ BE 重启后该配置将失效。如果想持久化修改结果,使用如下
#### `max_base_compaction_threads`

* 类型:int32
* 描述:Base Compaction线程池中线程数量的最大值。
* 默认值:4
* 描述:Base Compaction线程池中线程数量的最大值, -1 表示每个磁盘一个线程
* 默认值:-1

#### `generate_compaction_tasks_interval_ms`

Expand Down Expand Up @@ -639,8 +639,8 @@ BaseCompaction:546859:
#### `max_cumu_compaction_threads`

* 类型:int32
* 描述:Cumulative Compaction线程池中线程数量的最大值。
* 默认值:10
* 描述:Cumulative Compaction线程池中线程数量的最大值, -1 表示每个磁盘一个线程
* 默认值:-1

#### `enable_segcompaction`

Expand Down

0 comments on commit c623f36

Please sign in to comment.