Skip to content

Commit

Permalink
fix wrong alias for distributed materializations
Browse files Browse the repository at this point in the history
update alias test
  • Loading branch information
gfunc committed Sep 19, 2023
1 parent 33a31b2 commit 1533862
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 10 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ your_profile_name:
| engine | The table engine (type of table) to use when creating tables | Optional (default: `MergeTree()`) |
| order_by | A tuple of column names or arbitrary expressions. This allows you to create a small sparse index that helps find data faster. | Optional (default: `tuple()`) |
| partition_by | A partition is a logical combination of records in a table by a specified criterion. The partition key can be any expression from the table columns. | Optional |
| sharding_key | Sharding key determines the destination server when inserting into distributed engine table. The sharding key can be random or as an output of a hash function | Optional (default: `rand()`) |
| primary_key | Like order_by, a ClickHouse primary key expression. If not specified, ClickHouse will use the order by expression as the primary key | Optional |
| unique_key | A tuple of column names that uniquely identify rows. Used with incremental models for updates. | Optional |
| inserts_only | If set to True for an incremental model, incremental updates will be inserted directly to the target table without creating intermediate table. It has been deprecated in favor of the `append` incremental `strategy`, which operates in the same way | Optional |
Expand Down Expand Up @@ -143,7 +144,7 @@ The following macros are included to facilitate creating ClickHouse specific tab
- `partition_cols` -- Uses the `partition_by` model configuration property to assign a ClickHouse partition key. No partition key is assigned by default.
- `order_cols` -- Uses the `order_by` model configuration to assign a ClickHouse order by/sorting key. If not specified ClickHouse will use an empty tuple() and the table will be unsorted
- `primary_key_clause` -- Uses the `primary_key` model configuration property to assign a ClickHouse primary key. By default, primary key is set and ClickHouse will use the order by clause as the primary key.
- `on_cluster_clause` -- Uses the `cluster` profile property to add an `ON CLUSTER` clause to all dbt-operations
- `on_cluster_clause` -- Uses the `cluster` profile property to add an `ON CLUSTER` clause to certain dbt-operations: distributed materializations, views creation, database creation.

### s3Source Helper Macro

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
{% do exceptions.raise_compiler_error('To use distributed materialization cluster setting in dbt profile must be set') %}
{% endif %}

{% set existing_relation_local = existing_relation.incorporate(path={"identifier": model['name'] + local_suffix}) if existing_relation is not none else none %}
{% set target_relation_local = target_relation.incorporate(path={"identifier": model['name'] + local_suffix}) if target_relation is not none else none %}
{% set existing_relation_local = existing_relation.incorporate(path={"identifier": this.identifier + local_suffix}) if existing_relation is not none else none %}
{% set target_relation_local = target_relation.incorporate(path={"identifier": this.identifier + local_suffix}) if target_relation is not none else none %}

{%- set backup_relation = none -%}
{%- set preexisting_backup_relation = none -%}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
{% do exceptions.raise_compiler_error('To use distributed materializations cluster setting in dbt profile must be set') %}
{% endif %}

{% set existing_relation_local = existing_relation.incorporate(path={"identifier": model['name'] + local_suffix}) if existing_relation is not none else none %}
{% set target_relation_local = target_relation.incorporate(path={"identifier": model['name'] + local_suffix}) if target_relation is not none else none %}
{% set existing_relation_local = existing_relation.incorporate(path={"identifier": this.identifier + local_suffix}) if existing_relation is not none else none %}
{% set target_relation_local = target_relation.incorporate(path={"identifier": this.identifier + local_suffix}) if target_relation is not none else none %}

{%- set unique_key = config.get('unique_key') -%}
{% if unique_key is not none and unique_key|length == 0 %}
Expand Down Expand Up @@ -105,7 +105,7 @@
{% endif %}

-- Structure could have changed, need to update distributed table from replaced local table
{% set target_relation_new = target_relation.incorporate(path={"identifier": model['name'] + '_temp'}) %}
{% set target_relation_new = target_relation.incorporate(path={"identifier": target_relation.identifier + '_temp'}) %}
{{ drop_relation_if_exists(target_relation_new) }}
{% do run_query(create_distributed_table(target_relation_new, target_relation_local)) %}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@


{% macro clickhouse__incremental_legacy(existing_relation, intermediate_relation, on_schema_change, unique_key, is_distributed=False) %}
{% set new_data_relation = existing_relation.incorporate(path={"identifier": model['name'] + '__dbt_new_data'}) %}
{% set new_data_relation = existing_relation.incorporate(path={"identifier": existing_relation.identifier + '__dbt_new_data'}) %}
{{ drop_relation_if_exists(new_data_relation) }}

