Skip to content

Commit

Permalink
Support MERGE command for single_shard_distributed Target (#7643)
Browse files Browse the repository at this point in the history
This PR has following changes :
1. Enable MERGE command for single_shard_distributed targets.

(cherry picked from commit 3c467e6)
  • Loading branch information
paragikjain authored and hanefi committed Jul 17, 2024
1 parent 9523584 commit a5ef010
Show file tree
Hide file tree
Showing 8 changed files with 879 additions and 26 deletions.
2 changes: 1 addition & 1 deletion src/backend/distributed/commands/multi_copy.c
Original file line number Diff line number Diff line change
Expand Up @@ -2572,7 +2572,7 @@ ShardIdForTuple(CitusCopyDestReceiver *copyDest, Datum *columnValues, bool *colu
* Find the shard interval and id for the partition column value for
* non-reference tables.
*
* For reference table, this function blindly returns the tables single
* For reference table, and single shard distributed table this function blindly returns the tables single
* shard.
*/
ShardInterval *shardInterval = FindShardInterval(partitionColumnValue, cacheEntry);
Expand Down
42 changes: 24 additions & 18 deletions src/backend/distributed/planner/merge_planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -243,14 +243,27 @@ CreateNonPushableMergePlan(Oid targetRelationId, uint64 planId, Query *originalQ

CitusTableCacheEntry *targetRelation = GetCitusTableCacheEntry(targetRelationId);

/*
* Get the index of the column in the source query that will be utilized
* to repartition the source rows, ensuring colocation with the target
*/
distributedPlan->sourceResultRepartitionColumnIndex =
SourceResultPartitionColumnIndex(mergeQuery,
sourceQuery->targetList,
targetRelation);

if (IsCitusTableType(targetRelation->relationId, SINGLE_SHARD_DISTRIBUTED))
{
/*
* if target table is SINGLE_SHARD_DISTRIBUTED let's set this to invalid -1
* so later in execution phase we don't rely on this value and try to find single shard of target instead.
*/
distributedPlan->sourceResultRepartitionColumnIndex = -1;
}
else
{
/*
* Get the index of the column in the source query that will be utilized
* to repartition the source rows, ensuring colocation with the target
*/

distributedPlan->sourceResultRepartitionColumnIndex =
SourceResultPartitionColumnIndex(mergeQuery,
sourceQuery->targetList,
targetRelation);
}

/*
* Make a copy of the source query, since following code scribbles it
Expand All @@ -262,11 +275,11 @@ CreateNonPushableMergePlan(Oid targetRelationId, uint64 planId, Query *originalQ
int cursorOptions = CURSOR_OPT_PARALLEL_OK;
PlannedStmt *sourceRowsPlan = pg_plan_query(sourceQueryCopy, NULL, cursorOptions,
boundParams);
bool repartitioned = IsRedistributablePlan(sourceRowsPlan->planTree) &&
IsSupportedRedistributionTarget(targetRelationId);
bool isRepartitionAllowed = IsRedistributablePlan(sourceRowsPlan->planTree) &&
IsSupportedRedistributionTarget(targetRelationId);

/* If plan is distributed, no work at the coordinator */
if (repartitioned)
if (isRepartitionAllowed)
{
distributedPlan->modifyWithSelectMethod = MODIFY_WITH_SELECT_REPARTITION;
}
Expand Down Expand Up @@ -1273,13 +1286,6 @@ static int
SourceResultPartitionColumnIndex(Query *mergeQuery, List *sourceTargetList,
CitusTableCacheEntry *targetRelation)
{
if (IsCitusTableType(targetRelation->relationId, SINGLE_SHARD_DISTRIBUTED))
{
ereport(ERROR, (errmsg("MERGE operation across distributed schemas "
"or with a row-based distributed table is "
"not yet supported")));
}

/* Get all the Join conditions from the ON clause */
List *mergeJoinConditionList = WhereClauseList(mergeQuery->jointree);
Var *targetColumn = targetRelation->partitionColumn;
Expand Down
1 change: 1 addition & 0 deletions src/test/regress/bin/normalize.sed
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ s/"t2_[0-9]+"/"t2_xxxxxxx"/g
# shard table names for MERGE tests
s/merge_schema\.([_a-z0-9]+)_40[0-9]+ /merge_schema.\1_xxxxxxx /g
s/pgmerge_schema\.([_a-z0-9]+)_40[0-9]+ /pgmerge_schema.\1_xxxxxxx /g
s/merge_vcore_schema\.([_a-z0-9]+)_40[0-9]+ /pgmerge_schema.\1_xxxxxxx /g

# shard table names for multi_subquery
s/ keyval(1|2|ref)_[0-9]+ / keyval\1_xxxxxxx /g
Expand Down
53 changes: 46 additions & 7 deletions src/test/regress/expected/merge_schema_sharding.out
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,26 @@ WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b;
DEBUG: Distributed tables are not co-located, try repartitioning
DEBUG: For MERGE command, all the distributed tables must be colocated
DEBUG: Creating MERGE repartition plan
ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Collect source query results on coordinator
DEBUG: Create a MERGE task list that needs to be routed
DEBUG: <Deparsed MERGE query: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c2_t1 ON (citus_table_alias.a OPERATOR(pg_catalog.=) nullkey_c2_t1.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b>
DEBUG: distributed statement: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c2_t1 ON (citus_table_alias.a OPERATOR(pg_catalog.=) nullkey_c2_t1.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b
DEBUG: Execute MERGE task list
MERGE INTO schema_shard_table1.nullkey_c1_t1 USING nullkey_c2_t1 ON (schema_shard_table1.nullkey_c1_t1.a = nullkey_c2_t1.a)
WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c2_t1.a, nullkey_c2_t1.b);
DEBUG: Distributed tables are not co-located, try repartitioning
DEBUG: For MERGE command, all the distributed tables must be colocated
DEBUG: Creating MERGE repartition plan
ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Collect source query results on coordinator
DEBUG: Create a MERGE task list that needs to be routed
DEBUG: <Deparsed MERGE query: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c2_t1 ON (citus_table_alias.a OPERATOR(pg_catalog.=) nullkey_c2_t1.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c2_t1.a, nullkey_c2_t1.b)>
DEBUG: distributed statement: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c2_t1 ON (citus_table_alias.a OPERATOR(pg_catalog.=) nullkey_c2_t1.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c2_t1.a, nullkey_c2_t1.b)
DEBUG: Execute MERGE task list
-- with a distributed table
SET search_path TO schema_shard_table1;
MERGE INTO nullkey_c1_t1 USING schema_shard_table.distributed_table ON (nullkey_c1_t1.a = schema_shard_table.distributed_table.a)
Expand All @@ -114,7 +126,12 @@ WHEN NOT MATCHED THEN INSERT VALUES (schema_shard_table.distributed_table.a, sch
DEBUG: Distributed tables are not co-located, try repartitioning
DEBUG: For MERGE command, all the distributed tables must be colocated
DEBUG: Creating MERGE repartition plan
ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: Collect source query results on coordinator
DEBUG: Create a MERGE task list that needs to be routed
DEBUG: <Deparsed MERGE query: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) distributed_table ON (citus_table_alias.a OPERATOR(pg_catalog.=) distributed_table.a) WHEN MATCHED THEN UPDATE SET b = distributed_table.b WHEN NOT MATCHED THEN INSERT (a, b) VALUES (distributed_table.a, distributed_table.b)>
DEBUG: distributed statement: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) distributed_table ON (citus_table_alias.a OPERATOR(pg_catalog.=) distributed_table.a) WHEN MATCHED THEN UPDATE SET b = distributed_table.b WHEN NOT MATCHED THEN INSERT (a, b) VALUES (distributed_table.a, distributed_table.b)
DEBUG: Execute MERGE task list
MERGE INTO schema_shard_table.distributed_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = schema_shard_table.distributed_table.a)
WHEN MATCHED THEN DELETE
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b);
Expand Down Expand Up @@ -163,7 +180,13 @@ WHEN MATCHED THEN UPDATE SET b = schema_shard_table.reference_table.b;
DEBUG: A mix of distributed and reference table, try repartitioning
DEBUG: A mix of distributed and reference table, routable query is not possible
DEBUG: Creating MERGE repartition plan
ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Collect source query results on coordinator
DEBUG: Create a MERGE task list that needs to be routed
DEBUG: <Deparsed MERGE query: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) reference_table ON (citus_table_alias.a OPERATOR(pg_catalog.=) reference_table.a) WHEN MATCHED THEN UPDATE SET b = reference_table.b>
DEBUG: distributed statement: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) reference_table ON (citus_table_alias.a OPERATOR(pg_catalog.=) reference_table.a) WHEN MATCHED THEN UPDATE SET b = reference_table.b
DEBUG: Execute MERGE task list
MERGE INTO schema_shard_table.reference_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = schema_shard_table.reference_table.a)
WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t1.b
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b);
Expand All @@ -174,7 +197,13 @@ WHEN MATCHED THEN UPDATE SET b = schema_shard_table.citus_local_table.b;
DEBUG: A mix of distributed and local table, try repartitioning
DEBUG: A mix of distributed and citus-local table, routable query is not possible
DEBUG: Creating MERGE repartition plan
ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Collect source query results on coordinator
DEBUG: Create a MERGE task list that needs to be routed
DEBUG: <Deparsed MERGE query: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) citus_local_table ON (citus_table_alias.a OPERATOR(pg_catalog.=) citus_local_table.a) WHEN MATCHED THEN UPDATE SET b = citus_local_table.b>
DEBUG: distributed statement: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) citus_local_table ON (citus_table_alias.a OPERATOR(pg_catalog.=) citus_local_table.a) WHEN MATCHED THEN UPDATE SET b = citus_local_table.b
DEBUG: Execute MERGE task list
MERGE INTO schema_shard_table.citus_local_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = schema_shard_table.citus_local_table.a)
WHEN MATCHED THEN DELETE;
DEBUG: A mix of distributed and local table, try repartitioning
Expand Down Expand Up @@ -210,7 +239,12 @@ WHEN MATCHED THEN UPDATE SET b = cte.b;
DEBUG: Distributed tables are not co-located, try repartitioning
DEBUG: For MERGE command, all the distributed tables must be colocated
DEBUG: Creating MERGE repartition plan
ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: Collect source query results on coordinator
DEBUG: Create a MERGE task list that needs to be routed
DEBUG: <Deparsed MERGE query: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte ON (citus_table_alias.a OPERATOR(pg_catalog.=) cte.a) WHEN MATCHED THEN UPDATE SET b = cte.b>
DEBUG: distributed statement: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte ON (citus_table_alias.a OPERATOR(pg_catalog.=) cte.a) WHEN MATCHED THEN UPDATE SET b = cte.b
DEBUG: Execute MERGE task list
WITH cte AS materialized (
SELECT * FROM schema_shard_table.distributed_table
)
Expand All @@ -219,7 +253,12 @@ WHEN MATCHED THEN UPDATE SET b = cte.b;
DEBUG: Distributed tables are not co-located, try repartitioning
DEBUG: For MERGE command, all the distributed tables must be colocated
DEBUG: Creating MERGE repartition plan
ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: Collect source query results on coordinator
DEBUG: Create a MERGE task list that needs to be routed
DEBUG: <Deparsed MERGE query: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte ON (citus_table_alias.a OPERATOR(pg_catalog.=) cte.a) WHEN MATCHED THEN UPDATE SET b = cte.b>
DEBUG: distributed statement: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte ON (citus_table_alias.a OPERATOR(pg_catalog.=) cte.a) WHEN MATCHED THEN UPDATE SET b = cte.b
DEBUG: Execute MERGE task list
SET client_min_messages TO WARNING;
DROP SCHEMA schema_shard_table1 CASCADE;
DROP SCHEMA schema_shard_table2 CASCADE;
Expand Down
Loading

0 comments on commit a5ef010

Please sign in to comment.