Skip to content

Commit

Permalink
[Native] Make PrestoServer create and pass connectorCpuExecutor to Co…
Browse files Browse the repository at this point in the history
…nnector
  • Loading branch information
gggrace14 committed Dec 14, 2024
1 parent 17ac4c3 commit 024fa28
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 1 deletion.
26 changes: 25 additions & 1 deletion presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,15 @@ void PrestoServer::run() {
// HTTP IO executor threads are joined.
driverExecutor_.reset();

if (connectorCpuExecutor_) {
PRESTO_SHUTDOWN_LOG(INFO)
<< "Joining Connector CPU Executor '"
<< connectorCpuExecutor_->getName()
<< "': threads: " << connectorCpuExecutor_->numActiveThreads() << "/"
<< connectorCpuExecutor_->numThreads();
connectorCpuExecutor_->join();
}

if (connectorIoExecutor_) {
PRESTO_SHUTDOWN_LOG(INFO)
<< "Joining Connector IO Executor '" << connectorIoExecutor_->getName()
Expand Down Expand Up @@ -1172,6 +1181,20 @@ std::vector<std::string> PrestoServer::registerConnectors(
const fs::path& configDirectoryPath) {
static const std::string kPropertiesExtension = ".properties";

const auto numConnectorCpuThreads = std::max<size_t>(
SystemConfig::instance()->connectorNumCpuThreadsHwMultiplier() *
std::thread::hardware_concurrency(),
0);
if (numConnectorCpuThreads > 0) {
connectorCpuExecutor_ = std::make_unique<folly::CPUThreadPoolExecutor>(
numConnectorCpuThreads,
std::make_shared<folly::NamedThreadFactory>("Connector"));

PRESTO_STARTUP_LOG(INFO)
<< "Connector CPU executor has " << connectorCpuExecutor_->numThreads()
<< " threads.";
}

const auto numConnectorIoThreads = std::max<size_t>(
SystemConfig::instance()->connectorNumIoThreadsHwMultiplier() *
std::thread::hardware_concurrency(),
Expand Down Expand Up @@ -1218,7 +1241,8 @@ std::vector<std::string> PrestoServer::registerConnectors(
->newConnector(
catalogName,
std::move(properties),
connectorIoExecutor_.get());
connectorIoExecutor_.get(),
connectorCpuExecutor_.get());
velox::connector::registerConnector(connector);
}
}
Expand Down
3 changes: 3 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,9 @@ class PrestoServer {
// Executor for background writing into SSD cache.
std::unique_ptr<folly::CPUThreadPoolExecutor> cacheExecutor_;

// Executor for async execution for connectors.
std::unique_ptr<folly::CPUThreadPoolExecutor> connectorCpuExecutor_;

// Executor for async IO for connectors.
std::unique_ptr<folly::IOThreadPoolExecutor> connectorIoExecutor_;

Expand Down
5 changes: 5 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Configs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ SystemConfig::SystemConfig() {
NONE_PROP(kHttpsClientCertAndKeyPath),
NUM_PROP(kExchangeHttpClientNumIoThreadsHwMultiplier, 1.0),
NUM_PROP(kExchangeHttpClientNumCpuThreadsHwMultiplier, 1.0),
NUM_PROP(kConnectorNumCpuThreadsHwMultiplier, 0.0),
NUM_PROP(kConnectorNumIoThreadsHwMultiplier, 1.0),
NUM_PROP(kDriverNumCpuThreadsHwMultiplier, 4.0),
BOOL_PROP(kDriverThreadsBatchSchedulingEnabled, false),
Expand Down Expand Up @@ -375,6 +376,10 @@ double SystemConfig::exchangeHttpClientNumCpuThreadsHwMultiplier() const {
.value();
}

double SystemConfig::connectorNumCpuThreadsHwMultiplier() const {
return optionalProperty<double>(kConnectorNumCpuThreadsHwMultiplier).value();
}

double SystemConfig::connectorNumIoThreadsHwMultiplier() const {
return optionalProperty<double>(kConnectorNumIoThreadsHwMultiplier).value();
}
Expand Down
10 changes: 10 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Configs.h
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,14 @@ class SystemConfig : public ConfigBase {
static constexpr std::string_view kHttpsClientCertAndKeyPath{
"https-client-cert-key-path"};

/// Floating point number used in calculating how many threads we would use
/// for CPU executor for connectors mainly for async operators:
/// hw_concurrency x multiplier.
/// If 0.0 then connector CPU executor would not be created.
/// 0.0 is default.
static constexpr std::string_view kConnectorNumCpuThreadsHwMultiplier{
"connector.num-cpu-threads-hw-multiplier"};

/// Floating point number used in calculating how many threads we would use
/// for IO executor for connectors mainly to do preload/prefetch:
/// hw_concurrency x multiplier.
Expand Down Expand Up @@ -724,6 +732,8 @@ class SystemConfig : public ConfigBase {

double exchangeHttpClientNumCpuThreadsHwMultiplier() const;

double connectorNumCpuThreadsHwMultiplier() const;

double connectorNumIoThreadsHwMultiplier() const;

double driverNumCpuThreadsHwMultiplier() const;
Expand Down

0 comments on commit 024fa28

Please sign in to comment.