Skip to content

Commit

Permalink
[BugFix] Fix check bind_cpus for resource group (#50809)
Browse files Browse the repository at this point in the history
Signed-off-by: zihe.liu <ziheliu1024@gmail.com>
  • Loading branch information
ZiheLiu authored Sep 12, 2024
1 parent 4bd744c commit e5153b7
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 12 deletions.
16 changes: 13 additions & 3 deletions be/src/exec/workgroup/pipeline_executor_set.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@ namespace starrocks::workgroup {
// PipelineExecutorSetConfig
// ------------------------------------------------------------------------------------

static CpuUtil::CpuIds limit_total_cpuids(CpuUtil::CpuIds&& total_cpuids, uint32_t num_total_cores) {
if (total_cpuids.empty() || total_cpuids.size() <= num_total_cores) {
return std::move(total_cpuids);
}

CpuUtil::CpuIds cpuids;
std::copy_n(total_cpuids.begin(), num_total_cores, std::back_inserter(cpuids));
return cpuids;
}

PipelineExecutorSetConfig::PipelineExecutorSetConfig(uint32_t num_total_cores, uint32_t num_total_driver_threads,
uint32_t num_total_scan_threads,
uint32_t num_total_connector_scan_threads,
Expand All @@ -35,9 +45,9 @@ PipelineExecutorSetConfig::PipelineExecutorSetConfig(uint32_t num_total_cores, u
num_total_driver_threads(num_total_driver_threads),
num_total_scan_threads(num_total_scan_threads),
num_total_connector_scan_threads(num_total_connector_scan_threads),
total_cpuids(std::move(total_cpuids)),
total_cpuids(limit_total_cpuids(std::move(total_cpuids), num_total_cores)),
enable_bind_cpus(enable_bind_cpus),
enable_cpu_borrowing(enable_cpu_borrowing) {}
enable_cpu_borrowing(enable_cpu_borrowing && enable_bind_cpus) {}

std::string PipelineExecutorSetConfig::to_string() const {
return fmt::format(
Expand Down Expand Up @@ -172,7 +182,7 @@ void PipelineExecutorSet::notify_config_changed() const {
}

uint32_t PipelineExecutorSet::calculate_num_threads(uint32_t num_total_threads) const {
if (!_borrowed_cpu_ids.empty() || _cpuids.empty()) {
if (!_borrowed_cpu_ids.empty()) {
return num_total_threads;
}
return std::max<uint32_t>(1, num_total_threads * _cpuids.size() / _conf.num_total_cores);
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/workgroup/pipeline_executor_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ struct PipelineExecutorSetConfig {
const uint32_t num_total_scan_threads;
uint32_t num_total_connector_scan_threads;

CpuUtil::CpuIds total_cpuids;
const CpuUtil::CpuIds total_cpuids;

bool enable_bind_cpus;
const bool enable_bind_cpus;
bool enable_cpu_borrowing;
};

Expand Down
5 changes: 3 additions & 2 deletions be/src/exec/workgroup/pipeline_executor_set_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,11 @@ void ExecutorsManager::change_num_connector_scan_threads(uint32_t num_connector_
}

void ExecutorsManager::change_enable_resource_group_cpu_borrowing(bool val) {
if (_conf.enable_cpu_borrowing == val) {
const bool new_val = val && _conf.enable_bind_cpus;
if (_conf.enable_cpu_borrowing == new_val) {
return;
}
_conf.enable_cpu_borrowing = val;
_conf.enable_cpu_borrowing = new_val;

update_shared_executors();
}
Expand Down
7 changes: 2 additions & 5 deletions be/src/http/action/pipeline_blocking_drivers_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,8 @@ void PipelineBlockingDriversAction::_handle_stat(HttpRequest* req) {
};

QueryMap query_map_in_wg;
_exec_env->workgroup_manager()->for_each_workgroup([&](const workgroup::WorkGroup& wg) {
if (wg.exclusive_executors() != nullptr) {
wg.exclusive_executors()->driver_executor()->iterate_immutable_blocking_driver(
iterate_func_generator(query_map_in_wg));
}
_exec_env->workgroup_manager()->for_each_executors([&](const workgroup::PipelineExecutorSet& executor) {
executor.driver_executor()->iterate_immutable_blocking_driver(iterate_func_generator(query_map_in_wg));
});
rapidjson::Document queries_in_wg_obj = query_map_to_doc_func(query_map_in_wg);

Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/exec_env.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,7 @@ Status ExecEnv::init(const std::vector<StorePath>& store_paths, bool as_cn) {
// Disable bind cpus when cgroup has cpu quota but no cpuset.
const bool enable_bind_cpus = config::enable_resource_group_bind_cpus &&
(!CpuInfo::is_cgroup_with_cpu_quota() || CpuInfo::is_cgroup_with_cpuset());
config::enable_resource_group_bind_cpus = enable_bind_cpus;
workgroup::PipelineExecutorSetConfig executors_manager_opts(
CpuInfo::num_cores(), _max_executor_threads, num_io_threads, connector_num_io_threads,
CpuInfo::get_core_ids(), enable_bind_cpus, config::enable_resource_group_cpu_borrowing);
Expand Down
1 change: 1 addition & 0 deletions be/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ set(EXEC_FILES
./exec/iceberg/iceberg_table_sink_operator_test.cpp
./exec/paimon/paimon_delete_file_builder_test.cpp
./exec/workgroup/scan_task_queue_test.cpp
./exec/workgroup/pipeline_executor_set_test.cpp
./exec/pipeline/pipeline_control_flow_test.cpp
./exec/pipeline/pipeline_driver_queue_test.cpp
./exec/pipeline/pipeline_file_scan_node_test.cpp
Expand Down
80 changes: 80 additions & 0 deletions be/test/exec/workgroup/pipeline_executor_set_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "exec/workgroup/pipeline_executor_set.h"

#include <gtest/gtest.h>

#include "testutil/assert.h"
#include "testutil/parallel_test.h"

namespace starrocks::workgroup {

PARALLEL_TEST(PipelineExecutorSetConfigTest, test_constructor) {
CpuUtil::CpuIds empty_cpuids;
CpuUtil::CpuIds cpuids{0, 1, 2, 3, 4, 5, 6, 7};

/// check enable_cpu_borrowing
{
PipelineExecutorSetConfig config(2, 2, 2, 2, empty_cpuids, false, true);
ASSERT_FALSE(config.enable_cpu_borrowing);
}

{
PipelineExecutorSetConfig config(2, 2, 2, 2, empty_cpuids, true, false);
ASSERT_FALSE(config.enable_cpu_borrowing);
}

{
PipelineExecutorSetConfig config(2, 2, 2, 2, empty_cpuids, true, true);
ASSERT_TRUE(config.enable_cpu_borrowing);
}

/// check cpuids
{
PipelineExecutorSetConfig config(2, 2, 2, 2, empty_cpuids, false, true);
ASSERT_EQ(0, config.total_cpuids.size());
}

{
PipelineExecutorSetConfig config(0, 2, 2, 2, cpuids, false, true);
ASSERT_EQ(0, config.total_cpuids.size());
}

{
PipelineExecutorSetConfig config(2, 2, 2, 2, cpuids, false, true);
ASSERT_EQ(2, config.total_cpuids.size());
for (int i = 0; i < 2; i++) {
ASSERT_EQ(i, config.total_cpuids[i]);
}
}

{
PipelineExecutorSetConfig config(8, 2, 2, 2, cpuids, false, true);
ASSERT_EQ(8, config.total_cpuids.size());
for (int i = 0; i < 8; i++) {
ASSERT_EQ(i, config.total_cpuids[i]);
}
}

{
PipelineExecutorSetConfig config(100, 2, 2, 2, cpuids, false, true);
ASSERT_EQ(8, config.total_cpuids.size());
for (int i = 0; i < 8; i++) {
ASSERT_EQ(i, config.total_cpuids[i]);
}
}
}

} // namespace starrocks::workgroup

0 comments on commit e5153b7

Please sign in to comment.