diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 840814f7f5b..54721c0e00e 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -19,18 +19,19 @@ jobs: runs-on: ubuntu-latest name: Initialize parameters outputs: - build_image_name: "citus/extbuilder" - test_image_name: "citus/exttester" - citusupgrade_image_name: "citus/citusupgradetester" - fail_test_image_name: "citus/failtester" - pgupgrade_image_name: "citus/pgupgradetester" - style_checker_image_name: "citus/stylechecker" + build_image_name: "ghcr.io/citusdata/extbuilder" + test_image_name: "ghcr.io/citusdata/exttester" + citusupgrade_image_name: "ghcr.io/citusdata/citusupgradetester" + fail_test_image_name: "ghcr.io/citusdata/failtester" + pgupgrade_image_name: "ghcr.io/citusdata/pgupgradetester" + style_checker_image_name: "ghcr.io/citusdata/stylechecker" style_checker_tools_version: "0.8.18" - image_suffix: "-v9d71045" - pg14_version: '{ "major": "14", "full": "14.9" }' - pg15_version: '{ "major": "15", "full": "15.4" }' - pg16_version: '{ "major": "16", "full": "16.0" }' - upgrade_pg_versions: "14.9-15.4-16.0" + sql_snapshot_pg_version: "16.3" + image_suffix: "-v13fd57c" + pg14_version: '{ "major": "14", "full": "14.12" }' + pg15_version: '{ "major": "15", "full": "15.7" }' + pg16_version: '{ "major": "16", "full": "16.3" }' + upgrade_pg_versions: "14.12-15.7-16.3" steps: # Since GHA jobs needs at least one step we use a noop step here. - name: Set up parameters @@ -39,7 +40,7 @@ jobs: needs: params runs-on: ubuntu-20.04 container: - image: ${{ needs.params.outputs.build_image_name }}:latest + image: ${{ needs.params.outputs.build_image_name }}:${{ needs.params.outputs.sql_snapshot_pg_version }}${{ needs.params.outputs.image_suffix }} options: --user root steps: - uses: actions/checkout@v3.5.0 diff --git a/.github/workflows/packaging-test-pipelines.yml b/.github/workflows/packaging-test-pipelines.yml index fec0fefa925..6c3672fedfd 100644 --- a/.github/workflows/packaging-test-pipelines.yml +++ b/.github/workflows/packaging-test-pipelines.yml @@ -18,7 +18,7 @@ jobs: pg_versions: ${{ steps.get-postgres-versions.outputs.pg_versions }} steps: - name: Checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 with: fetch-depth: 2 - name: Get Postgres Versions @@ -50,18 +50,6 @@ jobs: - almalinux-8 - almalinux-9 POSTGRES_VERSION: ${{ fromJson(needs.get_postgres_versions_from_file.outputs.pg_versions) }} - # Postgres removed support for CentOS 7 in PG 16. Below block is needed to - # keep the build for CentOS 7 working for PG 14 and PG 15. - # Once dependent systems drop support for Centos 7, we can remove this block. 
- include: - - packaging_docker_image: centos-7 - POSTGRES_VERSION: 14 - - packaging_docker_image: centos-7 - POSTGRES_VERSION: 15 - - packaging_docker_image: oraclelinux-7 - POSTGRES_VERSION: 14 - - packaging_docker_image: oraclelinux-7 - POSTGRES_VERSION: 15 container: image: citus/packaging:${{ matrix.packaging_docker_image }}-pg${{ matrix.POSTGRES_VERSION }} @@ -69,7 +57,7 @@ jobs: steps: - name: Checkout repository - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Set Postgres and python parameters for rpm based distros run: | diff --git a/CHANGELOG.md b/CHANGELOG.md index edf338f4963..a226fe2e7b9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,11 @@ +### citus v12.1.5 (July 17, 2024) ### + +* Adds support for MERGE commands with single shard distributed target tables + (#7643) + +* Fixes an error with MERGE commands when insert value does not have source + distribution column (#7627) + ### citus v12.1.4 (May 28, 2024) ### * Adds null check for node in HasRangeTableRef (#7604) diff --git a/configure b/configure index 1a20dcdedbd..468c8be10be 100755 --- a/configure +++ b/configure @@ -1,6 +1,6 @@ #! /bin/sh # Guess values for system-dependent variables and create Makefiles. -# Generated by GNU Autoconf 2.69 for Citus 12.1.4. +# Generated by GNU Autoconf 2.69 for Citus 12.1.5. # # # Copyright (C) 1992-1996, 1998-2012 Free Software Foundation, Inc. @@ -579,8 +579,8 @@ MAKEFLAGS= # Identity of this package. PACKAGE_NAME='Citus' PACKAGE_TARNAME='citus' -PACKAGE_VERSION='12.1.4' -PACKAGE_STRING='Citus 12.1.4' +PACKAGE_VERSION='12.1.5' +PACKAGE_STRING='Citus 12.1.5' PACKAGE_BUGREPORT='' PACKAGE_URL='' @@ -1262,7 +1262,7 @@ if test "$ac_init_help" = "long"; then # Omit some internal or obsolete options to make the list less imposing. # This message is too long to be a string in the A/UX 3.1 sh. cat <<_ACEOF -\`configure' configures Citus 12.1.4 to adapt to many kinds of systems. +\`configure' configures Citus 12.1.5 to adapt to many kinds of systems. Usage: $0 [OPTION]... [VAR=VALUE]... @@ -1324,7 +1324,7 @@ fi if test -n "$ac_init_help"; then case $ac_init_help in - short | recursive ) echo "Configuration of Citus 12.1.4:";; + short | recursive ) echo "Configuration of Citus 12.1.5:";; esac cat <<\_ACEOF @@ -1429,7 +1429,7 @@ fi test -n "$ac_init_help" && exit $ac_status if $ac_init_version; then cat <<\_ACEOF -Citus configure 12.1.4 +Citus configure 12.1.5 generated by GNU Autoconf 2.69 Copyright (C) 2012 Free Software Foundation, Inc. @@ -1912,7 +1912,7 @@ cat >config.log <<_ACEOF This file contains any messages produced by compilers while running configure, to aid debugging if configure makes a mistake. -It was created by Citus $as_me 12.1.4, which was +It was created by Citus $as_me 12.1.5, which was generated by GNU Autoconf 2.69. Invocation command line was $ $0 $@ @@ -5393,7 +5393,7 @@ cat >>$CONFIG_STATUS <<\_ACEOF || ac_write_fail=1 # report actual input values of CONFIG_FILES etc. instead of their # values after options handling. ac_log=" -This file was extended by Citus $as_me 12.1.4, which was +This file was extended by Citus $as_me 12.1.5, which was generated by GNU Autoconf 2.69. 
Invocation command line was CONFIG_FILES = $CONFIG_FILES @@ -5455,7 +5455,7 @@ _ACEOF cat >>$CONFIG_STATUS <<_ACEOF || ac_write_fail=1 ac_cs_config="`$as_echo "$ac_configure_args" | sed 's/^ //; s/[\\""\`\$]/\\\\&/g'`" ac_cs_version="\\ -Citus config.status 12.1.4 +Citus config.status 12.1.5 configured by $0, generated by GNU Autoconf 2.69, with options \\"\$ac_cs_config\\" diff --git a/configure.ac b/configure.ac index 79ff5e430e8..e3063ee70be 100644 --- a/configure.ac +++ b/configure.ac @@ -5,7 +5,7 @@ # everyone needing autoconf installed, the resulting files are checked # into the SCM. -AC_INIT([Citus], [12.1.4]) +AC_INIT([Citus], [12.1.5]) AC_COPYRIGHT([Copyright (c) Citus Data, Inc.]) # we'll need sed and awk for some of the version commands diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index aaff24a10f5..1517c1d5adb 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -2572,7 +2572,7 @@ ShardIdForTuple(CitusCopyDestReceiver *copyDest, Datum *columnValues, bool *colu * Find the shard interval and id for the partition column value for * non-reference tables. * - * For reference table, this function blindly returns the tables single + * For reference tables and single shard distributed tables, this function blindly returns the table's single * shard. */ ShardInterval *shardInterval = FindShardInterval(partitionColumnValue, cacheEntry); diff --git a/src/backend/distributed/planner/merge_planner.c b/src/backend/distributed/planner/merge_planner.c index 09d2d90acec..1f9d17c43fe 100644 --- a/src/backend/distributed/planner/merge_planner.c +++ b/src/backend/distributed/planner/merge_planner.c @@ -243,14 +243,27 @@ CreateNonPushableMergePlan(Oid targetRelationId, uint64 planId, Query *originalQ CitusTableCacheEntry *targetRelation = GetCitusTableCacheEntry(targetRelationId); - /* - * Get the index of the column in the source query that will be utilized - * to repartition the source rows, ensuring colocation with the target - */ - distributedPlan->sourceResultRepartitionColumnIndex = - SourceResultPartitionColumnIndex(mergeQuery, - sourceQuery->targetList, - targetRelation); + + if (IsCitusTableType(targetRelation->relationId, SINGLE_SHARD_DISTRIBUTED)) + { + /* + * If the target table is SINGLE_SHARD_DISTRIBUTED, set this index to -1 (invalid) + * so that the execution phase does not rely on it and instead looks up the single shard of the target.
+ */ + distributedPlan->sourceResultRepartitionColumnIndex = -1; + } + else + { + /* + * Get the index of the column in the source query that will be utilized + * to repartition the source rows, ensuring colocation with the target + */ + + distributedPlan->sourceResultRepartitionColumnIndex = + SourceResultPartitionColumnIndex(mergeQuery, + sourceQuery->targetList, + targetRelation); + } /* * Make a copy of the source query, since following code scribbles it @@ -262,11 +275,11 @@ CreateNonPushableMergePlan(Oid targetRelationId, uint64 planId, Query *originalQ int cursorOptions = CURSOR_OPT_PARALLEL_OK; PlannedStmt *sourceRowsPlan = pg_plan_query(sourceQueryCopy, NULL, cursorOptions, boundParams); - bool repartitioned = IsRedistributablePlan(sourceRowsPlan->planTree) && - IsSupportedRedistributionTarget(targetRelationId); + bool isRepartitionAllowed = IsRedistributablePlan(sourceRowsPlan->planTree) && + IsSupportedRedistributionTarget(targetRelationId); /* If plan is distributed, no work at the coordinator */ - if (repartitioned) + if (isRepartitionAllowed) { distributedPlan->modifyWithSelectMethod = MODIFY_WITH_SELECT_REPARTITION; } @@ -1273,13 +1286,6 @@ static int SourceResultPartitionColumnIndex(Query *mergeQuery, List *sourceTargetList, CitusTableCacheEntry *targetRelation) { - if (IsCitusTableType(targetRelation->relationId, SINGLE_SHARD_DISTRIBUTED)) - { - ereport(ERROR, (errmsg("MERGE operation across distributed schemas " - "or with a row-based distributed table is " - "not yet supported"))); - } - /* Get all the Join conditions from the ON clause */ List *mergeJoinConditionList = WhereClauseList(mergeQuery->jointree); Var *targetColumn = targetRelation->partitionColumn; diff --git a/src/test/regress/Pipfile b/src/test/regress/Pipfile index 5bce63004cf..097ab151716 100644 --- a/src/test/regress/Pipfile +++ b/src/test/regress/Pipfile @@ -5,7 +5,7 @@ verify_ssl = true [packages] mitmproxy = {editable = true, ref = "main", git = "https://github.com/citusdata/mitmproxy.git"} -construct = "==2.9.45" +construct = "*" docopt = "==0.6.2" cryptography = ">=39.0.1" pytest = "*" diff --git a/src/test/regress/Pipfile.lock b/src/test/regress/Pipfile.lock index e6717e5fc0e..1b87dba5e0c 100644 --- a/src/test/regress/Pipfile.lock +++ b/src/test/regress/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "9568b1f3e4d4fd408e5e263f6346b0a4f479ac88e02f64bb79a9d482096e6a03" + "sha256": "f8db86383082539f626f1402e720f5f2e3f9718b44a8f26110cf9f52e7ca46bc" }, "pipfile-spec": 6, "requires": { @@ -201,10 +201,12 @@ }, "construct": { "hashes": [ - "sha256:2271a0efd0798679dea825ff47e22a4c550456a5db0ba8baa82f7eae0af0118c" + "sha256:4d2472f9684731e58cc9c56c463be63baa1447d674e0d66aeb5627b22f512c29", + "sha256:c80be81ef595a1a821ec69dc16099550ed22197615f4320b57cc9ce2a672cb30" ], "index": "pypi", - "version": "==2.9.45" + "markers": "python_version >= '3.6'", + "version": "==2.10.70" }, "cryptography": { "hashes": [ diff --git a/src/test/regress/bin/normalize.sed b/src/test/regress/bin/normalize.sed index 5dc5d7ca88e..62e9de6971a 100644 --- a/src/test/regress/bin/normalize.sed +++ b/src/test/regress/bin/normalize.sed @@ -32,6 +32,7 @@ s/"t2_[0-9]+"/"t2_xxxxxxx"/g # shard table names for MERGE tests s/merge_schema\.([_a-z0-9]+)_40[0-9]+ /merge_schema.\1_xxxxxxx /g s/pgmerge_schema\.([_a-z0-9]+)_40[0-9]+ /pgmerge_schema.\1_xxxxxxx /g +s/merge_vcore_schema\.([_a-z0-9]+)_40[0-9]+ /merge_vcore_schema.\1_xxxxxxx /g # shard table names for multi_subquery s/ keyval(1|2|ref)_[0-9]+ / keyval\1_xxxxxxx /g diff --git
a/src/test/regress/expected/merge_schema_sharding.out b/src/test/regress/expected/merge_schema_sharding.out index 8a9ba89dd46..17f6f6adb6e 100644 --- a/src/test/regress/expected/merge_schema_sharding.out +++ b/src/test/regress/expected/merge_schema_sharding.out @@ -98,14 +98,26 @@ WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b; DEBUG: Distributed tables are not co-located, try repartitioning DEBUG: For MERGE command, all the distributed tables must be colocated DEBUG: Creating MERGE repartition plan -ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Collect source query results on coordinator +DEBUG: Create a MERGE task list that needs to be routed +DEBUG: +DEBUG: distributed statement: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c2_t1 ON (citus_table_alias.a OPERATOR(pg_catalog.=) nullkey_c2_t1.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b +DEBUG: Execute MERGE task list MERGE INTO schema_shard_table1.nullkey_c1_t1 USING nullkey_c2_t1 ON (schema_shard_table1.nullkey_c1_t1.a = nullkey_c2_t1.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c2_t1.a, nullkey_c2_t1.b); DEBUG: Distributed tables are not co-located, try repartitioning DEBUG: For MERGE command, all the distributed tables must be colocated DEBUG: Creating MERGE repartition plan -ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Collect source query results on coordinator +DEBUG: Create a MERGE task list that needs to be routed +DEBUG: +DEBUG: distributed statement: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c2_t1 ON (citus_table_alias.a OPERATOR(pg_catalog.=) nullkey_c2_t1.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c2_t1.a, nullkey_c2_t1.b) +DEBUG: Execute MERGE task list -- with a distributed table SET search_path TO schema_shard_table1; MERGE INTO nullkey_c1_t1 USING schema_shard_table.distributed_table ON (nullkey_c1_t1.a = schema_shard_table.distributed_table.a) @@ -114,7 +126,12 @@ WHEN NOT MATCHED THEN INSERT VALUES (schema_shard_table.distributed_table.a, sch DEBUG: Distributed tables are not co-located, try repartitioning DEBUG: For MERGE command, all the distributed tables must be colocated DEBUG: Creating MERGE repartition plan -ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: Collect source query results on coordinator +DEBUG: Create a MERGE task list that needs to be routed +DEBUG: +DEBUG: distributed statement: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 
'binary'::citus_copy_format) intermediate_result(a integer, b integer)) distributed_table ON (citus_table_alias.a OPERATOR(pg_catalog.=) distributed_table.a) WHEN MATCHED THEN UPDATE SET b = distributed_table.b WHEN NOT MATCHED THEN INSERT (a, b) VALUES (distributed_table.a, distributed_table.b) +DEBUG: Execute MERGE task list MERGE INTO schema_shard_table.distributed_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = schema_shard_table.distributed_table.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b); @@ -163,7 +180,13 @@ WHEN MATCHED THEN UPDATE SET b = schema_shard_table.reference_table.b; DEBUG: A mix of distributed and reference table, try repartitioning DEBUG: A mix of distributed and reference table, routable query is not possible DEBUG: Creating MERGE repartition plan -ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Collect source query results on coordinator +DEBUG: Create a MERGE task list that needs to be routed +DEBUG: +DEBUG: distributed statement: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) reference_table ON (citus_table_alias.a OPERATOR(pg_catalog.=) reference_table.a) WHEN MATCHED THEN UPDATE SET b = reference_table.b +DEBUG: Execute MERGE task list MERGE INTO schema_shard_table.reference_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = schema_shard_table.reference_table.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t1.b WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b); @@ -174,7 +197,13 @@ WHEN MATCHED THEN UPDATE SET b = schema_shard_table.citus_local_table.b; DEBUG: A mix of distributed and local table, try repartitioning DEBUG: A mix of distributed and citus-local table, routable query is not possible DEBUG: Creating MERGE repartition plan -ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Collect source query results on coordinator +DEBUG: Create a MERGE task list that needs to be routed +DEBUG: +DEBUG: distributed statement: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) citus_local_table ON (citus_table_alias.a OPERATOR(pg_catalog.=) citus_local_table.a) WHEN MATCHED THEN UPDATE SET b = citus_local_table.b +DEBUG: Execute MERGE task list MERGE INTO schema_shard_table.citus_local_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = schema_shard_table.citus_local_table.a) WHEN MATCHED THEN DELETE; DEBUG: A mix of distributed and local table, try repartitioning @@ -210,7 +239,12 @@ WHEN MATCHED THEN UPDATE SET b = cte.b; DEBUG: Distributed tables are not co-located, try repartitioning DEBUG: For MERGE command, all the distributed tables must be colocated DEBUG: Creating MERGE repartition plan -ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: 
Collect source query results on coordinator +DEBUG: Create a MERGE task list that needs to be routed +DEBUG: +DEBUG: distributed statement: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte ON (citus_table_alias.a OPERATOR(pg_catalog.=) cte.a) WHEN MATCHED THEN UPDATE SET b = cte.b +DEBUG: Execute MERGE task list WITH cte AS materialized ( SELECT * FROM schema_shard_table.distributed_table ) @@ -219,7 +253,12 @@ WHEN MATCHED THEN UPDATE SET b = cte.b; DEBUG: Distributed tables are not co-located, try repartitioning DEBUG: For MERGE command, all the distributed tables must be colocated DEBUG: Creating MERGE repartition plan -ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: Collect source query results on coordinator +DEBUG: Create a MERGE task list that needs to be routed +DEBUG: +DEBUG: distributed statement: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte ON (citus_table_alias.a OPERATOR(pg_catalog.=) cte.a) WHEN MATCHED THEN UPDATE SET b = cte.b +DEBUG: Execute MERGE task list SET client_min_messages TO WARNING; DROP SCHEMA schema_shard_table1 CASCADE; DROP SCHEMA schema_shard_table2 CASCADE; diff --git a/src/test/regress/expected/merge_vcore.out b/src/test/regress/expected/merge_vcore.out new file mode 100644 index 00000000000..03f6f8820e4 --- /dev/null +++ b/src/test/regress/expected/merge_vcore.out @@ -0,0 +1,481 @@ +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15 +\gset +\if :server_version_ge_15 +\else +\q +\endif +-- MERGE command performs a join from data_source to target_table_name +DROP SCHEMA IF EXISTS merge_vcore_schema CASCADE; +NOTICE: schema "merge_vcore_schema" does not exist, skipping +--MERGE INTO target +--USING source +--WHEN NOT MATCHED +--WHEN MATCHED AND +--WHEN MATCHED +CREATE SCHEMA merge_vcore_schema; +SET search_path TO merge_vcore_schema; +SET citus.shard_count TO 4; +SET citus.next_shard_id TO 4000000; +SET citus.explain_all_tasks TO true; +SET citus.shard_replication_factor TO 1; +SET citus.max_adaptive_executor_pool_size TO 1; +SET client_min_messages = warning; +SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); + ?column? 
+--------------------------------------------------------------------- + 1 +(1 row) + +RESET client_min_messages; +-- ****************************************** CASE 1 : Both are singleSharded*************************************** +CREATE TABLE source ( + id bigint, + doc text +); +CREATE TABLE target ( + id bigint, + doc text +); +SELECT create_distributed_table('source', null, colocate_with=>'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('target', null, colocate_with=>'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}'); +-- insert +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); +SELECT * FROM target; + id | doc +--------------------------------------------------------------------- + 2 | {"a" : 1} + 2 | {"a" : 2} +(2 rows) + +-- update +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); +SELECT * FROM target; + id | doc +--------------------------------------------------------------------- + 2 | {"b" : 1} + 2 | {"b" : 1} +(2 rows) + +-- Explain +EXPLAIN (costs off, timing off, summary off) MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) +WHEN MATCHED THEN DO NOTHING; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus MERGE INTO ...) 
+ MERGE INTO target method: pull to coordinator + -> Custom Scan (Citus Adaptive) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on source_4000000 source +(8 rows) + +DROP TABLE IF EXISTS source; +DROP TABLE IF EXISTS target; +-- *************** CASE 2 : source is single sharded and target is distributed ******************************* +CREATE TABLE source ( + id bigint, + doc text +); +CREATE TABLE target ( + id bigint, + doc text +); +SELECT create_distributed_table('source', null, colocate_with=>'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('target', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}'); +-- insert +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); +SELECT * FROM target; + id | doc +--------------------------------------------------------------------- + 2 | {"a" : 1} + 2 | {"a" : 2} +(2 rows) + +-- update +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); +SELECT * FROM target; + id | doc +--------------------------------------------------------------------- + 2 | {"b" : 1} + 2 | {"b" : 1} +(2 rows) + +-- Explain +EXPLAIN (costs off, timing off, summary off) MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) +WHEN MATCHED THEN DO NOTHING; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus MERGE INTO ...) 
+ MERGE INTO target method: pull to coordinator + -> Custom Scan (Citus Adaptive) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on source_4000002 source +(8 rows) + +DROP TABLE IF EXISTS source; +DROP TABLE IF EXISTS target; +-- *************** CASE 3 : source is distributed and target is single sharded ******************************* +CREATE TABLE source ( + id bigint, + doc text +); +CREATE TABLE target ( + id bigint, + doc text +); +SELECT create_distributed_table('source', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('target', null); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}'); +-- insert +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); +SELECT * FROM target; + id | doc +--------------------------------------------------------------------- + 2 | {"a" : 1} + 2 | {"a" : 2} +(2 rows) + +-- update +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); +SELECT * FROM target; + id | doc +--------------------------------------------------------------------- + 2 | {"b" : 1} + 2 | {"b" : 1} +(2 rows) + +-- Explain +EXPLAIN (costs off, timing off, summary off) MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) +WHEN MATCHED THEN DO NOTHING; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus MERGE INTO ...) 
+ MERGE INTO target method: pull to coordinator + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on source_4000007 source + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on source_4000008 source + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on source_4000009 source + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on source_4000010 source +(17 rows) + +DROP TABLE IF EXISTS source; +DROP TABLE IF EXISTS target; +-- *************** CASE 4 : both are distributed ******************************* +CREATE TABLE source ( + id bigint, + doc text +); +CREATE TABLE target ( + id bigint, + doc text +); +SELECT create_distributed_table('source', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('target', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}'); +-- insert +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); +SELECT * FROM target; + id | doc +--------------------------------------------------------------------- + 2 | {"a" : 1} + 2 | {"a" : 2} +(2 rows) + +-- update +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); +SELECT * FROM target; + id | doc +--------------------------------------------------------------------- + 2 | {"b" : 1} + 2 | {"b" : 1} +(2 rows) + +-- Explain +EXPLAIN (costs off, timing off, summary off) MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) +WHEN MATCHED THEN DO NOTHING; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus MERGE INTO ...) 
+ MERGE INTO target method: repartition + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on source_4000012 source + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on source_4000013 source + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on source_4000014 source + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on source_4000015 source +(17 rows) + +DROP TABLE IF EXISTS source; +DROP TABLE IF EXISTS target; +-- *************** CASE 5 : both are distributed & colocated ******************************* +CREATE TABLE source ( + id bigint, + doc text +); +CREATE TABLE target ( + id bigint, + doc text +); +SELECT create_distributed_table('source', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('target', 'id', colocate_with=>'source'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}'); +-- insert +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); +SELECT * FROM target; + id | doc +--------------------------------------------------------------------- + 2 | {"a" : 1} + 2 | {"a" : 2} +(2 rows) + +-- update +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); +SELECT * FROM target; + id | doc +--------------------------------------------------------------------- + 2 | {"b" : 1} + 2 | {"b" : 1} +(2 rows) + +-- Explain +EXPLAIN (costs off, timing off, summary off) MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) +WHEN MATCHED THEN DO NOTHING; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus MERGE INTO ...) 
+ MERGE INTO target method: repartition + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on source_4000020 source + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on source_4000021 source + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on source_4000022 source + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on source_4000023 source +(17 rows) + +DROP TABLE IF EXISTS source; +DROP TABLE IF EXISTS target; +-- *************** CASE 6 : both are singlesharded & colocated ******************************* +CREATE TABLE source ( + id bigint, + doc text +); +CREATE TABLE target ( + id bigint, + doc text +); +SELECT create_distributed_table('source', null); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('target', null, colocate_with=>'source'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}'); +-- insert +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); +SELECT * FROM target; + id | doc +--------------------------------------------------------------------- + 2 | {"a" : 1} + 2 | {"a" : 2} +(2 rows) + +-- update +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); +SELECT * FROM target; + id | doc +--------------------------------------------------------------------- + 2 | {"b" : 1} + 2 | {"b" : 1} +(2 rows) + +-- Explain +EXPLAIN (costs off, timing off, summary off) MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) +WHEN MATCHED THEN DO NOTHING; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Merge on target_4000029 target + -> Nested Loop + -> Seq Scan on source_4000028 source + -> Materialize + -> Seq Scan on target_4000029 target + Filter: ('2'::bigint = id) +(11 rows) + +DROP TABLE IF EXISTS source; +DROP TABLE IF EXISTS target; +DROP SCHEMA IF EXISTS merge_vcore_schema CASCADE; diff --git a/src/test/regress/expected/merge_vcore_0.out b/src/test/regress/expected/merge_vcore_0.out new file mode 100644 index 00000000000..a7e3fbf2062 --- /dev/null +++ b/src/test/regress/expected/merge_vcore_0.out @@ -0,0 +1,6 @@ +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15 +\gset +\if :server_version_ge_15 +\else +\q diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 8131ad7cb2a..b2badd878c2 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1413,7 +1413,7 @@ DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; SHOW citus.version; citus.version --------------------------------------------------------------------- 
- 12.1.4 + 12.1.5 (1 row) -- ensure no unexpected objects were created outside pg_catalog diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index a564699931e..d32757e4860 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -117,6 +117,7 @@ test: merge pgmerge test: merge_repartition2 test: merge_repartition1 merge_schema_sharding test: merge_partition_tables +test: merge_vcore # --------- # test that no tests leaked intermediate results. This should always be last diff --git a/src/test/regress/sql/merge_vcore.sql b/src/test/regress/sql/merge_vcore.sql new file mode 100644 index 00000000000..472bbfe914f --- /dev/null +++ b/src/test/regress/sql/merge_vcore.sql @@ -0,0 +1,319 @@ +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15 +\gset +\if :server_version_ge_15 +\else +\q +\endif + +-- MERGE command performs a join from data_source to target_table_name +DROP SCHEMA IF EXISTS merge_vcore_schema CASCADE; +--MERGE INTO target +--USING source +--WHEN NOT MATCHED +--WHEN MATCHED AND +--WHEN MATCHED + +CREATE SCHEMA merge_vcore_schema; +SET search_path TO merge_vcore_schema; +SET citus.shard_count TO 4; +SET citus.next_shard_id TO 4000000; +SET citus.explain_all_tasks TO true; +SET citus.shard_replication_factor TO 1; +SET citus.max_adaptive_executor_pool_size TO 1; +SET client_min_messages = warning; +SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); +RESET client_min_messages; + + +-- ****************************************** CASE 1 : Both are singleSharded*************************************** +CREATE TABLE source ( + id bigint, + doc text +); + +CREATE TABLE target ( + id bigint, + doc text +); + +SELECT create_distributed_table('source', null, colocate_with=>'none'); +SELECT create_distributed_table('target', null, colocate_with=>'none'); + +INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}'); + +-- insert +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); + +SELECT * FROM target; + +-- update +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); + +SELECT * FROM target; + +-- Explain +EXPLAIN (costs off, timing off, summary off) MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) +WHEN MATCHED THEN DO NOTHING; + + +DROP TABLE IF EXISTS source; +DROP TABLE IF EXISTS target; + + +-- *************** CASE 2 : source is single sharded and target is distributed ******************************* +CREATE TABLE source ( + id bigint, + doc text +); + +CREATE TABLE target ( + id bigint, + doc text +); + +SELECT create_distributed_table('source', null, colocate_with=>'none'); +SELECT create_distributed_table('target', 'id'); + +INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}'); + +-- insert +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); + +SELECT * FROM target; + +-- update +MERGE INTO ONLY target USING 
(SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); + +SELECT * FROM target; + +-- Explain +EXPLAIN (costs off, timing off, summary off) MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) +WHEN MATCHED THEN DO NOTHING; + + + +DROP TABLE IF EXISTS source; +DROP TABLE IF EXISTS target; + + +-- *************** CASE 3 : source is distributed and target is single sharded ******************************* +CREATE TABLE source ( + id bigint, + doc text +); + +CREATE TABLE target ( + id bigint, + doc text +); + +SELECT create_distributed_table('source', 'id'); +SELECT create_distributed_table('target', null); + +INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}'); + +-- insert +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); + +SELECT * FROM target; + +-- update +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); + +SELECT * FROM target; + +-- Explain +EXPLAIN (costs off, timing off, summary off) MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) +WHEN MATCHED THEN DO NOTHING; + +DROP TABLE IF EXISTS source; +DROP TABLE IF EXISTS target; + + +-- *************** CASE 4 : both are distributed ******************************* +CREATE TABLE source ( + id bigint, + doc text +); + +CREATE TABLE target ( + id bigint, + doc text +); + +SELECT create_distributed_table('source', 'id'); +SELECT create_distributed_table('target', 'id'); + +INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}'); + +-- insert +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); + +SELECT * FROM target; + +-- update +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); + +SELECT * FROM target; + +-- Explain +EXPLAIN (costs off, timing off, summary off) MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) +WHEN MATCHED THEN DO NOTHING; + +DROP TABLE IF EXISTS source; +DROP TABLE IF EXISTS target; + + +-- *************** CASE 5 : both are distributed & colocated ******************************* + +CREATE TABLE source ( + id bigint, + doc text +); + +CREATE TABLE target ( + id bigint, + doc text +); + +SELECT create_distributed_table('source', 'id'); +SELECT create_distributed_table('target', 'id', colocate_with=>'source'); + +INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}'); + +-- insert +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) 
+VALUES (src.t_id, doc); + +SELECT * FROM target; + +-- update +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); + +SELECT * FROM target; + +-- Explain +EXPLAIN (costs off, timing off, summary off) MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) +WHEN MATCHED THEN DO NOTHING; + +DROP TABLE IF EXISTS source; +DROP TABLE IF EXISTS target; + + +-- *************** CASE 6 : both are singlesharded & colocated ******************************* + +CREATE TABLE source ( + id bigint, + doc text +); + +CREATE TABLE target ( + id bigint, + doc text +); + +SELECT create_distributed_table('source', null); +SELECT create_distributed_table('target', null, colocate_with=>'source'); + +INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}'); + +-- insert +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); + +SELECT * FROM target; + +-- update +MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) AND src.doc = target.doc +WHEN MATCHED THEN +UPDATE SET doc = '{"b" : 1}' +WHEN NOT MATCHED THEN +INSERT (id, doc) +VALUES (src.t_id, doc); + +SELECT * FROM target; + +-- Explain +EXPLAIN (costs off, timing off, summary off) MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src +ON (src.t_id = target.id) +WHEN MATCHED THEN DO NOTHING; + +DROP TABLE IF EXISTS source; +DROP TABLE IF EXISTS target; + +DROP SCHEMA IF EXISTS merge_vcore_schema CASCADE; + + + +
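For quick reference, the scenario the merge_vcore tests above exercise can be condensed into a short sketch. This is illustrative only and distilled from those regression tests; the table names are assumptions, not part of the patch. Before this change such a MERGE failed with "MERGE operation across distributed schemas or with a row-based distributed table is not yet supported"; with it, the command is planned by collecting or repartitioning the source results and routing a single MERGE task to the target's one shard.

-- Minimal sketch (illustrative names): MERGE into a single shard distributed table
CREATE TABLE src (id bigint, doc text);
CREATE TABLE tgt (id bigint, doc text);
SELECT create_distributed_table('src', 'id');   -- hash-distributed source
SELECT create_distributed_table('tgt', null);   -- single shard distributed target

MERGE INTO tgt USING src ON (src.id = tgt.id)
WHEN MATCHED THEN UPDATE SET doc = src.doc
WHEN NOT MATCHED THEN INSERT (id, doc) VALUES (src.id, src.doc);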