From 6eb44b3358dd12f531babe206813bc471c87179b Mon Sep 17 00:00:00 2001 From: Ge Gao Date: Fri, 13 Dec 2024 19:25:19 -0800 Subject: [PATCH] [Native] Make PrestoServer create and pass connectorCpuExecutor to Connector Create a CPUThreadPoolExecutor data member, connectorCpuExecutor_, for PrestoServer. Pass it to every created Connector. Add a new config `connector.num-cpu-threads-hw-multiplier` to control how many threads are used for the executor. Setting `connector.num-cpu-threads-hw-multiplier` to 0.0 (the default) leaves connectorCpuExecutor_ as nullptr, i.e. no connector CPU executor is created. This makes a process-wide managed CPUThreadPoolExecutor instance available to all connectors. Connectors can schedule CPU-bound async operators on it so that they do not occupy the driver thread pool. --- .../presto_cpp/main/PrestoServer.cpp | 26 ++++++++++++++++++- .../presto_cpp/main/PrestoServer.h | 3 +++ .../presto_cpp/main/common/Configs.cpp | 5 ++++ .../presto_cpp/main/common/Configs.h | 10 +++++++ presto-native-execution/velox | 2 +- 5 files changed, 44 insertions(+), 2 deletions(-) diff --git a/presto-native-execution/presto_cpp/main/PrestoServer.cpp b/presto-native-execution/presto_cpp/main/PrestoServer.cpp index a01eb58bdd178..84b3b26fde0a6 100644 --- a/presto-native-execution/presto_cpp/main/PrestoServer.cpp +++ b/presto-native-execution/presto_cpp/main/PrestoServer.cpp @@ -665,6 +665,15 @@ void PrestoServer::run() { // HTTP IO executor threads are joined. 
driverExecutor_.reset(); + if (connectorCpuExecutor_) { + PRESTO_SHUTDOWN_LOG(INFO) + << "Joining Connector CPU Executor '" + << connectorCpuExecutor_->getName() + << "': threads: " << connectorCpuExecutor_->numActiveThreads() << "/" + << connectorCpuExecutor_->numThreads(); + connectorCpuExecutor_->join(); + } + if (connectorIoExecutor_) { PRESTO_SHUTDOWN_LOG(INFO) << "Joining Connector IO Executor '" << connectorIoExecutor_->getName() @@ -1172,6 +1181,20 @@ std::vector PrestoServer::registerConnectors( const fs::path& configDirectoryPath) { static const std::string kPropertiesExtension = ".properties"; + const auto numConnectorCpuThreads = std::max( + SystemConfig::instance()->connectorNumCpuThreadsHwMultiplier() * + std::thread::hardware_concurrency(), + 0); + if (numConnectorCpuThreads > 0) { + connectorCpuExecutor_ = std::make_unique( + numConnectorCpuThreads, + std::make_shared("Connector")); + + PRESTO_STARTUP_LOG(INFO) + << "Connector CPU executor has " << connectorCpuExecutor_->numThreads() + << " threads."; + } + const auto numConnectorIoThreads = std::max( SystemConfig::instance()->connectorNumIoThreadsHwMultiplier() * std::thread::hardware_concurrency(), @@ -1218,7 +1241,8 @@ std::vector PrestoServer::registerConnectors( ->newConnector( catalogName, std::move(properties), - connectorIoExecutor_.get()); + connectorIoExecutor_.get(), + connectorCpuExecutor_.get()); velox::connector::registerConnector(connector); } } diff --git a/presto-native-execution/presto_cpp/main/PrestoServer.h b/presto-native-execution/presto_cpp/main/PrestoServer.h index d364befaa1b6f..615d6bfb910d9 100644 --- a/presto-native-execution/presto_cpp/main/PrestoServer.h +++ b/presto-native-execution/presto_cpp/main/PrestoServer.h @@ -235,6 +235,9 @@ class PrestoServer { // Executor for background writing into SSD cache. std::unique_ptr cacheExecutor_; + // Executor for async execution for connectors. + std::unique_ptr connectorCpuExecutor_; + // Executor for async IO for connectors. 
std::unique_ptr connectorIoExecutor_; diff --git a/presto-native-execution/presto_cpp/main/common/Configs.cpp b/presto-native-execution/presto_cpp/main/common/Configs.cpp index 0929f43436a88..a3227719eb9e3 100644 --- a/presto-native-execution/presto_cpp/main/common/Configs.cpp +++ b/presto-native-execution/presto_cpp/main/common/Configs.cpp @@ -155,6 +155,7 @@ SystemConfig::SystemConfig() { NONE_PROP(kHttpsClientCertAndKeyPath), NUM_PROP(kExchangeHttpClientNumIoThreadsHwMultiplier, 1.0), NUM_PROP(kExchangeHttpClientNumCpuThreadsHwMultiplier, 1.0), + NUM_PROP(kConnectorNumCpuThreadsHwMultiplier, 0.0), NUM_PROP(kConnectorNumIoThreadsHwMultiplier, 1.0), NUM_PROP(kDriverNumCpuThreadsHwMultiplier, 4.0), BOOL_PROP(kDriverThreadsBatchSchedulingEnabled, false), @@ -375,6 +376,10 @@ double SystemConfig::exchangeHttpClientNumCpuThreadsHwMultiplier() const { .value(); } +double SystemConfig::connectorNumCpuThreadsHwMultiplier() const { + return optionalProperty(kConnectorNumCpuThreadsHwMultiplier).value(); +} + double SystemConfig::connectorNumIoThreadsHwMultiplier() const { return optionalProperty(kConnectorNumIoThreadsHwMultiplier).value(); } diff --git a/presto-native-execution/presto_cpp/main/common/Configs.h b/presto-native-execution/presto_cpp/main/common/Configs.h index 56f54e30f3d4e..fcd17794c1c69 100644 --- a/presto-native-execution/presto_cpp/main/common/Configs.h +++ b/presto-native-execution/presto_cpp/main/common/Configs.h @@ -213,6 +213,14 @@ class SystemConfig : public ConfigBase { static constexpr std::string_view kHttpsClientCertAndKeyPath{ "https-client-cert-key-path"}; + /// Floating point number used in calculating how many threads we would use + /// for CPU executor for connectors mainly for async operators: + /// hw_concurrency x multiplier. + /// If 0.0 then connector CPU executor would not be created. + /// 0.0 is default. 
+ static constexpr std::string_view kConnectorNumCpuThreadsHwMultiplier{ + "connector.num-cpu-threads-hw-multiplier"}; + /// Floating point number used in calculating how many threads we would use /// for IO executor for connectors mainly to do preload/prefetch: /// hw_concurrency x multiplier. @@ -724,6 +732,8 @@ class SystemConfig : public ConfigBase { double exchangeHttpClientNumCpuThreadsHwMultiplier() const; + double connectorNumCpuThreadsHwMultiplier() const; + double connectorNumIoThreadsHwMultiplier() const; double driverNumCpuThreadsHwMultiplier() const; diff --git a/presto-native-execution/velox b/presto-native-execution/velox index 960c2af9be8a5..12942c1eb76de 160000 --- a/presto-native-execution/velox +++ b/presto-native-execution/velox @@ -1 +1 @@ -Subproject commit 960c2af9be8a5992dd4528f945fb63ca6dae2f92 +Subproject commit 12942c1eb76de019b775f4b207bc4595d8ace5c0