{%- set inserted_relation = intermediate_relation -%}
Expand All @@ -144,7 +144,7 @@
-- First create a temporary table for all of the new data
{% if is_distributed %}
-- Need to use distributed table to have data on all shards
{%- set distributed_new_data_relation = existing_relation.incorporate(path={"identifier": model['name'] + '__dbt_distributed_new_data'}) -%}
{%- set distributed_new_data_relation = existing_relation.incorporate(path={"identifier": existing_relation.identifier + '__dbt_distributed_new_data'}) -%}
{%- set inserting_relation = distributed_new_data_relation -%}
{{ create_distributed_local_table(distributed_new_data_relation, new_data_relation, existing_relation, sql) }}
{% else %}
Expand Down Expand Up @@ -199,10 +199,10 @@


{% macro clickhouse__incremental_delete_insert(existing_relation, unique_key, incremental_predicates, is_distributed=False) %}
{% set new_data_relation = existing_relation.incorporate(path={"identifier": model['name']
{% set new_data_relation = existing_relation.incorporate(path={"identifier": existing_relation.identifier
+ '__dbt_new_data_' + invocation_id.replace('-', '_')}) %}
{{ drop_relation_if_exists(new_data_relation) }}
{%- set distributed_new_data_relation = existing_relation.incorporate(path={"identifier": model['name'] + '__dbt_distributed_new_data'}) -%}
{%- set distributed_new_data_relation = existing_relation.incorporate(path={"identifier": existing_relation.identifier + '__dbt_distributed_new_data'}) -%}

{%- set inserting_relation = new_data_relation -%}

Expand Down
101 changes: 101 additions & 0 deletions tests/integration/adapter/test_aliases.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import os

import pytest
from dbt.tests.adapter.aliases.fixtures import (
MODELS__ALIAS_IN_PROJECT_SQL,
MODELS__ALIAS_IN_PROJECT_WITH_OVERRIDE_SQL,
MODELS__SCHEMA_YML,
)
from dbt.tests.adapter.aliases.test_aliases import (
BaseAliasErrors,
BaseAliases,
BaseSameAliasDifferentDatabases,
BaseSameAliasDifferentSchemas,
)
from dbt.tests.util import relation_from_name, run_dbt

MODELS__DISTRIBUTED_FOO_ALIAS_SQL = """
{{
config(
alias='foo',
materialized='distributed_table'
)
}}
select {{ string_literal(this.name) }} as tablename
"""

MODELS__DISTRIBUTED_REF_FOO_ALIAS_SQL = """
{{
config(
materialized='distributed_table'
)
}}
with trigger_ref as (
-- we should still be able to ref a model by its filepath
select * from {{ ref('foo_alias') }}
)
-- this name should still be the filename
select {{ string_literal(this.name) }} as tablename
"""


class TestAliases(BaseAliases):
    """Run the stock dbt-core alias test suite unchanged against this adapter."""

    pass


class TestAliasErrors(BaseAliasErrors):
    """Run the stock dbt-core alias-conflict error tests against this adapter."""

    pass


class TestSameAliasDifferentSchemas(BaseSameAliasDifferentSchemas):
    """Verify two models may share an alias when they land in different schemas."""

    pass


class TestSameAliasDifferentDatabases(BaseSameAliasDifferentDatabases):
    """Verify two models may share an alias when they land in different databases."""

    pass


class TestDistributedAliases(BaseAliases):
    """Alias handling for the `distributed_table` materialization.

    Reuses the base alias fixtures, substituting distributed variants for
    ``foo_alias`` and ``ref_foo_alias``, then checks that the aliased
    ``_local`` backing tables on every replica report the expected name.
    Only meaningful on a real cluster, hence the env-var skip guard.
    """

    @pytest.fixture(scope="class")
    def models(self):
        # Same model set as BaseAliases, but the two foo models use the
        # distributed_table materialization defined above.
        model_files = {
            "schema.yml": MODELS__SCHEMA_YML,
            "foo_alias.sql": MODELS__DISTRIBUTED_FOO_ALIAS_SQL,
            "alias_in_project.sql": MODELS__ALIAS_IN_PROJECT_SQL,
            "alias_in_project_with_override.sql": MODELS__ALIAS_IN_PROJECT_WITH_OVERRIDE_SQL,
            "ref_foo_alias.sql": MODELS__DISTRIBUTED_REF_FOO_ALIAS_SQL,
        }
        return model_files

    @pytest.mark.skipif(
        os.environ.get('DBT_CH_TEST_CLUSTER', '').strip() == '', reason='Not on a cluster'
    )
    def test_alias_model_name(self, project):
        # All four models must build successfully.
        run_results = run_dbt(["run"])
        assert len(run_results) == 4

        cluster = project.test_config['cluster']

        # For each model, the distributed relation's `_local` table on every
        # replica should carry the aliased (or file-based) model name.
        for relation_name in ("foo", "ref_foo_alias"):
            relation = relation_from_name(project.adapter, relation_name)
            row = project.run_sql(
                f"select max(tablename) AS tablename From clusterAllReplicas('{cluster}', {relation}_local) ",
                fetch="one",
            )
            assert row[0] == relation_name

        run_dbt(["test"])

0 comments on commit 1533862

Please sign in to comment.