From 15338626107a6520aa5b98a3af55145fb30c5d84 Mon Sep 17 00:00:00 2001
From: gfunc
Date: Tue, 19 Sep 2023 15:15:52 +0000
Subject: [PATCH] fix wrong alias for distributed materializations; update
 alias test

---
 README.md                                     |   3 +-
 .../materializations/distributed_table.sql    |   4 +-
 .../incremental/distributed_incremental.sql   |   6 +-
 .../incremental/incremental.sql               |   8 +-
 tests/integration/adapter/test_aliases.py     | 101 ++++++++++++++++++
 5 files changed, 112 insertions(+), 10 deletions(-)
 create mode 100644 tests/integration/adapter/test_aliases.py

diff --git a/README.md b/README.md
index 156040ef..e12fca39 100644
--- a/README.md
+++ b/README.md
@@ -91,6 +91,7 @@ your_profile_name:
 | engine | The table engine (type of table) to use when creating tables | Optional (default: `MergeTree()`) |
 | order_by | A tuple of column names or arbitrary expressions. This allows you to create a small sparse index that helps find data faster. | Optional (default: `tuple()`) |
 | partition_by | A partition is a logical combination of records in a table by a specified criterion. The partition key can be any expression from the table columns. | Optional |
+| sharding_key | The sharding key determines the destination server when inserting into a distributed engine table. It can be random or the output of a hash function. | Optional (default: `rand()`) |
 | primary_key | Like order_by, a ClickHouse primary key expression. If not specified, ClickHouse will use the order by expression as the primary key |
 | unique_key | A tuple of column names that uniquely identify rows. Used with incremental models for updates. | Optional |
 | inserts_only | If set to True for an incremental model, incremental updates will be inserted directly to the target table without creating intermediate table. It has been deprecated in favor of the `append` incremental `strategy`, which operates in the same way | Optional |
@@ -143,7 +144,7 @@ The following macros are included to facilitate creating ClickHouse specific tab
 - `partition_cols` -- Uses the `partition_by` model configuration property to assign a ClickHouse partition key. No partition key is assigned by default.
 - `order_cols` -- Uses the `order_by` model configuration to assign a ClickHouse order by/sorting key. If not specified ClickHouse will use an empty tuple() and the table will be unsorted
 - `primary_key_clause` -- Uses the `primary_key` model configuration property to assign a ClickHouse primary key. By default, primary key is set and ClickHouse will use the order by clause as the primary key.
-- `on_cluster_clause` -- Uses the `cluster` profile property to add an `ON CLUSTER` clause to all dbt-operations
+- `on_cluster_clause` -- Uses the `cluster` profile property to add an `ON CLUSTER` clause to certain dbt operations: distributed materializations, view creation, and database creation.
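+
+For example, a model that exercises these configuration properties might look like the following sketch (the column names, sharding key, and the `events_base` model it refs are illustrative only):
+
+```sql
+{{
+    config(
+        materialized='distributed_table',
+        order_by='(event_time, user_id)',
+        partition_by='toYYYYMM(event_time)',
+        sharding_key='cityHash64(user_id)'
+    )
+}}
+
+select user_id, event_time
+from {{ ref('events_base') }}
+```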
 
 ### s3Source Helper Macro
 
diff --git a/dbt/include/clickhouse/macros/materializations/distributed_table.sql b/dbt/include/clickhouse/macros/materializations/distributed_table.sql
index 2f66f87b..d9bda1c5 100644
--- a/dbt/include/clickhouse/macros/materializations/distributed_table.sql
+++ b/dbt/include/clickhouse/macros/materializations/distributed_table.sql
@@ -14,8 +14,8 @@
     {% do exceptions.raise_compiler_error('To use distributed materialization cluster setting in dbt profile must be set') %}
   {% endif %}
 
-  {% set existing_relation_local = existing_relation.incorporate(path={"identifier": model['name'] + local_suffix}) if existing_relation is not none else none %}
-  {% set target_relation_local = target_relation.incorporate(path={"identifier": model['name'] + local_suffix}) if target_relation is not none else none %}
+  {% set existing_relation_local = existing_relation.incorporate(path={"identifier": this.identifier + local_suffix}) if existing_relation is not none else none %}
+  {% set target_relation_local = target_relation.incorporate(path={"identifier": this.identifier + local_suffix}) if target_relation is not none else none %}
 
   {%- set backup_relation = none -%}
   {%- set preexisting_backup_relation = none -%}
diff --git a/dbt/include/clickhouse/macros/materializations/incremental/distributed_incremental.sql b/dbt/include/clickhouse/macros/materializations/incremental/distributed_incremental.sql
index 5818cb31..568ada36 100644
--- a/dbt/include/clickhouse/macros/materializations/incremental/distributed_incremental.sql
+++ b/dbt/include/clickhouse/macros/materializations/incremental/distributed_incremental.sql
@@ -14,8 +14,8 @@
     {% do exceptions.raise_compiler_error('To use distributed materializations cluster setting in dbt profile must be set') %}
   {% endif %}
 
-  {% set existing_relation_local = existing_relation.incorporate(path={"identifier": model['name'] + local_suffix}) if existing_relation is not none else none %}
-  {% set target_relation_local = target_relation.incorporate(path={"identifier": model['name'] + local_suffix}) if target_relation is not none else none %}
+  {% set existing_relation_local = existing_relation.incorporate(path={"identifier": this.identifier + local_suffix}) if existing_relation is not none else none %}
+  {% set target_relation_local = target_relation.incorporate(path={"identifier": this.identifier + local_suffix}) if target_relation is not none else none %}
 
   {%- set unique_key = config.get('unique_key') -%}
   {% if unique_key is not none and unique_key|length == 0 %}
@@ -105,7 +105,7 @@
   {% endif %}
 
   -- Structure could have changed, need to update distributed table from replaced local table
-  {% set target_relation_new = target_relation.incorporate(path={"identifier": model['name'] + '_temp'}) %}
+  {% set target_relation_new = target_relation.incorporate(path={"identifier": target_relation.identifier + '_temp'}) %}
   {{ drop_relation_if_exists(target_relation_new) }}
 
   {% do run_query(create_distributed_table(target_relation_new, target_relation_local)) %}
diff --git a/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql b/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql
index ca2b0dc6..9f9fa4bc 100644
--- a/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql
+++ b/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql
@@ -135,7 +135,7 @@
 
 
 {% macro clickhouse__incremental_legacy(existing_relation, intermediate_relation, on_schema_change, unique_key, is_distributed=False) %}
-  {% set new_data_relation = existing_relation.incorporate(path={"identifier": model['name'] + '__dbt_new_data'}) %}
+  {% set new_data_relation = existing_relation.incorporate(path={"identifier": existing_relation.identifier + '__dbt_new_data'}) %}
   {{ drop_relation_if_exists(new_data_relation) }}
 
   {%- set inserted_relation = intermediate_relation -%}
@@ -144,7 +144,7 @@
   -- First create a temporary table for all of the new data
   {% if is_distributed %}
     -- Need to use distributed table to have data on all shards
-    {%- set distributed_new_data_relation = existing_relation.incorporate(path={"identifier": model['name'] + '__dbt_distributed_new_data'}) -%}
+    {%- set distributed_new_data_relation = existing_relation.incorporate(path={"identifier": existing_relation.identifier + '__dbt_distributed_new_data'}) -%}
     {%- set inserting_relation = distributed_new_data_relation -%}
     {{ create_distributed_local_table(distributed_new_data_relation, new_data_relation, existing_relation, sql) }}
   {% else %}
@@ -199,10 +199,10 @@
 
 
 {% macro clickhouse__incremental_delete_insert(existing_relation, unique_key, incremental_predicates, is_distributed=False) %}
-  {% set new_data_relation = existing_relation.incorporate(path={"identifier": model['name']
+  {% set new_data_relation = existing_relation.incorporate(path={"identifier": existing_relation.identifier
       + '__dbt_new_data_' + invocation_id.replace('-', '_')}) %}
   {{ drop_relation_if_exists(new_data_relation) }}
-  {%- set distributed_new_data_relation = existing_relation.incorporate(path={"identifier": model['name'] + '__dbt_distributed_new_data'}) -%}
+  {%- set distributed_new_data_relation = existing_relation.incorporate(path={"identifier": existing_relation.identifier + '__dbt_distributed_new_data'}) -%}
   {%- set inserting_relation = new_data_relation -%}
 
diff --git a/tests/integration/adapter/test_aliases.py b/tests/integration/adapter/test_aliases.py
new file mode 100644
index 00000000..30575aa8
--- /dev/null
+++ b/tests/integration/adapter/test_aliases.py
@@ -0,0 +1,101 @@
+import os
+
+import pytest
+from dbt.tests.adapter.aliases.fixtures import (
+    MODELS__ALIAS_IN_PROJECT_SQL,
+    MODELS__ALIAS_IN_PROJECT_WITH_OVERRIDE_SQL,
+    MODELS__SCHEMA_YML,
+)
+from dbt.tests.adapter.aliases.test_aliases import (
+    BaseAliasErrors,
+    BaseAliases,
+    BaseSameAliasDifferentDatabases,
+    BaseSameAliasDifferentSchemas,
+)
+from dbt.tests.util import relation_from_name, run_dbt
+
+MODELS__DISTRIBUTED_FOO_ALIAS_SQL = """
+
+{{
+    config(
+        alias='foo',
+        materialized='distributed_table'
+    )
+}}
+
+select {{ string_literal(this.name) }} as tablename
+
+"""
+
+MODELS__DISTRIBUTED_REF_FOO_ALIAS_SQL = """
+
+{{
+    config(
+        materialized='distributed_table'
+    )
+}}
+
+with trigger_ref as (
+
+    -- we should still be able to ref a model by its filepath
+    select * from {{ ref('foo_alias') }}
+
+)
+
+-- this name should still be the filename
+select {{ string_literal(this.name) }} as tablename
+
+"""
+
+
+class TestAliases(BaseAliases):
+    pass
+
+
+class TestAliasErrors(BaseAliasErrors):
+    pass
+
+
+class TestSameAliasDifferentSchemas(BaseSameAliasDifferentSchemas):
+    pass
+
+
+class TestSameAliasDifferentDatabases(BaseSameAliasDifferentDatabases):
+    pass
+
+
+class TestDistributedAliases(BaseAliases):
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {
+            "schema.yml": MODELS__SCHEMA_YML,
+            "foo_alias.sql": MODELS__DISTRIBUTED_FOO_ALIAS_SQL,
+            "alias_in_project.sql": MODELS__ALIAS_IN_PROJECT_SQL,
+            "alias_in_project_with_override.sql": MODELS__ALIAS_IN_PROJECT_WITH_OVERRIDE_SQL,
"ref_foo_alias.sql": MODELS__DISTRIBUTED_REF_FOO_ALIAS_SQL, + } + + @pytest.mark.skipif( + os.environ.get('DBT_CH_TEST_CLUSTER', '').strip() == '', reason='Not on a cluster' + ) + def test_alias_model_name(self, project): + results = run_dbt(["run"]) + assert len(results) == 4 + + cluster = project.test_config['cluster'] + relation = relation_from_name(project.adapter, "foo") + + result = project.run_sql( + f"select max(tablename) AS tablename From clusterAllReplicas('{cluster}', {relation}_local) ", + fetch="one", + ) + assert result[0] == "foo" + + relation = relation_from_name(project.adapter, "ref_foo_alias") + result = project.run_sql( + f"select max(tablename) AS tablename From clusterAllReplicas('{cluster}', {relation}_local) ", + fetch="one", + ) + assert result[0] == "ref_foo_alias" + + run_dbt(["test"])