Skip to content

Commit

Permalink
Merge pull request ClickHouse#52068 from azat/replicated-cluster-prep1
Browse files Browse the repository at this point in the history
Preparations for Trivial Support For Resharding (part1)
  • Loading branch information
alexey-milovidov committed Jul 24, 2023
2 parents e05e0ec + 4a33e02 commit 3c034d5
Show file tree
Hide file tree
Showing 16 changed files with 153 additions and 74 deletions.
6 changes: 0 additions & 6 deletions src/Interpreters/Cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,6 @@ class Cluster
UInt32 shard_index_ = 0,
UInt32 replica_index_ = 0);

Address(
const String & host_port_,
const ClusterConnectionParameters & params,
UInt32 shard_index_,
UInt32 replica_index_);

Address(
const DatabaseReplicaInfo & info,
const ClusterConnectionParameters & params,
Expand Down
1 change: 1 addition & 0 deletions src/Interpreters/ClusterProxy/SelectStreamFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ void SelectStreamFactory::createForShard(
{
remote_shards.emplace_back(Shard{
.query = query_ast,
.main_table = main_table,
.header = header,
.shard_info = shard_info,
.lazy = lazy,
Expand Down
2 changes: 2 additions & 0 deletions src/Interpreters/ClusterProxy/SelectStreamFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ class SelectStreamFactory
{
/// Query and header may be changed depending on shard.
ASTPtr query;
/// Used to check that the table exists on the remote node
StorageID main_table;
Block header;

Cluster::ShardInfo shard_info;
Expand Down
19 changes: 10 additions & 9 deletions src/Interpreters/ClusterProxy/executeQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,20 @@ namespace ErrorCodes
namespace ClusterProxy
{

ContextMutablePtr updateSettingsForCluster(const Cluster & cluster, ContextPtr context, const Settings & settings, const StorageID & main_table, const SelectQueryInfo * query_info, Poco::Logger * log)
ContextMutablePtr updateSettingsForCluster(bool interserver_mode,
ContextPtr context,
const Settings & settings,
const StorageID & main_table,
const SelectQueryInfo * query_info,
Poco::Logger * log)
{
Settings new_settings = settings;
new_settings.queue_max_wait_ms = Cluster::saturate(new_settings.queue_max_wait_ms, settings.max_execution_time);

/// If "secret" (in remote_servers) is not in use,
/// user on the shard is not the same as the user on the initiator,
/// hence per-user limits should not be applied.
if (cluster.getSecret().empty())
if (!interserver_mode)
{
/// Does not matter on remote servers, because queries are sent under different user.
new_settings.max_concurrent_queries_for_user = 0;
Expand Down Expand Up @@ -170,17 +175,15 @@ void executeQuery(
std::vector<QueryPlanPtr> plans;
SelectStreamFactory::Shards remote_shards;

auto new_context = updateSettingsForCluster(*query_info.getCluster(), context, settings, main_table, &query_info, log);
auto new_context = updateSettingsForCluster(!query_info.getCluster()->getSecret().empty(), context, settings, main_table, &query_info, log);
new_context->increaseDistributedDepth();

size_t shards = query_info.getCluster()->getShardCount();
for (const auto & shard_info : query_info.getCluster()->getShardsInfo())
{
ASTPtr query_ast_for_shard;
if (query_info.optimized_cluster && settings.optimize_skip_unused_shards_rewrite_in && shards > 1)
ASTPtr query_ast_for_shard = query_ast->clone();
if (sharding_key_expr && query_info.optimized_cluster && settings.optimize_skip_unused_shards_rewrite_in && shards > 1)
{
query_ast_for_shard = query_ast->clone();

OptimizeShardingKeyRewriteInVisitor::Data visitor_data{
sharding_key_expr,
sharding_key_expr->getSampleBlock().getByPosition(0).type,
Expand All @@ -191,8 +194,6 @@ void executeQuery(
OptimizeShardingKeyRewriteInVisitor visitor(visitor_data);
visitor.visit(query_ast_for_shard);
}
else
query_ast_for_shard = query_ast->clone();

if (shard_filter_generator)
{
Expand Down
8 changes: 6 additions & 2 deletions src/Interpreters/ClusterProxy/executeQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,12 @@ class SelectStreamFactory;
/// - optimize_skip_unused_shards_nesting
///
/// @return new Context with adjusted settings
ContextMutablePtr updateSettingsForCluster(
const Cluster & cluster, ContextPtr context, const Settings & settings, const StorageID & main_table, const SelectQueryInfo * query_info = nullptr, Poco::Logger * log = nullptr);
ContextMutablePtr updateSettingsForCluster(bool interserver_mode,
ContextPtr context,
const Settings & settings,
const StorageID & main_table,
const SelectQueryInfo * query_info = nullptr,
Poco::Logger * log = nullptr);

using AdditionalShardFilterGenerator = std::function<ASTPtr(uint64_t)>;
/// Execute a distributed query, creating a query plan, from which the query pipeline can be built.
Expand Down
3 changes: 1 addition & 2 deletions src/Interpreters/InterpreterSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2274,8 +2274,7 @@ std::optional<UInt64> InterpreterSelectQuery::getTrivialCount(UInt64 max_paralle
&& !settings.allow_experimental_query_deduplication
&& !settings.empty_result_for_aggregation_by_empty_set
&& storage
&& storage->getName() != "MaterializedMySQL"
&& !storage->hasLightweightDeletedMask()
&& storage->supportsTrivialCountOptimization()
&& query_info.filter_asts.empty()
&& query_analyzer->hasAggregation()
&& (query_analyzer->aggregates().size() == 1)
Expand Down
3 changes: 3 additions & 0 deletions src/Planner/PlannerJoinTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,9 @@ bool applyTrivialCountIfPossible(
return false;

const auto & storage = table_node.getStorage();
if (!storage->supportsTrivialCountOptimization())
return false;

auto storage_id = storage->getStorageID();
auto row_policy_filter = query_context->getRowPolicyFilter(storage_id.getDatabaseName(),
storage_id.getTableName(),
Expand Down
6 changes: 4 additions & 2 deletions src/Processors/QueryPlan/ReadFromRemote.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,9 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStream
if (my_table_func_ptr)
try_results = my_shard.shard_info.pool->getManyForTableFunction(timeouts, &current_settings, PoolMode::GET_MANY);
else
try_results = my_shard.shard_info.pool->getManyChecked(timeouts, &current_settings, PoolMode::GET_MANY, my_main_table.getQualifiedName());
try_results = my_shard.shard_info.pool->getManyChecked(
timeouts, &current_settings, PoolMode::GET_MANY,
my_shard.main_table ? my_shard.main_table.getQualifiedName() : my_main_table.getQualifiedName());
}
catch (const Exception & ex)
{
Expand Down Expand Up @@ -241,7 +243,7 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact
remote_query_executor->setPoolMode(PoolMode::GET_MANY);

if (!table_func_ptr)
remote_query_executor->setMainTable(main_table);
remote_query_executor->setMainTable(shard.main_table ? shard.main_table : main_table);

pipes.emplace_back(createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read, async_query_sending));
addConvertingActions(pipes.back(), output_stream->header);
Expand Down
1 change: 1 addition & 0 deletions src/Processors/QueryPlan/ReadFromRemote.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ using ThrottlerPtr = std::shared_ptr<Throttler>;
class ReadFromRemote final : public ISourceStep
{
public:
/// @param main_table_ fallback main table; it is ignored for every shard whose Shard::main_table is set
ReadFromRemote(
ClusterProxy::SelectStreamFactory::Shards shards_,
Block header_,
Expand Down
3 changes: 3 additions & 0 deletions src/Storages/IStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,9 @@ class IStorage : public std::enable_shared_from_this<IStorage>, public TypePromo
/// because those are internally translated into 'ALTER UPDATE' mutations.
virtual bool supportsDelete() const { return false; }

/// Returns true if a trivial count query over this storage can be answered without reading the data at all.
/// The base implementation returns false; a storage that supports the optimization has to override it.
virtual bool supportsTrivialCountOptimization() const { return false; }

private:

StorageID storage_id;
Expand Down
2 changes: 2 additions & 0 deletions src/Storages/MergeTree/MergeTreeData.h
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,8 @@ class MergeTreeData : public IStorage, public WithMutableContext

bool areAsynchronousInsertsEnabled() const override { return getSettings()->async_insert; }

/// The trivial count optimization is reported as supported only while hasLightweightDeletedMask() is false.
bool supportsTrivialCountOptimization() const override { return !hasLightweightDeletedMask(); }

NamesAndTypesList getVirtuals() const override;

bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, ContextPtr, const StorageMetadataPtr & metadata_snapshot) const override;
Expand Down
2 changes: 2 additions & 0 deletions src/Storages/StorageMaterializedMySQL.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class StorageMaterializedMySQL final : public StorageProxy

void drop() override { nested_storage->drop(); }

/// The trivial count optimization is never reported as supported for this storage.
bool supportsTrivialCountOptimization() const override { return false; }

private:
[[noreturn]] static void throwNotAllowed()
{
Expand Down
133 changes: 84 additions & 49 deletions src/Storages/StorageReplicatedMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4902,67 +4902,102 @@ void StorageReplicatedMergeTree::read(
snapshot_data.alter_conversions = {};
});

/** The `select_sequential_consistency` setting has two meanings:
* 1. To throw an exception if on a replica there are not all parts which have been written down on quorum of remaining replicas.
* 2. Do not read parts that have not yet been written to the quorum of the replicas.
* For this you have to synchronously go to ZooKeeper.
*/
if (local_context->getSettingsRef().select_sequential_consistency)
{
auto max_added_blocks = std::make_shared<ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock>(getMaxAddedBlocks());
if (auto plan = reader.read(
column_names, storage_snapshot, query_info, local_context,
max_block_size, num_streams, processed_stage, std::move(max_added_blocks), /*enable_parallel_reading*/false))
query_plan = std::move(*plan);
return;
}
const auto & settings = local_context->getSettingsRef();

if (local_context->canUseParallelReplicasOnInitiator())
{
auto table_id = getStorageID();
/// The `select_sequential_consistency` setting has two meanings:
/// 1. To throw an exception if on a replica there are not all parts which have been written down on quorum of remaining replicas.
/// 2. Do not read parts that have not yet been written to the quorum of the replicas.
/// For this you have to synchronously go to ZooKeeper.
if (settings.select_sequential_consistency)
return readLocalSequentialConsistencyImpl(query_plan, column_names, storage_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams);

ASTPtr modified_query_ast;
if (local_context->canUseParallelReplicasOnInitiator())
return readParallelReplicasImpl(query_plan, column_names, storage_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams);

Block header;
readLocalImpl(query_plan, column_names, storage_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams);
}

if (local_context->getSettingsRef().allow_experimental_analyzer)
{
auto modified_query_tree = buildQueryTreeForShard(query_info, query_info.query_tree);
/// Local read used by read() when `select_sequential_consistency` is enabled.
/// The result of getMaxAddedBlocks() is handed to the reader as the limit on block numbers to read,
/// and parallel reading is always disabled on this path.
/// `query_plan` is assigned only if the reader actually produced a plan.
void StorageReplicatedMergeTree::readLocalSequentialConsistencyImpl(
    QueryPlan & query_plan,
    const Names & column_names,
    const StorageSnapshotPtr & storage_snapshot,
    SelectQueryInfo & query_info,
    ContextPtr local_context,
    QueryProcessingStage::Enum processed_stage,
    size_t max_block_size,
    size_t num_streams)
{
    using MaxBlocks = ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock;
    auto quorum_max_blocks = std::make_shared<MaxBlocks>(getMaxAddedBlocks());

    if (auto local_plan = reader.read(
            column_names, storage_snapshot, query_info, local_context,
            max_block_size, num_streams, processed_stage,
            std::move(quorum_max_blocks),
            /* enable_parallel_reading= */ false))
    {
        query_plan = std::move(*local_plan);
    }
}

header = InterpreterSelectQueryAnalyzer::getSampleBlock(
modified_query_tree, local_context, SelectQueryOptions(processed_stage).analyze());
modified_query_ast = queryNodeToSelectQuery(modified_query_tree);
}
else
{
modified_query_ast = ClusterProxy::rewriteSelectQuery(local_context, query_info.query,
table_id.database_name, table_id.table_name, /*remote_table_function_ptr*/nullptr);
header
= InterpreterSelectQuery(modified_query_ast, local_context, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();
}
void StorageReplicatedMergeTree::readParallelReplicasImpl(
QueryPlan & query_plan,
const Names & /*column_names*/,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr local_context,
QueryProcessingStage::Enum processed_stage,
const size_t /*max_block_size*/,
const size_t /*num_streams*/)
{
auto table_id = getStorageID();

auto cluster = local_context->getCluster(local_context->getSettingsRef().cluster_for_parallel_replicas);
auto parallel_replicas_cluster = local_context->getCluster(local_context->getSettingsRef().cluster_for_parallel_replicas);

ClusterProxy::SelectStreamFactory select_stream_factory =
ClusterProxy::SelectStreamFactory(
header,
{},
storage_snapshot,
processed_stage);
ASTPtr modified_query_ast;
Block header;
if (local_context->getSettingsRef().allow_experimental_analyzer)
{
auto modified_query_tree = buildQueryTreeForShard(query_info, query_info.query_tree);

ClusterProxy::executeQueryWithParallelReplicas(
query_plan, getStorageID(), /*remove_table_function_ptr*/ nullptr,
select_stream_factory, modified_query_ast,
local_context, query_info, cluster);
header = InterpreterSelectQueryAnalyzer::getSampleBlock(
modified_query_tree, local_context, SelectQueryOptions(processed_stage).analyze());
modified_query_ast = queryNodeToSelectQuery(modified_query_tree);
}
else
{
if (auto plan = reader.read(
column_names, storage_snapshot, query_info,
local_context, max_block_size, num_streams,
processed_stage, nullptr, /*enable_parallel_reading*/local_context->canUseParallelReplicasOnFollower()))
query_plan = std::move(*plan);
modified_query_ast = ClusterProxy::rewriteSelectQuery(local_context, query_info.query,
table_id.database_name, table_id.table_name, /*remote_table_function_ptr*/nullptr);
header
= InterpreterSelectQuery(modified_query_ast, local_context, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();
}

ClusterProxy::SelectStreamFactory select_stream_factory = ClusterProxy::SelectStreamFactory(
header,
{},
storage_snapshot,
processed_stage);

ClusterProxy::executeQueryWithParallelReplicas(
query_plan, getStorageID(),
/* table_func_ptr= */ nullptr,
select_stream_factory, modified_query_ast,
local_context, query_info, parallel_replicas_cluster);
}

/// Plain local read: no limit on block numbers is passed to the reader, and parallel reading
/// is enabled exactly when the context reports canUseParallelReplicasOnFollower().
/// `query_plan` is assigned only if the reader actually produced a plan.
void StorageReplicatedMergeTree::readLocalImpl(
    QueryPlan & query_plan,
    const Names & column_names,
    const StorageSnapshotPtr & storage_snapshot,
    SelectQueryInfo & query_info,
    ContextPtr local_context,
    QueryProcessingStage::Enum processed_stage,
    const size_t max_block_size,
    const size_t num_streams)
{
    if (auto local_plan = reader.read(
            column_names, storage_snapshot, query_info,
            local_context, max_block_size, num_streams,
            processed_stage,
            /* max_block_numbers_to_read= */ nullptr,
            /* enable_parallel_reading= */ local_context->canUseParallelReplicasOnFollower()))
    {
        query_plan = std::move(*local_plan);
    }
}

template <class Func>
Expand Down
32 changes: 31 additions & 1 deletion src/Storages/StorageReplicatedMergeTree.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class StorageReplicatedMergeTree final : public MergeTreeData
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
ContextPtr local_context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
size_t num_streams) override;
Expand Down Expand Up @@ -513,6 +513,36 @@ class StorageReplicatedMergeTree final : public MergeTreeData

static std::optional<QueryPipeline> distributedWriteFromClusterStorage(const std::shared_ptr<IStorageCluster> & src_storage_cluster, const ASTInsertQuery & query, ContextPtr context);

/// The three helpers below are the branches that read() dispatches to.

/// Default branch: reads from the local replica through the MergeTree reader
/// (parallel reading is enabled when the context allows parallel replicas on a follower).
void readLocalImpl(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr local_context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
size_t num_streams);

/// Branch for `select_sequential_consistency`: local read limited by the result of getMaxAddedBlocks(),
/// with parallel reading disabled.
void readLocalSequentialConsistencyImpl(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr local_context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
size_t num_streams);

/// Branch for the parallel replicas initiator: builds the query and header for the replicas and
/// delegates to ClusterProxy::executeQueryWithParallelReplicas().
/// column_names, max_block_size and num_streams are not used by the implementation.
void readParallelReplicasImpl(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr local_context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
size_t num_streams);

template <class Func>
void foreachActiveParts(Func && func, bool select_sequential_consistency) const;

Expand Down
4 changes: 2 additions & 2 deletions src/Storages/getStructureOfRemoteTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ ColumnsDescription getStructureOfRemoteTableInShard(
}

ColumnsDescription res;
auto new_context = ClusterProxy::updateSettingsForCluster(cluster, context, context->getSettingsRef(), table_id);
auto new_context = ClusterProxy::updateSettingsForCluster(!cluster.getSecret().empty(), context, context->getSettingsRef(), table_id);

/// Ignore limit for result number of rows (that could be set during handling CSE/CTE),
/// since this is a service query and should not lead to query failure.
Expand Down Expand Up @@ -176,7 +176,7 @@ ColumnsDescriptionByShardNum getExtendedObjectsOfRemoteTables(
const auto & shards_info = cluster.getShardsInfo();
auto query = "DESC TABLE " + remote_table_id.getFullTableName();

auto new_context = ClusterProxy::updateSettingsForCluster(cluster, context, context->getSettingsRef(), remote_table_id);
auto new_context = ClusterProxy::updateSettingsForCluster(!cluster.getSecret().empty(), context, context->getSettingsRef(), remote_table_id);
new_context->setSetting("describe_extend_object_types", true);

/// Expect only needed columns from the result of DESC TABLE.
Expand Down
2 changes: 1 addition & 1 deletion src/TableFunctions/TableFunctionRemote.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ void TableFunctionRemote::parseArguments(const ASTPtr & ast_function, ContextPtr
secure,
/* priority= */ Priority{1},
/* cluster_name= */ "",
/* password= */ ""
/* cluster_secret= */ ""
};
cluster = std::make_shared<Cluster>(context->getSettingsRef(), names, params);
}
Expand Down

0 comments on commit 3c034d5

Please sign in to comment.