From 516222d365f09f6e90172d09dcaeb32617f1f27b Mon Sep 17 00:00:00 2001 From: Ge Gao Date: Fri, 13 Dec 2024 19:25:19 -0800 Subject: [PATCH] [Native] Make PrestoServer create and pass connectorCpuExecutor to Connector --- .../presto_cpp/main/PrestoServer.cpp | 25 ++++++++++++++++++- .../presto_cpp/main/PrestoServer.h | 3 +++ .../presto_cpp/main/common/Configs.cpp | 5 ++++ .../presto_cpp/main/common/Configs.h | 10 ++++++++ 4 files changed, 42 insertions(+), 1 deletion(-) diff --git a/presto-native-execution/presto_cpp/main/PrestoServer.cpp b/presto-native-execution/presto_cpp/main/PrestoServer.cpp index a01eb58bdd178..e6f008a935ee1 100644 --- a/presto-native-execution/presto_cpp/main/PrestoServer.cpp +++ b/presto-native-execution/presto_cpp/main/PrestoServer.cpp @@ -665,6 +665,14 @@ void PrestoServer::run() { // HTTP IO executor threads are joined. driverExecutor_.reset(); + if (connectorCpuExecutor_) { + PRESTO_SHUTDOWN_LOG(INFO) + << "Joining Connector CPU Executor '" << connectorCpuExecutor_->getName() + << "': threads: " << connectorCpuExecutor_->numActiveThreads() << "/" + << connectorCpuExecutor_->numThreads(); + connectorCpuExecutor_->join(); + } + if (connectorIoExecutor_) { PRESTO_SHUTDOWN_LOG(INFO) << "Joining Connector IO Executor '" << connectorIoExecutor_->getName() @@ -1172,6 +1180,20 @@ std::vector PrestoServer::registerConnectors( const fs::path& configDirectoryPath) { static const std::string kPropertiesExtension = ".properties"; + const auto numConnectorCpuThreads = std::max( + SystemConfig::instance()->connectorNumCpuThreadsHwMultiplier() * + std::thread::hardware_concurrency(), + 0); + if (numConnectorCpuThreads > 0) { + connectorCpuExecutor_ = std::make_unique( + numConnectorCpuThreads, + std::make_shared("Connector")); + + PRESTO_STARTUP_LOG(INFO) + << "Connector CPU executor has " << connectorCpuExecutor_->numThreads() + << " threads."; + } + const auto numConnectorIoThreads = std::max( SystemConfig::instance()->connectorNumIoThreadsHwMultiplier() * std::thread::hardware_concurrency(), @@ -1218,7 +1240,8 @@ std::vector PrestoServer::registerConnectors( ->newConnector( catalogName, std::move(properties), - connectorIoExecutor_.get()); + connectorIoExecutor_.get(), + connectorCpuExecutor_.get()); velox::connector::registerConnector(connector); } } diff --git a/presto-native-execution/presto_cpp/main/PrestoServer.h b/presto-native-execution/presto_cpp/main/PrestoServer.h index d364befaa1b6f..615d6bfb910d9 100644 --- a/presto-native-execution/presto_cpp/main/PrestoServer.h +++ b/presto-native-execution/presto_cpp/main/PrestoServer.h @@ -235,6 +235,9 @@ class PrestoServer { // Executor for background writing into SSD cache. std::unique_ptr cacheExecutor_; + // Executor for async execution for connectors. + std::unique_ptr connectorCpuExecutor_; + // Executor for async IO for connectors. std::unique_ptr connectorIoExecutor_; diff --git a/presto-native-execution/presto_cpp/main/common/Configs.cpp b/presto-native-execution/presto_cpp/main/common/Configs.cpp index 0929f43436a88..7d7b3abe4db12 100644 --- a/presto-native-execution/presto_cpp/main/common/Configs.cpp +++ b/presto-native-execution/presto_cpp/main/common/Configs.cpp @@ -155,6 +155,7 @@ SystemConfig::SystemConfig() { NONE_PROP(kHttpsClientCertAndKeyPath), NUM_PROP(kExchangeHttpClientNumIoThreadsHwMultiplier, 1.0), NUM_PROP(kExchangeHttpClientNumCpuThreadsHwMultiplier, 1.0), + NUM_PROP(kConnectorNumCpuThreadsHwMultiplier, 1.0), NUM_PROP(kConnectorNumIoThreadsHwMultiplier, 1.0), NUM_PROP(kDriverNumCpuThreadsHwMultiplier, 4.0), BOOL_PROP(kDriverThreadsBatchSchedulingEnabled, false), @@ -375,6 +376,10 @@ double SystemConfig::exchangeHttpClientNumCpuThreadsHwMultiplier() const { .value(); } +double SystemConfig::connectorNumCpuThreadsHwMultiplier() const { + return optionalProperty(kConnectorNumCpuThreadsHwMultiplier).value(); +} + double SystemConfig::connectorNumIoThreadsHwMultiplier() const { return optionalProperty(kConnectorNumIoThreadsHwMultiplier).value(); } diff --git a/presto-native-execution/presto_cpp/main/common/Configs.h b/presto-native-execution/presto_cpp/main/common/Configs.h index 56f54e30f3d4e..be524c2ba5b1c 100644 --- a/presto-native-execution/presto_cpp/main/common/Configs.h +++ b/presto-native-execution/presto_cpp/main/common/Configs.h @@ -213,6 +213,14 @@ class SystemConfig : public ConfigBase { static constexpr std::string_view kHttpsClientCertAndKeyPath{ "https-client-cert-key-path"}; + /// Floating point number used in calculating how many threads we would use + /// for CPU executor for connectors mainly for async operators: + /// hw_concurrency x multiplier. + /// If 0.0 then connector CPU execute would not be created. + /// 1.0 is default. + static constexpr std::string_view kConnectorNumCpuThreadsHwMultiplier{ + "connector.num-cpu-threads-hw-multiplier"}; + /// Floating point number used in calculating how many threads we would use /// for IO executor for connectors mainly to do preload/prefetch: /// hw_concurrency x multiplier. @@ -724,6 +732,8 @@ class SystemConfig : public ConfigBase { double exchangeHttpClientNumCpuThreadsHwMultiplier() const; + double connectorNumCpuThreadsHwMultiplier() const; + double connectorNumIoThreadsHwMultiplier() const; double driverNumCpuThreadsHwMultiplier() const;