Skip to content

Commit

Permalink
add setting table_function_max_readers
Browse files Browse the repository at this point in the history
  • Loading branch information
xiedeyantu committed Jul 23, 2023
1 parent 2e67a89 commit 15119b5
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/Core/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class IColumn;
M(UInt64, idle_connection_timeout, 3600, "Close idle TCP connections after specified number of seconds.", 0) \
M(UInt64, distributed_connections_pool_size, 1024, "Maximum number of connections with one remote server in the pool.", 0) \
M(UInt64, connections_with_failover_max_tries, DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES, "The maximum number of attempts to connect to replicas.", 0) \
M(MaxThreads, table_function_max_readers, 0, "The maximum number of reader threads for s3/url/hdfsCluster to execute the request. By default, it is determined automatically.", 0) \
M(UInt64, s3_strict_upload_part_size, 0, "The exact size of part to upload during multipart upload to S3 (some implementations does not supports variable size parts).", 0) \
M(UInt64, s3_min_upload_part_size, 16*1024*1024, "The minimum size of part to upload during multipart upload to S3.", 0) \
M(UInt64, s3_max_upload_part_size, 5ull*1024*1024*1024, "The maximum size of part to upload during multipart upload to S3.", 0) \
Expand Down
2 changes: 2 additions & 0 deletions src/Storages/HDFS/StorageHDFS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,8 @@ Pipe StorageHDFS::read(

Pipes pipes;
auto this_ptr = std::static_pointer_cast<StorageHDFS>(shared_from_this());
const size_t table_function_max_readers = context_->getSettingsRef().table_function_max_readers;
num_streams = std::min(num_streams, table_function_max_readers);
for (size_t i = 0; i < num_streams; ++i)
{
pipes.emplace_back(std::make_shared<HDFSSource>(
Expand Down
2 changes: 2 additions & 0 deletions src/Storages/StorageS3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1100,6 +1100,8 @@ Pipe StorageS3::read(
}

const size_t max_download_threads = local_context->getSettingsRef().max_download_threads;
const size_t table_function_max_readers = local_context->getSettingsRef().table_function_max_readers;
num_streams = std::min(num_streams, table_function_max_readers);
for (size_t i = 0; i < num_streams; ++i)
{
pipes.emplace_back(std::make_shared<StorageS3Source>(
Expand Down
2 changes: 2 additions & 0 deletions src/Storages/StorageURL.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,8 @@ Pipe IStorageURLBase::read(
pipes.reserve(num_streams);

size_t download_threads = num_streams >= max_download_threads ? 1 : (max_download_threads / num_streams);
const size_t table_function_max_readers = local_context->getSettingsRef().table_function_max_readers;
num_streams = std::min(num_streams, table_function_max_readers);
for (size_t i = 0; i < num_streams; ++i)
{
pipes.emplace_back(std::make_shared<StorageURLSource>(
Expand Down

0 comments on commit 15119b5

Please sign in to comment.