Skip to content

Commit

Permalink
Release 1 6 1 (#217)
Browse files Browse the repository at this point in the history
* Identifier quoting checkpoint

* Identifier quoting checkpoint

* Fix distributed table local quoting

* Fix issues with deduplication settings
  • Loading branch information
genzgd authored Dec 5, 2023
1 parent 246a4d8 commit 08bbbf9
Show file tree
Hide file tree
Showing 15 changed files with 82 additions and 59 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
### Release [1.6.1], 2023-12-04
#### Bug Fixes
- Identifier quoting was disabled for tables/databases etc. This would cause failures for schemas or tables using reserved words
or containing special characters. This has been fixed and some macros have been updated to correctly handle such identifiers.
Note that there still may be untested edge cases where nonstandard identifiers cause issues, so they are still not recommended.
Closes https://github.com/ClickHouse/dbt-clickhouse/issues/144. Thanks to [Alexandru Pisarenco](https://github.com/apisarenco) for the
report and initial PR!
- The new `allow_automatic_deduplication` setting was not being correctly propagated to the adapter, so setting it to `True`
did not have the intended affect. In addition, this setting is now ignored for older ClickHouse versions that
do not support `CREATE TABLE AS SELECT ... EMPTY`, since the automatic deduplication window is required to allow correct
inserts in Replicated tables on those older versions. Fixes https://github.com/ClickHouse/dbt-clickhouse/issues/216.

### Release [1.6.0], 2023-11-30
#### Improvements
- Compatible with dbt 1.6.x. Note that dbt new `clone` feature is not supported, as ClickHouse has no native "light weight"
Expand Down
2 changes: 1 addition & 1 deletion dbt/adapters/clickhouse/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version = '1.6.0'
version = '1.6.1'
2 changes: 1 addition & 1 deletion dbt/adapters/clickhouse/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class ClickHouseCredentials(Credentials):
custom_settings: Optional[Dict[str, Any]] = None
use_lw_deletes: bool = False
local_suffix: str = 'local'
allow_automatic_deduplication = False
allow_automatic_deduplication: bool = False

@property
def type(self):
Expand Down
13 changes: 9 additions & 4 deletions dbt/adapters/clickhouse/dbclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

from dbt.adapters.clickhouse.credentials import ClickHouseCredentials
from dbt.adapters.clickhouse.logger import logger
from dbt.adapters.clickhouse.query import quote_identifier
from dbt.adapters.clickhouse.util import compare_versions

LW_DELETE_SETTING = 'allow_experimental_lightweight_delete'
ND_MUTATION_SETTING = 'allow_nondeterministic_mutations'
Expand Down Expand Up @@ -82,7 +84,10 @@ def __init__(self, credentials: ClickHouseCredentials):
self.close()
raise ex
self._model_settings = {}
if not credentials.allow_automatic_deduplication:
if (
not credentials.allow_automatic_deduplication
and compare_versions(self._server_version(), '22.7.1.2484') >= 0
):
self._model_settings[DEDUP_WINDOW_SETTING] = '0'

@abstractmethod
Expand Down Expand Up @@ -159,7 +164,7 @@ def _check_lightweight_deletes(self, requested: bool):
def _ensure_database(self, database_engine, cluster_name) -> None:
if not self.database:
return
check_db = f'EXISTS DATABASE {self.database}'
check_db = f'EXISTS DATABASE {quote_identifier(self.database)}'
try:
db_exists = self.command(check_db)
if not db_exists:
Expand All @@ -170,7 +175,7 @@ def _ensure_database(self, database_engine, cluster_name) -> None:
else ''
)
self.command(
f'CREATE DATABASE IF NOT EXISTS {self.database}{cluster_clause}{engine_clause}'
f'CREATE DATABASE IF NOT EXISTS {quote_identifier(self.database)}{cluster_clause}{engine_clause}'
)
db_exists = self.command(check_db)
if not db_exists:
Expand All @@ -194,7 +199,7 @@ def _check_atomic_exchange(self) -> bool:
table_id = str(uuid.uuid1()).replace('-', '')
swap_tables = [f'__dbt_exchange_test_{x}_{table_id}' for x in range(0, 2)]
for table in swap_tables:
self.command(create_cmd.format(table))
self.command(create_cmd.format(quote_identifier(table)))
try:
self.command('EXCHANGE TABLES {} AND {}'.format(*swap_tables))
return True
Expand Down
13 changes: 1 addition & 12 deletions dbt/adapters/clickhouse/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from dbt.adapters.clickhouse.logger import logger
from dbt.adapters.clickhouse.query import quote_identifier
from dbt.adapters.clickhouse.relation import ClickHouseRelation
from dbt.adapters.clickhouse.util import compare_versions

