From 43ef2d75bfe9517808f3f2ca2dd0648faf0136d0 Mon Sep 17 00:00:00 2001 From: Parag Jain <40451840+paragikjain@users.noreply.github.com> Date: Tue, 16 Jul 2024 20:38:44 +0530 Subject: [PATCH] Support MERGE command for single_shard_distributed Target (#7643) This PR has following changes : 1. Enable MERGE command for single_shard_distributed targets. (cherry picked from commit 3c467e6e02f630643bf3120210c0462bd51af2dd) --- src/backend/distributed/commands/multi_copy.c | 2 +- .../distributed/planner/merge_planner.c | 42 +- src/test/regress/bin/normalize.sed | 1 + .../expected/merge_schema_sharding.out | 53 +- src/test/regress/expected/merge_vcore.out | 481 ++++++++++++++++++ src/test/regress/expected/merge_vcore_0.out | 6 + src/test/regress/multi_schedule | 1 + src/test/regress/sql/merge_vcore.sql | 319 ++++++++++++ 8 files changed, 879 insertions(+), 26 deletions(-) create mode 100644 src/test/regress/expected/merge_vcore.out create mode 100644 src/test/regress/expected/merge_vcore_0.out create mode 100644 src/test/regress/sql/merge_vcore.sql diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index aaff24a10f5..1517c1d5adb 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -2572,7 +2572,7 @@ ShardIdForTuple(CitusCopyDestReceiver *copyDest, Datum *columnValues, bool *colu * Find the shard interval and id for the partition column value for * non-reference tables. * - * For reference table, this function blindly returns the tables single + * For reference table, and single shard distributed table this function blindly returns the tables single * shard. */ ShardInterval *shardInterval = FindShardInterval(partitionColumnValue, cacheEntry); diff --git a/src/backend/distributed/planner/merge_planner.c b/src/backend/distributed/planner/merge_planner.c index 09d2d90acec..1f9d17c43fe 100644 --- a/src/backend/distributed/planner/merge_planner.c +++ b/src/backend/distributed/planner/merge_planner.c @@ -243,14 +243,27 @@ CreateNonPushableMergePlan(Oid targetRelationId, uint64 planId, Query *originalQ CitusTableCacheEntry *targetRelation = GetCitusTableCacheEntry(targetRelationId); - /* - * Get the index of the column in the source query that will be utilized - * to repartition the source rows, ensuring colocation with the target - */ - distributedPlan->sourceResultRepartitionColumnIndex = - SourceResultPartitionColumnIndex(mergeQuery, - sourceQuery->targetList, - targetRelation); + + if (IsCitusTableType(targetRelation->relationId, SINGLE_SHARD_DISTRIBUTED)) + { + /* + * if target table is SINGLE_SHARD_DISTRIBUTED let's set this to invalid -1 + * so later in execution phase we don't rely on this value and try to find single shard of target instead. 
+ */ + distributedPlan->sourceResultRepartitionColumnIndex = -1; + } + else + { + /* + * Get the index of the column in the source query that will be utilized + * to repartition the source rows, ensuring colocation with the target + */ + + distributedPlan->sourceResultRepartitionColumnIndex = + SourceResultPartitionColumnIndex(mergeQuery, + sourceQuery->targetList, + targetRelation); + } /* * Make a copy of the source query, since following code scribbles it @@ -262,11 +275,11 @@ CreateNonPushableMergePlan(Oid targetRelationId, uint64 planId, Query *originalQ int cursorOptions = CURSOR_OPT_PARALLEL_OK; PlannedStmt *sourceRowsPlan = pg_plan_query(sourceQueryCopy, NULL, cursorOptions, boundParams); - bool repartitioned = IsRedistributablePlan(sourceRowsPlan->planTree) && - IsSupportedRedistributionTarget(targetRelationId); + bool isRepartitionAllowed = IsRedistributablePlan(sourceRowsPlan->planTree) && + IsSupportedRedistributionTarget(targetRelationId); /* If plan is distributed, no work at the coordinator */ - if (repartitioned) + if (isRepartitionAllowed) { distributedPlan->modifyWithSelectMethod = MODIFY_WITH_SELECT_REPARTITION; } @@ -1273,13 +1286,6 @@ static int SourceResultPartitionColumnIndex(Query *mergeQuery, List *sourceTargetList, CitusTableCacheEntry *targetRelation) { - if (IsCitusTableType(targetRelation->relationId, SINGLE_SHARD_DISTRIBUTED)) - { - ereport(ERROR, (errmsg("MERGE operation across distributed schemas " - "or with a row-based distributed table is " - "not yet supported"))); - } - /* Get all the Join conditions from the ON clause */ List *mergeJoinConditionList = WhereClauseList(mergeQuery->jointree); Var *targetColumn = targetRelation->partitionColumn; diff --git a/src/test/regress/bin/normalize.sed b/src/test/regress/bin/normalize.sed index 5dc5d7ca88e..62e9de6971a 100644 --- a/src/test/regress/bin/normalize.sed +++ b/src/test/regress/bin/normalize.sed @@ -32,6 +32,7 @@ s/"t2_[0-9]+"/"t2_xxxxxxx"/g # shard table names for MERGE tests s/merge_schema\.([_a-z0-9]+)_40[0-9]+ /merge_schema.\1_xxxxxxx /g s/pgmerge_schema\.([_a-z0-9]+)_40[0-9]+ /pgmerge_schema.\1_xxxxxxx /g +s/merge_vcore_schema\.([_a-z0-9]+)_40[0-9]+ /pgmerge_schema.\1_xxxxxxx /g # shard table names for multi_subquery s/ keyval(1|2|ref)_[0-9]+ / keyval\1_xxxxxxx /g diff --git a/src/test/regress/expected/merge_schema_sharding.out b/src/test/regress/expected/merge_schema_sharding.out index 8a9ba89dd46..17f6f6adb6e 100644 --- a/src/test/regress/expected/merge_schema_sharding.out +++ b/src/test/regress/expected/merge_schema_sharding.out @@ -98,14 +98,26 @@ WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b; DEBUG: Distributed tables are not co-located, try repartitioning DEBUG: For MERGE command, all the distributed tables must be colocated DEBUG: Creating MERGE repartition plan -ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Collect source query results on coordinator +DEBUG: Create a MERGE task list that needs to be routed +DEBUG: +DEBUG: distributed statement: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c2_t1 ON (citus_table_alias.a OPERATOR(pg_catalog.=) nullkey_c2_t1.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b 
+DEBUG: Execute MERGE task list MERGE INTO schema_shard_table1.nullkey_c1_t1 USING nullkey_c2_t1 ON (schema_shard_table1.nullkey_c1_t1.a = nullkey_c2_t1.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c2_t1.a, nullkey_c2_t1.b); DEBUG: Distributed tables are not co-located, try repartitioning DEBUG: For MERGE command, all the distributed tables must be colocated DEBUG: Creating MERGE repartition plan -ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Collect source query results on coordinator +DEBUG: Create a MERGE task list that needs to be routed +DEBUG: +DEBUG: distributed statement: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c2_t1 ON (citus_table_alias.a OPERATOR(pg_catalog.=) nullkey_c2_t1.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c2_t1.a, nullkey_c2_t1.b) +DEBUG: Execute MERGE task list -- with a distributed table SET search_path TO schema_shard_table1; MERGE INTO nullkey_c1_t1 USING schema_shard_table.distributed_table ON (nullkey_c1_t1.a = schema_shard_table.distributed_table.a) @@ -114,7 +126,12 @@ WHEN NOT MATCHED THEN INSERT VALUES (schema_shard_table.distributed_table.a, sch DEBUG: Distributed tables are not co-located, try repartitioning DEBUG: For MERGE command, all the distributed tables must be colocated DEBUG: Creating MERGE repartition plan -ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: Collect source query results on coordinator +DEBUG: Create a MERGE task list that needs to be routed +DEBUG: +DEBUG: distributed statement: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) distributed_table ON (citus_table_alias.a OPERATOR(pg_catalog.=) distributed_table.a) WHEN MATCHED THEN UPDATE SET b = distributed_table.b WHEN NOT MATCHED THEN INSERT (a, b) VALUES (distributed_table.a, distributed_table.b) +DEBUG: Execute MERGE task list MERGE INTO schema_shard_table.distributed_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = schema_shard_table.distributed_table.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b); @@ -163,7 +180,13 @@ WHEN MATCHED THEN UPDATE SET b = schema_shard_table.reference_table.b; DEBUG: A mix of distributed and reference table, try repartitioning DEBUG: A mix of distributed and reference table, routable query is not possible DEBUG: Creating MERGE repartition plan -ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Collect source query results on coordinator +DEBUG: Create a MERGE task list that needs to be routed +DEBUG: +DEBUG: distributed statement: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT 
intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) reference_table ON (citus_table_alias.a OPERATOR(pg_catalog.=) reference_table.a) WHEN MATCHED THEN UPDATE SET b = reference_table.b +DEBUG: Execute MERGE task list MERGE INTO schema_shard_table.reference_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = schema_shard_table.reference_table.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t1.b WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b); @@ -174,7 +197,13 @@ WHEN MATCHED THEN UPDATE SET b = schema_shard_table.citus_local_table.b; DEBUG: A mix of distributed and local table, try repartitioning DEBUG: A mix of distributed and citus-local table, routable query is not possible DEBUG: Creating MERGE repartition plan -ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Collect source query results on coordinator +DEBUG: Create a MERGE task list that needs to be routed +DEBUG: +DEBUG: distributed statement: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) citus_local_table ON (citus_table_alias.a OPERATOR(pg_catalog.=) citus_local_table.a) WHEN MATCHED THEN UPDATE SET b = citus_local_table.b +DEBUG: Execute MERGE task list MERGE INTO schema_shard_table.citus_local_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = schema_shard_table.citus_local_table.a) WHEN MATCHED THEN DELETE; DEBUG: A mix of distributed and local table, try repartitioning @@ -210,7 +239,12 @@ WHEN MATCHED THEN UPDATE SET b = cte.b; DEBUG: Distributed tables are not co-located, try repartitioning DEBUG: For MERGE command, all the distributed tables must be colocated DEBUG: Creating MERGE repartition plan -ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: Collect source query results on coordinator +DEBUG: Create a MERGE task list that needs to be routed +DEBUG: +DEBUG: distributed statement: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte ON (citus_table_alias.a OPERATOR(pg_catalog.=) cte.a) WHEN MATCHED THEN UPDATE SET b = cte.b +DEBUG: Execute MERGE task list WITH cte AS materialized ( SELECT * FROM schema_shard_table.distributed_table ) @@ -219,7 +253,12 @@ WHEN MATCHED THEN UPDATE SET b = cte.b; DEBUG: Distributed tables are not co-located, try repartitioning DEBUG: For MERGE command, all the distributed tables must be colocated DEBUG: Creating MERGE repartition plan -ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: Collect source query results on coordinator +DEBUG: Create a MERGE task list that needs to be routed +DEBUG: +DEBUG: distributed statement: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT 
intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte ON (citus_table_alias.a OPERATOR(pg_catalog.=) cte.a) WHEN MATCHED THEN UPDATE SET b = cte.b +DEBUG: Execute MERGE task list SET client_min_messages TO WARNING; DROP SCHEMA schema_shard_table1 CASCADE; DROP SCHEMA schema_shard_table2 CASCADE; diff --git a/src/test/regress/expected/merge_vcore.out b/src/test/regress/expected/merge_vcore.out new file mode 100644 index 00000000000..03f6f8820e4 --- /dev/null +++ b/src/test/regress/expected/merge_vcore.out @@ -0,0 +1,481 @@ +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15 +\gset +\if :server_version_ge_15 +\else +\q +\endif +-- MERGE command performs a join from data_source to target_table_name +DROP SCHEMA IF EXISTS merge_vcore_schema CASCADE; +NOTICE: schema "merge_vcore_schema" does not exist, skipping +--MERGE INTO target +--USING source +--WHEN NOT MATCHED +--WHEN MATCHED AND +--WHEN MATCHED +CREATE SCHEMA merge_vcore_schema; +SET search_path TO merge_vcore_schema; +SET citus.shard_count TO 4; +SET citus.next_shard_id TO 4000000; +SET citus.explain_all_tasks TO true; +SET citus.shard_replication_factor TO 1; +SET citus.max_adaptive_executor_pool_size TO 1; +SET client_min_messages = warning; +SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +RESET client_min_messages; +-- ****************************************** CASE 1 : Both are singleSharded*************************************** +CREATE TABLE source ( + id bigint, + doc text +); +CREATE TABLE target ( + id bigint, + doc text +); +SELECT create_distributed_table('source', null, colocate_with=>'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('target', null, colocate_with=>'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}'); +-- insert +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); +SELECT * FROM target; + id | doc +--------------------------------------------------------------------- + 2 | {"a" : 1} + 2 | {"a" : 2} +(2 rows) + +-- update +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); +SELECT * FROM target; + id | doc +--------------------------------------------------------------------- + 2 | {"b" : 1} + 2 | {"b" : 1} +(2 rows) + +-- Explain +EXPLAIN (costs off, timing off, summary off) MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) +WHEN MATCHED THEN DO NOTHING; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus MERGE INTO ...) 
+ MERGE INTO target method: pull to coordinator + -> Custom Scan (Citus Adaptive) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on source_4000000 source +(8 rows) + +DROP TABLE IF EXISTS source; +DROP TABLE IF EXISTS target; +-- *************** CASE 2 : source is single sharded and target is distributed ******************************* +CREATE TABLE source ( + id bigint, + doc text +); +CREATE TABLE target ( + id bigint, + doc text +); +SELECT create_distributed_table('source', null, colocate_with=>'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('target', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}'); +-- insert +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); +SELECT * FROM target; + id | doc +--------------------------------------------------------------------- + 2 | {"a" : 1} + 2 | {"a" : 2} +(2 rows) + +-- update +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); +SELECT * FROM target; + id | doc +--------------------------------------------------------------------- + 2 | {"b" : 1} + 2 | {"b" : 1} +(2 rows) + +-- Explain +EXPLAIN (costs off, timing off, summary off) MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) +WHEN MATCHED THEN DO NOTHING; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus MERGE INTO ...) 
+ MERGE INTO target method: pull to coordinator + -> Custom Scan (Citus Adaptive) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on source_4000002 source +(8 rows) + +DROP TABLE IF EXISTS source; +DROP TABLE IF EXISTS target; +-- *************** CASE 3 : source is distributed and target is single sharded ******************************* +CREATE TABLE source ( + id bigint, + doc text +); +CREATE TABLE target ( + id bigint, + doc text +); +SELECT create_distributed_table('source', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('target', null); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}'); +-- insert +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); +SELECT * FROM target; + id | doc +--------------------------------------------------------------------- + 2 | {"a" : 1} + 2 | {"a" : 2} +(2 rows) + +-- update +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); +SELECT * FROM target; + id | doc +--------------------------------------------------------------------- + 2 | {"b" : 1} + 2 | {"b" : 1} +(2 rows) + +-- Explain +EXPLAIN (costs off, timing off, summary off) MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) +WHEN MATCHED THEN DO NOTHING; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus MERGE INTO ...) 
+ MERGE INTO target method: pull to coordinator + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on source_4000007 source + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on source_4000008 source + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on source_4000009 source + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on source_4000010 source +(17 rows) + +DROP TABLE IF EXISTS source; +DROP TABLE IF EXISTS target; +-- *************** CASE 4 : both are distributed ******************************* +CREATE TABLE source ( + id bigint, + doc text +); +CREATE TABLE target ( + id bigint, + doc text +); +SELECT create_distributed_table('source', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('target', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}'); +-- insert +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); +SELECT * FROM target; + id | doc +--------------------------------------------------------------------- + 2 | {"a" : 1} + 2 | {"a" : 2} +(2 rows) + +-- update +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); +SELECT * FROM target; + id | doc +--------------------------------------------------------------------- + 2 | {"b" : 1} + 2 | {"b" : 1} +(2 rows) + +-- Explain +EXPLAIN (costs off, timing off, summary off) MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) +WHEN MATCHED THEN DO NOTHING; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus MERGE INTO ...) 
+ MERGE INTO target method: repartition + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on source_4000012 source + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on source_4000013 source + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on source_4000014 source + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on source_4000015 source +(17 rows) + +DROP TABLE IF EXISTS source; +DROP TABLE IF EXISTS target; +-- *************** CASE 5 : both are distributed & colocated ******************************* +CREATE TABLE source ( + id bigint, + doc text +); +CREATE TABLE target ( + id bigint, + doc text +); +SELECT create_distributed_table('source', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('target', 'id', colocate_with=>'source'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}'); +-- insert +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); +SELECT * FROM target; + id | doc +--------------------------------------------------------------------- + 2 | {"a" : 1} + 2 | {"a" : 2} +(2 rows) + +-- update +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); +SELECT * FROM target; + id | doc +--------------------------------------------------------------------- + 2 | {"b" : 1} + 2 | {"b" : 1} +(2 rows) + +-- Explain +EXPLAIN (costs off, timing off, summary off) MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) +WHEN MATCHED THEN DO NOTHING; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus MERGE INTO ...) 
+ MERGE INTO target method: repartition + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on source_4000020 source + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on source_4000021 source + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on source_4000022 source + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on source_4000023 source +(17 rows) + +DROP TABLE IF EXISTS source; +DROP TABLE IF EXISTS target; +-- *************** CASE 6 : both are singlesharded & colocated ******************************* +CREATE TABLE source ( + id bigint, + doc text +); +CREATE TABLE target ( + id bigint, + doc text +); +SELECT create_distributed_table('source', null); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('target', null, colocate_with=>'source'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}'); +-- insert +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); +SELECT * FROM target; + id | doc +--------------------------------------------------------------------- + 2 | {"a" : 1} + 2 | {"a" : 2} +(2 rows) + +-- update +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); +SELECT * FROM target; + id | doc +--------------------------------------------------------------------- + 2 | {"b" : 1} + 2 | {"b" : 1} +(2 rows) + +-- Explain +EXPLAIN (costs off, timing off, summary off) MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) +WHEN MATCHED THEN DO NOTHING; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Merge on target_4000029 target + -> Nested Loop + -> Seq Scan on source_4000028 source + -> Materialize + -> Seq Scan on target_4000029 target + Filter: ('2'::bigint = id) +(11 rows) + +DROP TABLE IF EXISTS source; +DROP TABLE IF EXISTS target; +DROP SCHEMA IF EXISTS merge_vcore_schema CASCADE; diff --git a/src/test/regress/expected/merge_vcore_0.out b/src/test/regress/expected/merge_vcore_0.out new file mode 100644 index 00000000000..a7e3fbf2062 --- /dev/null +++ b/src/test/regress/expected/merge_vcore_0.out @@ -0,0 +1,6 @@ +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15 +\gset +\if :server_version_ge_15 +\else +\q diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index a564699931e..d32757e4860 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -117,6 +117,7 @@ test: merge pgmerge test: merge_repartition2 test: merge_repartition1 merge_schema_sharding test: merge_partition_tables +test: merge_vcore # --------- # test that no tests leaked intermediate results. 
This should always be last diff --git a/src/test/regress/sql/merge_vcore.sql b/src/test/regress/sql/merge_vcore.sql new file mode 100644 index 00000000000..472bbfe914f --- /dev/null +++ b/src/test/regress/sql/merge_vcore.sql @@ -0,0 +1,319 @@ +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15 +\gset +\if :server_version_ge_15 +\else +\q +\endif + +-- MERGE command performs a join from data_source to target_table_name +DROP SCHEMA IF EXISTS merge_vcore_schema CASCADE; +--MERGE INTO target +--USING source +--WHEN NOT MATCHED +--WHEN MATCHED AND +--WHEN MATCHED + +CREATE SCHEMA merge_vcore_schema; +SET search_path TO merge_vcore_schema; +SET citus.shard_count TO 4; +SET citus.next_shard_id TO 4000000; +SET citus.explain_all_tasks TO true; +SET citus.shard_replication_factor TO 1; +SET citus.max_adaptive_executor_pool_size TO 1; +SET client_min_messages = warning; +SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); +RESET client_min_messages; + + +-- ****************************************** CASE 1 : Both are singleSharded*************************************** +CREATE TABLE source ( + id bigint, + doc text +); + +CREATE TABLE target ( + id bigint, + doc text +); + +SELECT create_distributed_table('source', null, colocate_with=>'none'); +SELECT create_distributed_table('target', null, colocate_with=>'none'); + +INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}'); + +-- insert +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); + +SELECT * FROM target; + +-- update +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); + +SELECT * FROM target; + +-- Explain +EXPLAIN (costs off, timing off, summary off) MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) +WHEN MATCHED THEN DO NOTHING; + + +DROP TABLE IF EXISTS source; +DROP TABLE IF EXISTS target; + + +-- *************** CASE 2 : source is single sharded and target is distributed ******************************* +CREATE TABLE source ( + id bigint, + doc text +); + +CREATE TABLE target ( + id bigint, + doc text +); + +SELECT create_distributed_table('source', null, colocate_with=>'none'); +SELECT create_distributed_table('target', 'id'); + +INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}'); + +-- insert +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); + +SELECT * FROM target; + +-- update +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); + +SELECT * FROM target; + +-- Explain +EXPLAIN (costs off, timing off, summary off) MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) +WHEN MATCHED THEN DO NOTHING; + + + +DROP TABLE IF EXISTS source; +DROP TABLE IF EXISTS target; + + 
+-- *************** CASE 3 : source is distributed and target is single sharded ******************************* +CREATE TABLE source ( + id bigint, + doc text +); + +CREATE TABLE target ( + id bigint, + doc text +); + +SELECT create_distributed_table('source', 'id'); +SELECT create_distributed_table('target', null); + +INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}'); + +-- insert +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); + +SELECT * FROM target; + +-- update +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); + +SELECT * FROM target; + +-- Explain +EXPLAIN (costs off, timing off, summary off) MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) +WHEN MATCHED THEN DO NOTHING; + +DROP TABLE IF EXISTS source; +DROP TABLE IF EXISTS target; + + +-- *************** CASE 4 : both are distributed ******************************* +CREATE TABLE source ( + id bigint, + doc text +); + +CREATE TABLE target ( + id bigint, + doc text +); + +SELECT create_distributed_table('source', 'id'); +SELECT create_distributed_table('target', 'id'); + +INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}'); + +-- insert +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); + +SELECT * FROM target; + +-- update +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); + +SELECT * FROM target; + +-- Explain +EXPLAIN (costs off, timing off, summary off) MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) +WHEN MATCHED THEN DO NOTHING; + +DROP TABLE IF EXISTS source; +DROP TABLE IF EXISTS target; + + +-- *************** CASE 5 : both are distributed & colocated ******************************* + +CREATE TABLE source ( + id bigint, + doc text +); + +CREATE TABLE target ( + id bigint, + doc text +); + +SELECT create_distributed_table('source', 'id'); +SELECT create_distributed_table('target', 'id', colocate_with=>'source'); + +INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}'); + +-- insert +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); + +SELECT * FROM target; + +-- update +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); + +SELECT * FROM target; + +-- Explain +EXPLAIN (costs off, timing off, summary off) MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) +WHEN 
MATCHED THEN DO NOTHING; + +DROP TABLE IF EXISTS source; +DROP TABLE IF EXISTS target; + + +-- *************** CASE 6 : both are singlesharded & colocated ******************************* + +CREATE TABLE source ( + id bigint, + doc text +); + +CREATE TABLE target ( + id bigint, + doc text +); + +SELECT create_distributed_table('source', null); +SELECT create_distributed_table('target', null, colocate_with=>'source'); + +INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}'); + +-- insert +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); + +SELECT * FROM target; + +-- update +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); + +SELECT * FROM target; + +-- Explain +EXPLAIN (costs off, timing off, summary off) MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) +WHEN MATCHED THEN DO NOTHING; + +DROP TABLE IF EXISTS source; +DROP TABLE IF EXISTS target; + +DROP SCHEMA IF EXISTS merge_vcore_schema CASCADE; + + + +
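For quick reference, below is a minimal sketch of the scenario this patch enables, adapted from CASE 3 of the merge_vcore regression test added above (requires PostgreSQL 15+ per the test's version gate; the schema, table, and column names are the test's own and purely illustrative):

-- distributed source, single-shard (null shard key) distributed target
CREATE SCHEMA merge_vcore_schema;
SET search_path TO merge_vcore_schema;
CREATE TABLE source (id bigint, doc text);
CREATE TABLE target (id bigint, doc text);
SELECT create_distributed_table('source', 'id');
SELECT create_distributed_table('target', null);

-- Before this patch, planning this MERGE failed with "MERGE operation across
-- distributed schemas or with a row-based distributed table is not yet supported";
-- with this patch the source rows are collected on the coordinator and routed to
-- the target's single shard.
MERGE INTO target USING (SELECT id AS t_id, doc FROM source) src
ON (src.t_id = target.id)
WHEN MATCHED THEN UPDATE SET doc = src.doc
WHEN NOT MATCHED THEN INSERT (id, doc) VALUES (src.t_id, src.doc);

This corresponds to the "MERGE INTO target method: pull to coordinator" plan shown in the EXPLAIN output of the expected file: for a SINGLE_SHARD_DISTRIBUTED target the planner now sets sourceResultRepartitionColumnIndex to -1 and the executor resolves the target's single shard instead of repartitioning the source on a column index.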