From 4f0053ed6d1453fb77286211f38e959f8c9ed4f4 Mon Sep 17 00:00:00 2001 From: Jelte Fennema-Nio Date: Mon, 17 Jun 2024 16:07:25 +0200 Subject: [PATCH] Redo #7620: Fix merge command when insert value does not have source distributed column (#7627) Related to issue #7619, #7620 Merge command fails when source query is single sharded and source and target are co-located and insert is not using distribution key of source. Example ``` CREATE TABLE source (id integer); CREATE TABLE target (id integer ); -- let's distribute both table on id field SELECT create_distributed_table('source', 'id'); SELECT create_distributed_table('target', 'id'); MERGE INTO target t USING ( SELECT 1 AS somekey FROM source WHERE source.id = 1) s ON t.id = s.somekey WHEN NOT MATCHED THEN INSERT (id) VALUES (s.somekey) ERROR: MERGE INSERT must use the source table distribution column value HINT: MERGE INSERT must use the source table distribution column value ``` Author's Opinion: If join is not between source and target distributed column, we should not force user to use source distributed column while inserting value of target distributed column. Fix: If user is not using distributed key of source for insertion let's not push down query to workers and don't force user to use source distributed column if it is not part of join. This reverts commit fa4fc0b372e4068e069946e3fdf454137736bcc7. Co-authored-by: paragjain (cherry picked from commit aaaf637a6babebc9d9fa181e3a94b68825e2816f) --- .../distributed/planner/merge_planner.c | 29 +- src/test/regress/expected/merge.out | 337 ++++++++++++++++-- src/test/regress/multi_schedule | 3 +- src/test/regress/sql/merge.sql | 133 +++++++ 4 files changed, 470 insertions(+), 32 deletions(-) diff --git a/src/backend/distributed/planner/merge_planner.c b/src/backend/distributed/planner/merge_planner.c index 4d64b8f5625..09d2d90acec 100644 --- a/src/backend/distributed/planner/merge_planner.c +++ b/src/backend/distributed/planner/merge_planner.c @@ -182,14 +182,6 @@ CreateRouterMergePlan(Oid targetRelationId, Query *originalQuery, Query *query, return distributedPlan; } - Var *insertVar = - FetchAndValidateInsertVarIfExists(targetRelationId, originalQuery); - if (insertVar && - !IsDistributionColumnInMergeSource((Expr *) insertVar, originalQuery, true)) - { - ereport(ERROR, (errmsg("MERGE INSERT must use the source table " - "distribution column value"))); - } Job *job = RouterJob(originalQuery, plannerRestrictionContext, &distributedPlan->planningError); @@ -1124,6 +1116,27 @@ DeferErrorIfRoutableMergeNotSupported(Query *query, List *rangeTableList, "repartitioning"))); return deferredError; } + + + /* + * If execution has reached this point, it indicates that the query can be delegated to the worker. + * However, before proceeding with this delegation, we need to confirm that the user is utilizing + * the distribution column of the source table in the Insert variable. + * If this is not the case, we should refrain from pushing down the query. + * This is just a deffered error which will be handle by caller. + */ + + Var *insertVar = + FetchAndValidateInsertVarIfExists(targetRelationId, query); + if (insertVar && + !IsDistributionColumnInMergeSource((Expr *) insertVar, query, true)) + { + ereport(DEBUG1, (errmsg( + "MERGE INSERT must use the source table distribution column value for push down to workers. Otherwise, repartitioning will be applied"))); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "MERGE INSERT must use the source table distribution column value for push down to workers. Otherwise, repartitioning will be applied", + NULL, NULL); + } return NULL; } diff --git a/src/test/regress/expected/merge.out b/src/test/regress/expected/merge.out index a73467e81a8..5056ba5432e 100644 --- a/src/test/regress/expected/merge.out +++ b/src/test/regress/expected/merge.out @@ -1128,7 +1128,7 @@ DO NOTHING WHEN NOT MATCHED THEN INSERT VALUES(rs_source.id); DEBUG: Creating MERGE router plan -DEBUG: +DEBUG: RESET client_min_messages; SELECT * INTO rs_local FROM rs_target ORDER BY 1 ; -- Should be equal @@ -1259,7 +1259,7 @@ DO NOTHING WHEN NOT MATCHED THEN INSERT VALUES(fn_source.id, fn_source.source); DEBUG: Creating MERGE router plan -DEBUG: +DEBUG: RESET client_min_messages; SELECT * INTO fn_local FROM fn_target ORDER BY 1 ; -- Should be equal @@ -1552,7 +1552,7 @@ BEGIN; SET citus.log_remote_commands to true; SET client_min_messages TO DEBUG1; EXECUTE merge_prepare(2); -DEBUG: +DEBUG: DEBUG: Creating MERGE router plan NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx @@ -1782,13 +1782,13 @@ NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_ DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx SET citus.log_remote_commands to false; SELECT compare_tables(); @@ -1842,6 +1842,297 @@ SELECT compare_tables(); (1 row) ROLLBACK; +-- let's create source and target table +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 13000; +CREATE TABLE source_pushdowntest (id integer); +CREATE TABLE target_pushdowntest (id integer ); +-- let's distribute both table on id field +SELECT create_distributed_table('source_pushdowntest', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('target_pushdowntest', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- we are doing this operation on single node setup let's figure out colocation id of both tables +-- both has same colocation id so both are colocated. +WITH colocations AS ( + SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'source_pushdowntest'::regclass + OR logicalrelid = 'target_pushdowntest'::regclass +) +SELECT + CASE + WHEN COUNT(DISTINCT colocationid) = 1 THEN 'Same' + ELSE 'Different' + END AS colocation_status +FROM colocations; + colocation_status +--------------------------------------------------------------------- + Same +(1 row) + +SET client_min_messages TO DEBUG1; +-- Test 1 : tables are colocated AND query is multisharded AND Join On distributed column : should push down to workers. +EXPLAIN (costs off, timing off, summary off) +MERGE INTO target_pushdowntest t +USING source_pushdowntest s +ON t.id = s.id +WHEN NOT MATCHED THEN + INSERT (id) + VALUES (s.id); +DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: Creating MERGE router plan + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Merge on target_pushdowntest_4000068 t + -> Merge Left Join + Merge Cond: (s.id = t.id) + -> Sort + Sort Key: s.id + -> Seq Scan on source_pushdowntest_4000064 s + -> Sort + Sort Key: t.id + -> Seq Scan on target_pushdowntest_4000068 t + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Merge on target_pushdowntest_4000069 t + -> Merge Left Join + Merge Cond: (s.id = t.id) + -> Sort + Sort Key: s.id + -> Seq Scan on source_pushdowntest_4000065 s + -> Sort + Sort Key: t.id + -> Seq Scan on target_pushdowntest_4000069 t + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Merge on target_pushdowntest_4000070 t + -> Merge Left Join + Merge Cond: (s.id = t.id) + -> Sort + Sort Key: s.id + -> Seq Scan on source_pushdowntest_4000066 s + -> Sort + Sort Key: t.id + -> Seq Scan on target_pushdowntest_4000070 t + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Merge on target_pushdowntest_4000071 t + -> Merge Left Join + Merge Cond: (s.id = t.id) + -> Sort + Sort Key: s.id + -> Seq Scan on source_pushdowntest_4000067 s + -> Sort + Sort Key: t.id + -> Seq Scan on target_pushdowntest_4000071 t +(47 rows) + +-- Test 2 : tables are colocated AND source query is not multisharded : should push down to worker. +-- DEBUG LOGS show that query is getting pushed down +MERGE INTO target_pushdowntest t +USING (SELECT * from source_pushdowntest where id = 1) s +on t.id = s.id +WHEN NOT MATCHED THEN + INSERT (id) + VALUES (s.id); +DEBUG: +DEBUG: Creating MERGE router plan +-- Test 3 : tables are colocated source query is single sharded but not using source distributed column in insertion. let's not pushdown. +INSERT INTO source_pushdowntest (id) VALUES (3); +EXPLAIN (costs off, timing off, summary off) +MERGE INTO target_pushdowntest t +USING (SELECT 1 as somekey, id from source_pushdowntest where id = 1) s +on t.id = s.somekey +WHEN NOT MATCHED THEN + INSERT (id) + VALUES (s.somekey); +DEBUG: MERGE INSERT must use the source table distribution column value for push down to workers. Otherwise, repartitioning will be applied +DEBUG: MERGE INSERT must use the source table distribution column value for push down to workers. Otherwise, repartitioning will be applied +DEBUG: Creating MERGE repartition plan +DEBUG: Using column - index:0 from the source list to redistribute + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus MERGE INTO ...) + MERGE INTO target_pushdowntest method: pull to coordinator + -> Custom Scan (Citus Adaptive) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on source_pushdowntest_4000064 source_pushdowntest + Filter: (id = 1) +(9 rows) + +-- let's verify if we use some other column from source for value of distributed column in target. +-- it should be inserted to correct shard of target. +CREATE TABLE source_withdata (id integer, some_number integer); +CREATE TABLE target_table (id integer, name text); +SELECT create_distributed_table('source_withdata', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('target_table', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO source_withdata (id, some_number) VALUES (1, 3); +-- we will use some_number column from source_withdata to insert into distributed column of target. +-- value of some_number is 3 let's verify what shard it should go to. +select worker_hash(3); + worker_hash +--------------------------------------------------------------------- + -28094569 +(1 row) + +-- it should go to second shard of target as target has 4 shard and hash "-28094569" comes in range of second shard. +MERGE INTO target_table t +USING (SELECT id, some_number from source_withdata where id = 1) s +on t.id = s.some_number +WHEN NOT MATCHED THEN + INSERT (id, name) + VALUES (s.some_number, 'parag'); +DEBUG: Sub-query is not pushable, try repartitioning +DEBUG: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns +DEBUG: Creating MERGE repartition plan +DEBUG: Using column - index:1 from the source list to redistribute +DEBUG: Collect source query results on coordinator +DEBUG: Create a MERGE task list that needs to be routed +DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: Execute MERGE task list +-- let's verify if data inserted to second shard of target. +EXPLAIN (analyze on, costs off, timing off, summary off) SELECT * FROM target_table; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (actual rows=1 loops=1) + Task Count: 4 + Tuple data received from nodes: 9 bytes + Tasks Shown: All + -> Task + Tuple data received from node: 0 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on target_table_4000076 target_table (actual rows=0 loops=1) + -> Task + Tuple data received from node: 9 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on target_table_4000077 target_table (actual rows=1 loops=1) + -> Task + Tuple data received from node: 0 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on target_table_4000078 target_table (actual rows=0 loops=1) + -> Task + Tuple data received from node: 0 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on target_table_4000079 target_table (actual rows=0 loops=1) +(20 rows) + +-- let's verify target data too. +SELECT * FROM target_table; + id | name +--------------------------------------------------------------------- + 3 | parag +(1 row) + +-- test UPDATE : when source is single sharded and table are colocated +MERGE INTO target_table t +USING (SELECT id, some_number from source_withdata where id = 1) s +on t.id = s.some_number +WHEN MATCHED THEN + UPDATE SET name = 'parag jain'; +DEBUG: Sub-query is not pushable, try repartitioning +DEBUG: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns +DEBUG: Creating MERGE repartition plan +DEBUG: Using column - index:1 from the source list to redistribute +DEBUG: Collect source query results on coordinator +DEBUG: Create a MERGE task list that needs to be routed +DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: Execute MERGE task list +-- let's verify if data updated properly. +SELECT * FROM target_table; + id | name +--------------------------------------------------------------------- + 3 | parag jain +(1 row) + +-- let's see what happend when we try to update distributed key of target table +MERGE INTO target_table t +USING (SELECT id, some_number from source_withdata where id = 1) s +on t.id = s.some_number +WHEN MATCHED THEN + UPDATE SET id = 1500; +ERROR: updating the distribution column is not allowed in MERGE actions +SELECT * FROM target_table; + id | name +--------------------------------------------------------------------- + 3 | parag jain +(1 row) + +-- test DELETE : when source is single sharded and table are colocated +MERGE INTO target_table t +USING (SELECT id, some_number from source_withdata where id = 1) s +on t.id = s.some_number +WHEN MATCHED THEN + DELETE; +DEBUG: Sub-query is not pushable, try repartitioning +DEBUG: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns +DEBUG: Creating MERGE repartition plan +DEBUG: Using column - index:1 from the source list to redistribute +DEBUG: Collect source query results on coordinator +DEBUG: Create a MERGE task list that needs to be routed +DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: Execute MERGE task list +-- let's verify if data deleted properly. +SELECT * FROM target_table; + id | name +--------------------------------------------------------------------- +(0 rows) + +-- +DELETE FROM source_withdata; +DELETE FROM target_table; +INSERT INTO source VALUES (1,1); +merge into target_table sda +using source_withdata sdn +on sda.id = sdn.id AND sda.id = 1 +when not matched then + insert (id) + values (10000); +ERROR: MERGE INSERT is using unsupported expression type for distribution column +DETAIL: Inserting arbitrary values that don't correspond to the joined column values can lead to unpredictable outcomes where rows are incorrectly distributed among different shards +SELECT * FROM target_table WHERE id = 10000; + id | name +--------------------------------------------------------------------- +(0 rows) + +RESET client_min_messages; -- This will prune shards with restriction information as NOT MATCHED is void BEGIN; SET citus.log_remote_commands to true; @@ -2898,14 +3189,14 @@ WHEN NOT MATCHED THEN -> Limit -> Sort Sort Key: id2 - -> Seq Scan on demo_source_table_4000135 demo_source_table + -> Seq Scan on demo_source_table_4000151 demo_source_table -> Distributed Subplan XXX_2 -> Custom Scan (Citus Adaptive) Task Count: 4 Tasks Shown: One of 4 -> Task Node: host=localhost port=xxxxx dbname=regression - -> Seq Scan on demo_source_table_4000135 demo_source_table + -> Seq Scan on demo_source_table_4000151 demo_source_table Task Count: 1 Tasks Shown: All -> Task @@ -3119,10 +3410,10 @@ DEBUG: Creating MERGE repartition plan DEBUG: Using column - index:0 from the source list to redistribute DEBUG: Collect source query results on coordinator DEBUG: Create a MERGE task list that needs to be routed -DEBUG: -DEBUG: -DEBUG: -DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: DEBUG: Execute MERGE task list RESET client_min_messages; SELECT * FROM target_6785 ORDER BY 1; @@ -3240,7 +3531,7 @@ USING s1 s ON t.id = s.id WHEN NOT MATCHED THEN INSERT (id) VALUES(s.val); -ERROR: MERGE INSERT must use the source table distribution column value +ERROR: MERGE INSERT must use the source's joining column for target's distribution column MERGE INTO t1 t USING s1 s ON t.id = s.id @@ -3966,7 +4257,7 @@ CONTEXT: SQL statement "SELECT citus_drop_all_shards(v_obj.objid, v_obj.schema_ PL/pgSQL function citus_drop_trigger() line XX at PERFORM DROP FUNCTION merge_when_and_write(); DROP SCHEMA merge_schema CASCADE; -NOTICE: drop cascades to 103 other objects +NOTICE: drop cascades to 107 other objects DETAIL: drop cascades to function insert_data() drop cascades to table local_local drop cascades to table target @@ -4026,6 +4317,10 @@ drop cascades to table pg_source drop cascades to table citus_target drop cascades to table citus_source drop cascades to function compare_tables() +drop cascades to table source_pushdowntest +drop cascades to table target_pushdowntest +drop cascades to table source_withdata +drop cascades to table target_table drop cascades to view pg_source_view drop cascades to view citus_source_view drop cascades to table pg_pa_target @@ -4042,7 +4337,7 @@ drop cascades to table target_set drop cascades to table source_set drop cascades to table refsource_ref drop cascades to table pg_result -drop cascades to table refsource_ref_4000112 +drop cascades to table refsource_ref_4000128 drop cascades to table pg_ref drop cascades to table local_ref drop cascades to table reftarget_local @@ -4060,11 +4355,7 @@ drop cascades to table source_6785 drop cascades to table target_6785 drop cascades to function add_s(integer,integer) drop cascades to table pg -drop cascades to table t1_4000174 -drop cascades to table s1_4000175 +drop cascades to table t1_4000190 +drop cascades to table s1_4000191 drop cascades to table t1 -drop cascades to table s1 -drop cascades to table dist_target -drop cascades to table dist_source -drop cascades to view show_tables -and 3 other objects (see server log for list) +and 7 other objects (see server log for list) diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 54d65686c23..a564699931e 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -113,7 +113,8 @@ test: function_with_case_when test: clock # MERGE tests -test: merge pgmerge merge_repartition2 +test: merge pgmerge +test: merge_repartition2 test: merge_repartition1 merge_schema_sharding test: merge_partition_tables diff --git a/src/test/regress/sql/merge.sql b/src/test/regress/sql/merge.sql index a41e8084145..5316b5233ae 100644 --- a/src/test/regress/sql/merge.sql +++ b/src/test/regress/sql/merge.sql @@ -1206,6 +1206,139 @@ SET citus.log_remote_commands to false; SELECT compare_tables(); ROLLBACK; + +-- let's create source and target table +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 13000; +CREATE TABLE source_pushdowntest (id integer); +CREATE TABLE target_pushdowntest (id integer ); + +-- let's distribute both table on id field +SELECT create_distributed_table('source_pushdowntest', 'id'); +SELECT create_distributed_table('target_pushdowntest', 'id'); + +-- we are doing this operation on single node setup let's figure out colocation id of both tables +-- both has same colocation id so both are colocated. +WITH colocations AS ( + SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'source_pushdowntest'::regclass + OR logicalrelid = 'target_pushdowntest'::regclass +) +SELECT + CASE + WHEN COUNT(DISTINCT colocationid) = 1 THEN 'Same' + ELSE 'Different' + END AS colocation_status +FROM colocations; + +SET client_min_messages TO DEBUG1; +-- Test 1 : tables are colocated AND query is multisharded AND Join On distributed column : should push down to workers. + +EXPLAIN (costs off, timing off, summary off) +MERGE INTO target_pushdowntest t +USING source_pushdowntest s +ON t.id = s.id +WHEN NOT MATCHED THEN + INSERT (id) + VALUES (s.id); + +-- Test 2 : tables are colocated AND source query is not multisharded : should push down to worker. +-- DEBUG LOGS show that query is getting pushed down +MERGE INTO target_pushdowntest t +USING (SELECT * from source_pushdowntest where id = 1) s +on t.id = s.id +WHEN NOT MATCHED THEN + INSERT (id) + VALUES (s.id); + + +-- Test 3 : tables are colocated source query is single sharded but not using source distributed column in insertion. let's not pushdown. +INSERT INTO source_pushdowntest (id) VALUES (3); + +EXPLAIN (costs off, timing off, summary off) +MERGE INTO target_pushdowntest t +USING (SELECT 1 as somekey, id from source_pushdowntest where id = 1) s +on t.id = s.somekey +WHEN NOT MATCHED THEN + INSERT (id) + VALUES (s.somekey); + + +-- let's verify if we use some other column from source for value of distributed column in target. +-- it should be inserted to correct shard of target. +CREATE TABLE source_withdata (id integer, some_number integer); +CREATE TABLE target_table (id integer, name text); +SELECT create_distributed_table('source_withdata', 'id'); +SELECT create_distributed_table('target_table', 'id'); + +INSERT INTO source_withdata (id, some_number) VALUES (1, 3); + +-- we will use some_number column from source_withdata to insert into distributed column of target. +-- value of some_number is 3 let's verify what shard it should go to. +select worker_hash(3); + +-- it should go to second shard of target as target has 4 shard and hash "-28094569" comes in range of second shard. +MERGE INTO target_table t +USING (SELECT id, some_number from source_withdata where id = 1) s +on t.id = s.some_number +WHEN NOT MATCHED THEN + INSERT (id, name) + VALUES (s.some_number, 'parag'); + +-- let's verify if data inserted to second shard of target. +EXPLAIN (analyze on, costs off, timing off, summary off) SELECT * FROM target_table; + +-- let's verify target data too. +SELECT * FROM target_table; + + +-- test UPDATE : when source is single sharded and table are colocated +MERGE INTO target_table t +USING (SELECT id, some_number from source_withdata where id = 1) s +on t.id = s.some_number +WHEN MATCHED THEN + UPDATE SET name = 'parag jain'; + +-- let's verify if data updated properly. +SELECT * FROM target_table; + +-- let's see what happend when we try to update distributed key of target table +MERGE INTO target_table t +USING (SELECT id, some_number from source_withdata where id = 1) s +on t.id = s.some_number +WHEN MATCHED THEN + UPDATE SET id = 1500; + +SELECT * FROM target_table; + +-- test DELETE : when source is single sharded and table are colocated +MERGE INTO target_table t +USING (SELECT id, some_number from source_withdata where id = 1) s +on t.id = s.some_number +WHEN MATCHED THEN + DELETE; + +-- let's verify if data deleted properly. +SELECT * FROM target_table; + +-- +DELETE FROM source_withdata; +DELETE FROM target_table; +INSERT INTO source VALUES (1,1); + +merge into target_table sda +using source_withdata sdn +on sda.id = sdn.id AND sda.id = 1 +when not matched then + insert (id) + values (10000); + +SELECT * FROM target_table WHERE id = 10000; + +RESET client_min_messages; + + + -- This will prune shards with restriction information as NOT MATCHED is void BEGIN; SET citus.log_remote_commands to true;