Skip to content

Commit

Permalink
Fix distributed table local quoting
Browse files Browse the repository at this point in the history
  • Loading branch information
genzgd committed Dec 1, 2023
1 parent ba07dfc commit 4c9c445
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,8 @@

{% call statement('delete_existing_data') %}
{% if is_distributed %}
delete from {{ existing_relation }}{{ adapter.get_clickhouse_local_suffix() }} {{ on_cluster_clause(existing_relation) }} where ({{ unique_key }}) in (select {{ unique_key }}
{%- set existing_local = existing_relation.derivative(adapter.get_clickhouse_local_suffix()) %}
delete from {{ existing_local }} {{ on_cluster_clause(existing_relation) }} where ({{ unique_key }}) in (select {{ unique_key }}
from {{ inserting_relation }})
{% else %}
delete from {{ existing_relation }} where ({{ unique_key }}) in (select {{ unique_key }}
Expand Down
8 changes: 4 additions & 4 deletions tests/integration/adapter/aliases/test_aliases.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,17 +83,17 @@ def test_alias_model_name(self, project):
assert len(results) == 4

cluster = project.test_config['cluster']
relation = relation_from_name(project.adapter, "foo")
local_relation = relation_from_name(project.adapter, "foo_local")

result = project.run_sql(
f"select max(tablename) AS tablename From clusterAllReplicas('{cluster}', {relation}_local) ",
f"select max(tablename) AS tablename From clusterAllReplicas('{cluster}', {local_relation}) ",
fetch="one",
)
assert result[0] == "foo"

relation = relation_from_name(project.adapter, "ref_foo_alias")
local_relation = relation_from_name(project.adapter, "ref_foo_alias_local")
result = project.run_sql(
f"select max(tablename) AS tablename From clusterAllReplicas('{cluster}', {relation}_local) ",
f"select max(tablename) AS tablename From clusterAllReplicas('{cluster}', {local_relation}) ",
fetch="one",
)
assert result[0] == "ref_foo_alias"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,23 +77,23 @@ def seeds(self):
}

def assert_total_count_correct(self, project):
'''Check if data is properly distributed'''
# Check if data is properly distributed
cluster = project.test_config['cluster']
table_relation = relation_from_name(project.adapter, "distributed")
table_relation = relation_from_name(project.adapter, "distributed_local")
cluster_info = project.run_sql(
f"select shard_num,max(host_name) as host_name, count(distinct replica_num) as replica_counts "
f"from system.clusters where cluster='{cluster}' group by shard_num",
fetch="all",
)
sum_count = project.run_sql(
f"select count() From clusterAllReplicas('{cluster}',{table_relation}_local)",
f"select count() From clusterAllReplicas('{cluster}',{table_relation})",
fetch="one",
)
total_count = 0
# total count should be equal to sum(count of each shard * replica_counts)
for shard_num, host_name, replica_counts in cluster_info:
count = project.run_sql(
f"select count() From remote('{host_name}',{table_relation}_local)",
f"select count() From remote('{host_name}',{table_relation})",
fetch="one",
)
total_count += count[0] * replica_counts
Expand All @@ -103,7 +103,7 @@ def assert_total_count_correct(self, project):
os.environ.get('DBT_CH_TEST_CLUSTER', '').strip() == '', reason='Not on a cluster'
)
def test_base(self, project):
# cluster setting must exists
# cluster setting must exist
cluster = project.test_config['cluster']
assert cluster

Expand Down

0 comments on commit 4c9c445

Please sign in to comment.