Skip to content

Commit

Permalink
[Native] Change to create and pass CPUThreadPoolExecutor to Connector…
Browse files Browse the repository at this point in the history
… interface
  • Loading branch information
gggrace14 committed Dec 13, 2024
1 parent 3ada782 commit 8cba138
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 21 deletions.
24 changes: 12 additions & 12 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -665,12 +665,12 @@ void PrestoServer::run() {
// HTTP IO executor threads are joined.
driverExecutor_.reset();

if (connectorIoExecutor_) {
if (connectorCpuExecutor_) {
PRESTO_SHUTDOWN_LOG(INFO)
<< "Joining Connector IO Executor '" << connectorIoExecutor_->getName()
<< "': threads: " << connectorIoExecutor_->numActiveThreads() << "/"
<< connectorIoExecutor_->numThreads();
connectorIoExecutor_->join();
<< "Joining Connector CPU Executor '" << connectorCpuExecutor_->getName()
<< "': threads: " << connectorCpuExecutor_->numActiveThreads() << "/"
<< connectorCpuExecutor_->numThreads();
connectorCpuExecutor_->join();
}

if (httpSrvCpuExecutor_ != nullptr) {
Expand Down Expand Up @@ -1172,17 +1172,17 @@ std::vector<std::string> PrestoServer::registerConnectors(
const fs::path& configDirectoryPath) {
static const std::string kPropertiesExtension = ".properties";

const auto numConnectorIoThreads = std::max<size_t>(
SystemConfig::instance()->connectorNumIoThreadsHwMultiplier() *
const auto numConnectorCpuThreads = std::max<size_t>(
SystemConfig::instance()->connectorNumCpuThreadsHwMultiplier() *
std::thread::hardware_concurrency(),
0);
if (numConnectorIoThreads > 0) {
connectorIoExecutor_ = std::make_unique<folly::IOThreadPoolExecutor>(
numConnectorIoThreads,
if (numConnectorCpuThreads > 0) {
connectorCpuExecutor_ = std::make_unique<folly::CPUThreadPoolExecutor>(
numConnectorCpuThreads,
std::make_shared<folly::NamedThreadFactory>("Connector"));

PRESTO_STARTUP_LOG(INFO)
<< "Connector IO executor has " << connectorIoExecutor_->numThreads()
<< "Connector CPU executor has " << connectorCpuExecutor_->numThreads()
<< " threads.";
}

Expand Down Expand Up @@ -1218,7 +1218,7 @@ std::vector<std::string> PrestoServer::registerConnectors(
->newConnector(
catalogName,
std::move(properties),
connectorIoExecutor_.get());
connectorCpuExecutor_.get());
velox::connector::registerConnector(connector);
}
}
Expand Down
4 changes: 2 additions & 2 deletions presto-native-execution/presto_cpp/main/PrestoServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,8 @@ class PrestoServer {
// Executor for background writing into SSD cache.
std::unique_ptr<folly::CPUThreadPoolExecutor> cacheExecutor_;

// Executor for async IO for connectors.
std::unique_ptr<folly::IOThreadPoolExecutor> connectorIoExecutor_;
// Executor for async execution for connectors.
std::unique_ptr<folly::CPUThreadPoolExecutor> connectorCpuExecutor_;

// Executor for exchange data over http.
std::shared_ptr<folly::IOThreadPoolExecutor> exchangeHttpIoExecutor_;
Expand Down
6 changes: 3 additions & 3 deletions presto-native-execution/presto_cpp/main/common/Configs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ SystemConfig::SystemConfig() {
NONE_PROP(kHttpsClientCertAndKeyPath),
NUM_PROP(kExchangeHttpClientNumIoThreadsHwMultiplier, 1.0),
NUM_PROP(kExchangeHttpClientNumCpuThreadsHwMultiplier, 1.0),
NUM_PROP(kConnectorNumIoThreadsHwMultiplier, 1.0),
NUM_PROP(kConnectorNumCpuThreadsHwMultiplier, 0.0),
NUM_PROP(kDriverNumCpuThreadsHwMultiplier, 4.0),
BOOL_PROP(kDriverThreadsBatchSchedulingEnabled, false),
NUM_PROP(kDriverStuckOperatorThresholdMs, 30 * 60 * 1000),
Expand Down Expand Up @@ -375,8 +375,8 @@ double SystemConfig::exchangeHttpClientNumCpuThreadsHwMultiplier() const {
.value();
}

double SystemConfig::connectorNumIoThreadsHwMultiplier() const {
return optionalProperty<double>(kConnectorNumIoThreadsHwMultiplier).value();
double SystemConfig::connectorNumCpuThreadsHwMultiplier() const {
return optionalProperty<double>(kConnectorNumCpuThreadsHwMultiplier).value();
}

double SystemConfig::driverNumCpuThreadsHwMultiplier() const {
Expand Down
8 changes: 4 additions & 4 deletions presto-native-execution/presto_cpp/main/common/Configs.h
Original file line number Diff line number Diff line change
Expand Up @@ -214,12 +214,12 @@ class SystemConfig : public ConfigBase {
"https-client-cert-key-path"};

/// Floating point number used in calculating how many threads we would use
/// for IO executor for connectors mainly to do preload/prefetch:
/// for CPU executor for connectors mainly to do preload/prefetch:
/// hw_concurrency x multiplier.
/// If 0.0 then connector preload/prefetch is disabled.
/// 0.0 is default.
static constexpr std::string_view kConnectorNumIoThreadsHwMultiplier{
"connector.num-io-threads-hw-multiplier"};
static constexpr std::string_view kConnectorNumCpuThreadsHwMultiplier{
"connector.num-cpu-threads-hw-multiplier"};

/// Floating point number used in calculating how many threads we would use
/// for Driver CPU executor: hw_concurrency x multiplier. 4.0 is default.
Expand Down Expand Up @@ -724,7 +724,7 @@ class SystemConfig : public ConfigBase {

double exchangeHttpClientNumCpuThreadsHwMultiplier() const;

double connectorNumIoThreadsHwMultiplier() const;
double connectorNumCpuThreadsHwMultiplier() const;

double driverNumCpuThreadsHwMultiplier() const;

Expand Down

0 comments on commit 8cba138

Please sign in to comment.