GET_CATALOG_MACRO_NAME = 'get_catalog'
LIST_SCHEMAS_MACRO_NAME = 'list_schemas'
Expand Down Expand Up @@ -438,18 +439,6 @@ def test(row: agate.Row) -> bool:
return test


def compare_versions(v1: str, v2: str) -> int:
v1_parts = v1.split('.')
v2_parts = v2.split('.')
for part1, part2 in zip(v1_parts, v2_parts):
try:
if int(part1) != int(part2):
return 1 if int(part1) > int(part2) else -1
except ValueError:
raise DbtRuntimeError("Version must consist of only numbers separated by '.'")
return 0


COLUMNS_EQUAL_SQL = '''
SELECT
row_count_diff.difference as row_count_difference,
Expand Down
26 changes: 14 additions & 12 deletions dbt/adapters/clickhouse/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@

from dbt.adapters.base.relation import BaseRelation, Policy, Self
from dbt.contracts.graph.nodes import ManifestNode, SourceDefinition
from dbt.contracts.relation import HasQuoting
from dbt.contracts.relation import HasQuoting, Path, RelationType
from dbt.exceptions import DbtRuntimeError
from dbt.utils import deep_merge, merge

from dbt.adapters.clickhouse.query import quote_identifier


@dataclass
class ClickHouseQuotePolicy(Policy):
database: bool = False
schema: bool = False
identifier: bool = False
database: bool = True
schema: bool = True
identifier: bool = True


@dataclass
Expand All @@ -26,7 +28,7 @@ class ClickHouseIncludePolicy(Policy):
class ClickHouseRelation(BaseRelation):
quote_policy: Policy = field(default_factory=lambda: ClickHouseQuotePolicy())
include_policy: Policy = field(default_factory=lambda: ClickHouseIncludePolicy())
quote_character: str = ''
quote_character: str = '`'
can_exchange: bool = False
can_on_cluster: bool = False

Expand All @@ -35,13 +37,13 @@ def __post_init__(self):
raise DbtRuntimeError(f'Cannot set database {self.database} in clickhouse!')
self.path.database = ''

def render(self):
if self.include_policy.database and self.include_policy.schema:
raise DbtRuntimeError(
'Got a clickhouse relation with schema and database set to '
'include, but only one can be set'
)
return super().render()
def render(self) -> str:
return ".".join(quote_identifier(part) for _, part in self._render_iterator() if part)

def derivative(self, suffix: str, relation_type: Optional[str] = None) -> BaseRelation:
path = Path(schema=self.path.schema, database='', identifier=self.path.identifier + suffix)
derivative_type = RelationType[relation_type] if relation_type else self.type
return ClickHouseRelation(type=derivative_type, path=path)

def matches(
self,
Expand Down
13 changes: 13 additions & 0 deletions dbt/adapters/clickhouse/util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from dbt.exceptions import DbtRuntimeError


def compare_versions(v1: str, v2: str) -> int:
v1_parts = v1.split('.')
v2_parts = v2.split('.')
for part1, part2 in zip(v1_parts, v2_parts):
try:
if int(part1) != int(part2):
return 1 if int(part1) > int(part2) else -1
except ValueError:
raise DbtRuntimeError("Version must consist of only numbers separated by '.'")
return 0
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,8 @@

{% call statement('delete_existing_data') %}
{% if is_distributed %}
delete from {{ existing_relation }}{{ adapter.get_clickhouse_local_suffix() }} {{ on_cluster_clause(existing_relation) }} where ({{ unique_key }}) in (select {{ unique_key }}
{%- set existing_local = existing_relation.derivative(adapter.get_clickhouse_local_suffix()) %}
delete from {{ existing_local }} {{ on_cluster_clause(existing_relation) }} where ({{ unique_key }}) in (select {{ unique_key }}
from {{ inserting_relation }})
{% else %}
delete from {{ existing_relation }} where ({{ unique_key }}) in (select {{ unique_key }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@
{%- materialization materialized_view, adapter='clickhouse' -%}

{%- set target_relation = this.incorporate(type='table') -%}
{%- set mv_name = target_relation.name + '_mv' -%}
{%- set target_mv = api.Relation.create(identifier=mv_name, schema=schema, database=database, type='materialized_view') -%}
{%- set mv_relation = target_relation.derivative('_mv', 'MaterializedView') -%}
{%- set cluster_clause = on_cluster_clause(target_relation) -%}

{# look for an existing relation for the target table and create backup relations if necessary #}
Expand Down Expand Up @@ -44,14 +43,14 @@
{% elif existing_relation.can_exchange %}
{{ log('Replacing existing materialized view' + target_relation.name) }}
{% call statement('drop existing materialized view') %}
drop view if exists {{ mv_name }} {{ cluster_clause }}
drop view if exists {{ mv_relation }} {{ cluster_clause }}
{% endcall %}
{% call statement('main') -%}
{{ get_create_table_as_sql(False, backup_relation, sql) }}
{%- endcall %}
{% do exchange_tables_atomic(backup_relation, existing_relation) %}
{% call statement('create new materialized view') %}
{{ clickhouse__create_mv_sql(mv_name, existing_relation.name, cluster_clause, sql) }}
{{ clickhouse__create_mv_sql(mv_relation, existing_relation.name, cluster_clause, sql) }}
{% endcall %}
{% else %}
{{ log('Replacing existing materialized view' + target_relation.name) }}
Expand All @@ -72,7 +71,7 @@

{{ run_hooks(post_hooks, inside_transaction=False) }}

{{ return({'relations': [target_relation, target_mv]}) }}
{{ return({'relations': [target_relation, mv_relation]}) }}

{%- endmaterialization -%}

Expand All @@ -88,13 +87,13 @@
{{ get_create_table_as_sql(False, relation, sql) }}
{% endcall %}
{%- set cluster_clause = on_cluster_clause(relation) -%}
{%- set mv_name = relation.name + '_mv' -%}
{{ clickhouse__create_mv_sql(mv_name, relation.name, cluster_clause, sql) }}
{%- set mv_relation = relation.derivative('_mv', 'MaterializedView') -%}
{{ clickhouse__create_mv_sql(mv_relation, relation, cluster_clause, sql) }}
{%- endmacro %}


{% macro clickhouse__create_mv_sql(relation_name, target_table, cluster_clause, sql) -%}
create materialized view if not exists {{ relation_name }} {{ cluster_clause }}
{% macro clickhouse__create_mv_sql(mv_relation, target_table, cluster_clause, sql) -%}
create materialized view if not exists {{ mv_relation }} {{ cluster_clause }}
to {{ target_table }}
as {{ sql }}
{%- endmacro %}
Expand All @@ -103,9 +102,9 @@
{% macro clickhouse__replace_mv(target_relation, existing_relation, intermediate_relation, backup_relation, sql) %}
{# drop existing materialized view while we recreate the target table #}
{%- set cluster_clause = on_cluster_clause(target_relation) -%}
{%- set mv_name = target_relation.name + '_mv' -%}
{%- set mv_relation = target_relation.derivative('_mv', 'MaterializedView') -%}
{% call statement('drop existing mv') -%}
drop view if exists {{ mv_name }} {{ cluster_clause }}
drop view if exists {{ mv_relation }} {{ cluster_clause }}
{%- endcall %}

{# recreate the target table #}
Expand All @@ -116,5 +115,5 @@
{{ adapter.rename_relation(intermediate_relation, target_relation) }}

{# now that the target table is recreated, we can finally create our new view #}
{{ clickhouse__create_mv_sql(mv_name, target_relation.name, cluster_clause, sql) }}
{{ clickhouse__create_mv_sql(mv_relation, target_relation, cluster_clause, sql) }}
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
{%- set insert_cols_csv = insert_cols | join(', ') -%}
{%- set valid_to_col = adapter.quote('dbt_valid_to') -%}

{%- set upsert = target ~ '__snapshot_upsert' -%}
{%- set upsert = target.derivative('__snapshot_upsert') -%}
{% call statement('create_upsert_relation') %}
create table if not exists {{ upsert }} as {{ target }}
{% endcall %}
Expand Down
6 changes: 3 additions & 3 deletions dbt/include/clickhouse/macros/materializations/table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@
{% call statement('create_table_empty') %}
{{ create_table }}
{% endcall %}
{{ clickhouse__insert_into(relation.include(database=False), sql, has_contract) }}
{{ clickhouse__insert_into(relation, sql, has_contract) }}
{%- endif %}
{%- endmacro %}

Expand All @@ -151,7 +151,7 @@
{{ sql_header if sql_header is not none }}

{% if temporary -%}
create temporary table {{ relation.name }}
create temporary table {{ relation }}
engine Memory
{{ order_cols(label="order by") }}
{{ partition_cols(label="partition by") }}
Expand All @@ -160,7 +160,7 @@
{{ sql }}
)
{%- else %}
create table {{ relation.include(database=False) }}
create table {{ relation }}
{{ on_cluster_clause(relation)}}
{%- if has_contract%}
{{ get_assert_columns_equivalent(sql) }}
Expand Down
8 changes: 4 additions & 4 deletions tests/integration/adapter/aliases/test_aliases.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,17 +83,17 @@ def test_alias_model_name(self, project):
assert len(results) == 4

cluster = project.test_config['cluster']
relation = relation_from_name(project.adapter, "foo")
local_relation = relation_from_name(project.adapter, "foo_local")

result = project.run_sql(
f"select max(tablename) AS tablename From clusterAllReplicas('{cluster}', {relation}_local) ",
f"select max(tablename) AS tablename From clusterAllReplicas('{cluster}', {local_relation}) ",
fetch="one",
)
assert result[0] == "foo"

relation = relation_from_name(project.adapter, "ref_foo_alias")
local_relation = relation_from_name(project.adapter, "ref_foo_alias_local")
result = project.run_sql(
f"select max(tablename) AS tablename From clusterAllReplicas('{cluster}', {relation}_local) ",
f"select max(tablename) AS tablename From clusterAllReplicas('{cluster}', {local_relation}) ",
fetch="one",
)
assert result[0] == "ref_foo_alias"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,23 +77,23 @@ def seeds(self):
}

def assert_total_count_correct(self, project):
'''Check if data is properly distributed'''
# Check if data is properly distributed
cluster = project.test_config['cluster']
table_relation = relation_from_name(project.adapter, "distributed")
table_relation = relation_from_name(project.adapter, "distributed_local")
cluster_info = project.run_sql(
f"select shard_num,max(host_name) as host_name, count(distinct replica_num) as replica_counts "
f"from system.clusters where cluster='{cluster}' group by shard_num",
fetch="all",
)
sum_count = project.run_sql(
f"select count() From clusterAllReplicas('{cluster}',{table_relation}_local)",
f"select count() From clusterAllReplicas('{cluster}',{table_relation})",
fetch="one",
)
total_count = 0
# total count should be equal to sum(count of each shard * replica_counts)
for shard_num, host_name, replica_counts in cluster_info:
count = project.run_sql(
f"select count() From remote('{host_name}',{table_relation}_local)",
f"select count() From remote('{host_name}',{table_relation})",
fetch="one",
)
total_count += count[0] * replica_counts
Expand All @@ -103,7 +103,7 @@ def assert_total_count_correct(self, project):
os.environ.get('DBT_CH_TEST_CLUSTER', '').strip() == '', reason='Not on a cluster'
)
def test_base(self, project):
# cluster setting must exists
# cluster setting must exist
cluster = project.test_config['cluster']
assert cluster

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import pytest
from dbt.tests.util import check_relation_types, run_dbt

from dbt.adapters.clickhouse.query import quote_identifier

PEOPLE_SEED_CSV = """
id,name,age,department
1231,Dade,33,engineering
Expand Down Expand Up @@ -116,7 +118,7 @@ def test_create(self, project):
# insert some data and make sure it reaches the target table
project.run_sql(
f"""
insert into {project.test_schema}.people ("id", "name", "age", "department")
insert into {quote_identifier(project.test_schema)}.people ("id", "name", "age", "department")
values (1232,'Dade',16,'engineering'), (9999,'eugene',40,'malware');
"""
)
Expand Down Expand Up @@ -153,7 +155,7 @@ def test_update(self, project):

project.run_sql(
f"""
insert into {project.test_schema}.people ("id", "name", "age", "department")
insert into {quote_identifier(project.test_schema)}.people ("id", "name", "age", "department")
values (1232,'Dade',11,'engineering'), (9999,'eugene',40,'malware');
"""
)
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/test_adapter.py → tests/unit/test_util.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from dbt.adapters.clickhouse.impl import compare_versions
from dbt.adapters.clickhouse.util import compare_versions


def test_is_before_version():
Expand Down

0 comments on commit 08bbbf9

Please sign in to comment.