Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Native] Make PrestoServer create and pass connectorCpuExecutor to Connector #24259

Merged
merged 1 commit into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,15 @@ void PrestoServer::run() {
// HTTP IO executor threads are joined.
driverExecutor_.reset();

if (connectorCpuExecutor_) {
PRESTO_SHUTDOWN_LOG(INFO)
<< "Joining Connector CPU Executor '"
<< connectorCpuExecutor_->getName()
<< "': threads: " << connectorCpuExecutor_->numActiveThreads() << "/"
<< connectorCpuExecutor_->numThreads();
connectorCpuExecutor_->join();
}

if (connectorIoExecutor_) {
PRESTO_SHUTDOWN_LOG(INFO)
<< "Joining Connector IO Executor '" << connectorIoExecutor_->getName()
Expand Down Expand Up @@ -1172,6 +1181,20 @@ std::vector<std::string> PrestoServer::registerConnectors(
const fs::path& configDirectoryPath) {
static const std::string kPropertiesExtension = ".properties";

const auto numConnectorCpuThreads = std::max<size_t>(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you might want to track the connector cpu executor stats in followup. Thanks!

Copy link
Contributor Author

@gggrace14 gggrace14 Dec 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will check with you about what stats will be useful to expose offline

SystemConfig::instance()->connectorNumCpuThreadsHwMultiplier() *
std::thread::hardware_concurrency(),
0);
if (numConnectorCpuThreads > 0) {
connectorCpuExecutor_ = std::make_unique<folly::CPUThreadPoolExecutor>(
numConnectorCpuThreads,
std::make_shared<folly::NamedThreadFactory>("Connector"));

PRESTO_STARTUP_LOG(INFO)
<< "Connector CPU executor has " << connectorCpuExecutor_->numThreads()
<< " threads.";
}

const auto numConnectorIoThreads = std::max<size_t>(
SystemConfig::instance()->connectorNumIoThreadsHwMultiplier() *
std::thread::hardware_concurrency(),
Expand Down Expand Up @@ -1218,7 +1241,8 @@ std::vector<std::string> PrestoServer::registerConnectors(
->newConnector(
catalogName,
std::move(properties),
connectorIoExecutor_.get());
connectorIoExecutor_.get(),
connectorCpuExecutor_.get());
velox::connector::registerConnector(connector);
}
}
Expand Down
3 changes: 3 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,9 @@ class PrestoServer {
// Executor for background writing into SSD cache.
std::unique_ptr<folly::CPUThreadPoolExecutor> cacheExecutor_;

// Executor for async execution for connectors.
std::unique_ptr<folly::CPUThreadPoolExecutor> connectorCpuExecutor_;

// Executor for async IO for connectors.
std::unique_ptr<folly::IOThreadPoolExecutor> connectorIoExecutor_;

Expand Down
5 changes: 5 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Configs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ SystemConfig::SystemConfig() {
NONE_PROP(kHttpsClientCertAndKeyPath),
NUM_PROP(kExchangeHttpClientNumIoThreadsHwMultiplier, 1.0),
NUM_PROP(kExchangeHttpClientNumCpuThreadsHwMultiplier, 1.0),
NUM_PROP(kConnectorNumCpuThreadsHwMultiplier, 0.0),
NUM_PROP(kConnectorNumIoThreadsHwMultiplier, 1.0),
NUM_PROP(kDriverNumCpuThreadsHwMultiplier, 4.0),
BOOL_PROP(kDriverThreadsBatchSchedulingEnabled, false),
Expand Down Expand Up @@ -375,6 +376,10 @@ double SystemConfig::exchangeHttpClientNumCpuThreadsHwMultiplier() const {
.value();
}

double SystemConfig::connectorNumCpuThreadsHwMultiplier() const {
return optionalProperty<double>(kConnectorNumCpuThreadsHwMultiplier).value();
}

double SystemConfig::connectorNumIoThreadsHwMultiplier() const {
return optionalProperty<double>(kConnectorNumIoThreadsHwMultiplier).value();
}
Expand Down
10 changes: 10 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Configs.h
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,14 @@ class SystemConfig : public ConfigBase {
static constexpr std::string_view kHttpsClientCertAndKeyPath{
"https-client-cert-key-path"};

/// Floating point number used in calculating how many threads we would use
/// for CPU executor for connectors mainly for async operators:
/// hw_concurrency x multiplier.
/// If 0.0 then connector CPU executor would not be created.
/// 0.0 is default.
static constexpr std::string_view kConnectorNumCpuThreadsHwMultiplier{
"connector.num-cpu-threads-hw-multiplier"};

/// Floating point number used in calculating how many threads we would use
/// for IO executor for connectors mainly to do preload/prefetch:
/// hw_concurrency x multiplier.
Expand Down Expand Up @@ -724,6 +732,8 @@ class SystemConfig : public ConfigBase {

double exchangeHttpClientNumCpuThreadsHwMultiplier() const;

double connectorNumCpuThreadsHwMultiplier() const;

double connectorNumIoThreadsHwMultiplier() const;

double driverNumCpuThreadsHwMultiplier() const;
Expand Down
2 changes: 1 addition & 1 deletion presto-native-execution/velox
Submodule velox updated 67 files
+2 −1 velox/connectors/Connector.h
+3 −2 velox/connectors/fuzzer/FuzzerConnector.h
+1 −1 velox/connectors/hive/HiveConfig.cpp
+3 −2 velox/connectors/hive/HiveConnector.h
+2 −1 velox/connectors/tests/ConnectorTest.cpp
+3 −2 velox/connectors/tpch/TpchConnector.h
+9 −0 velox/core/QueryConfig.h
+4 −0 velox/docs/configs.rst
+1 −0 velox/docs/develop/testing.rst
+14 −13 velox/docs/develop/testing/cache-fuzzer.rst
+195 −0 velox/docs/functions/presto/aggregate.rst
+5 −5 velox/docs/functions/presto/coverage.rst
+3 −3 velox/docs/functions/spark/regexp.rst
+19 −0 velox/docs/functions/spark/string.rst
+1 −1 velox/dwio/common/BufferedInput.h
+1 −1 velox/dwio/common/CacheInputStream.cpp
+6 −1 velox/dwio/common/CacheInputStream.h
+1 −1 velox/dwio/common/DirectInputStream.cpp
+1 −1 velox/dwio/common/DirectInputStream.h
+2 −2 velox/dwio/common/SeekableInputStream.cpp
+3 −3 velox/dwio/common/SeekableInputStream.h
+30 −7 velox/dwio/common/Statistics.h
+1 −1 velox/dwio/common/compression/PagedInputStream.h
+7 −4 velox/dwio/dwrf/reader/DwrfReader.cpp
+3 −1 velox/dwio/dwrf/reader/DwrfReader.h
+100 −41 velox/dwio/dwrf/reader/ReaderBase.cpp
+20 −11 velox/dwio/dwrf/reader/ReaderBase.h
+3 −3 velox/dwio/dwrf/test/ReaderTest.cpp
+1 −1 velox/dwio/dwrf/test/TestDecompression.cpp
+3 −1 velox/dwio/dwrf/test/WriterTest.cpp
+111 −39 velox/exec/fuzzer/CacheFuzzer.cpp
+2 −1 velox/exec/tests/AsyncConnectorTest.cpp
+6 −12 velox/exec/tests/PrintPlanWithStatsTest.cpp
+22 −5 velox/exec/tests/TableScanTest.cpp
+15 −3 velox/expression/fuzzer/ExpressionFuzzer.cpp
+43 −24 velox/functions/lib/Re2Functions.cpp
+22 −3 velox/functions/lib/Re2Functions.h
+33 −23 velox/functions/lib/tests/Re2FunctionsTest.cpp
+7 −2 velox/functions/prestosql/DateTimeFunctions.h
+6 −0 velox/functions/prestosql/DateTimeImpl.h
+5 −0 velox/functions/prestosql/aggregates/AggregateNames.h
+1 −0 velox/functions/prestosql/aggregates/CMakeLists.txt
+684 −0 velox/functions/prestosql/aggregates/ClassificationAggregation.cpp
+5 −0 velox/functions/prestosql/aggregates/RegisterAggregateFunctions.cpp
+1 −0 velox/functions/prestosql/aggregates/tests/CMakeLists.txt
+247 −0 velox/functions/prestosql/aggregates/tests/ClassificationAggregationTest.cpp
+11 −1 velox/functions/prestosql/fuzzer/AggregationFuzzerTest.cpp
+85 −0 velox/functions/prestosql/fuzzer/ClassificationAggregationInputGenerator.h
+16 −1 velox/functions/prestosql/fuzzer/WindowFuzzerTest.cpp
+21 −0 velox/functions/prestosql/tests/DateTimeFunctionsTest.cpp
+24 −0 velox/functions/prestosql/tests/RegexpReplaceTest.cpp
+1 −0 velox/functions/sparksql/CMakeLists.txt
+390 −0 velox/functions/sparksql/ConcatWs.cpp
+35 −0 velox/functions/sparksql/ConcatWs.h
+24 −19 velox/functions/sparksql/RegexFunctions.cpp
+19 −0 velox/functions/sparksql/Split.h
+5 −0 velox/functions/sparksql/registration/RegisterString.cpp
+1 −0 velox/functions/sparksql/tests/CMakeLists.txt
+305 −0 velox/functions/sparksql/tests/ConcatWsTest.cpp
+4 −2 velox/functions/sparksql/tests/RegexFunctionsTest.cpp
+13 −129 velox/serializers/CompactRowSerializer.cpp
+190 −0 velox/serializers/RowSerializer.h
+5 −141 velox/serializers/UnsafeRowSerializer.cpp
+3 −6 velox/serializers/tests/UnsafeRowSerializerTest.cpp
+2 −2 velox/type/Timestamp.h
+18 −0 velox/type/tz/TimeZoneMap.cpp
+8 −0 velox/type/tz/TimeZoneMap.h
Loading