diff --git a/CHANGELOG.md b/CHANGELOG.md index 1ad6630f..1db44176 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,15 @@ +### Release [1.6.1], 2023-12-04 +#### Bug Fixes +- Identifier quoting was disabled for tables/databases etc. This would cause failures for schemas or tables using reserved words +or containing special characters. This has been fixed and some macros have been updated to correctly handle such identifiers. +Note that there still may be untested edge cases where nonstandard identifiers cause issues, so they are still not recommended. +Closes https://github.com/ClickHouse/dbt-clickhouse/issues/144. Thanks to [Alexandru Pisarenco](https://github.com/apisarenco) for the +report and initial PR! +- The new `allow_automatic_deduplication` setting was not being correctly propagated to the adapter, so setting it to `True` +did not have the intended affect. In addition, this setting is now ignored for older ClickHouse versions that +do not support `CREATE TABLE AS SELECT ... EMPTY`, since the automatic deduplication window is required to allow correct +inserts in Replicated tables on those older versions. Fixes https://github.com/ClickHouse/dbt-clickhouse/issues/216. + ### Release [1.6.0], 2023-11-30 #### Improvements - Compatible with dbt 1.6.x. Note that dbt new `clone` feature is not supported, as ClickHouse has no native "light weight" diff --git a/dbt/adapters/clickhouse/__version__.py b/dbt/adapters/clickhouse/__version__.py index f7c7de21..43239b87 100644 --- a/dbt/adapters/clickhouse/__version__.py +++ b/dbt/adapters/clickhouse/__version__.py @@ -1 +1 @@ -version = '1.6.0' +version = '1.6.1' diff --git a/dbt/adapters/clickhouse/credentials.py b/dbt/adapters/clickhouse/credentials.py index cbf65069..d0775c6a 100644 --- a/dbt/adapters/clickhouse/credentials.py +++ b/dbt/adapters/clickhouse/credentials.py @@ -33,7 +33,7 @@ class ClickHouseCredentials(Credentials): custom_settings: Optional[Dict[str, Any]] = None use_lw_deletes: bool = False local_suffix: str = 'local' - allow_automatic_deduplication = False + allow_automatic_deduplication: bool = False @property def type(self): diff --git a/dbt/adapters/clickhouse/dbclient.py b/dbt/adapters/clickhouse/dbclient.py index 9b8e1ee1..bf6f3725 100644 --- a/dbt/adapters/clickhouse/dbclient.py +++ b/dbt/adapters/clickhouse/dbclient.py @@ -6,6 +6,8 @@ from dbt.adapters.clickhouse.credentials import ClickHouseCredentials from dbt.adapters.clickhouse.logger import logger +from dbt.adapters.clickhouse.query import quote_identifier +from dbt.adapters.clickhouse.util import compare_versions LW_DELETE_SETTING = 'allow_experimental_lightweight_delete' ND_MUTATION_SETTING = 'allow_nondeterministic_mutations' @@ -82,7 +84,10 @@ def __init__(self, credentials: ClickHouseCredentials): self.close() raise ex self._model_settings = {} - if not credentials.allow_automatic_deduplication: + if ( + not credentials.allow_automatic_deduplication + and compare_versions(self._server_version(), '22.7.1.2484') >= 0 + ): self._model_settings[DEDUP_WINDOW_SETTING] = '0' @abstractmethod @@ -159,7 +164,7 @@ def _check_lightweight_deletes(self, requested: bool): def _ensure_database(self, database_engine, cluster_name) -> None: if not self.database: return - check_db = f'EXISTS DATABASE {self.database}' + check_db = f'EXISTS DATABASE {quote_identifier(self.database)}' try: db_exists = self.command(check_db) if not db_exists: @@ -170,7 +175,7 @@ def _ensure_database(self, database_engine, cluster_name) -> None: else '' ) self.command( - f'CREATE DATABASE IF NOT EXISTS {self.database}{cluster_clause}{engine_clause}' + f'CREATE DATABASE IF NOT EXISTS {quote_identifier(self.database)}{cluster_clause}{engine_clause}' ) db_exists = self.command(check_db) if not db_exists: @@ -194,7 +199,7 @@ def _check_atomic_exchange(self) -> bool: table_id = str(uuid.uuid1()).replace('-', '') swap_tables = [f'__dbt_exchange_test_{x}_{table_id}' for x in range(0, 2)] for table in swap_tables: - self.command(create_cmd.format(table)) + self.command(create_cmd.format(quote_identifier(table))) try: self.command('EXCHANGE TABLES {} AND {}'.format(*swap_tables)) return True diff --git a/dbt/adapters/clickhouse/impl.py b/dbt/adapters/clickhouse/impl.py index 6cc6055f..bd20fb03 100644 --- a/dbt/adapters/clickhouse/impl.py +++ b/dbt/adapters/clickhouse/impl.py @@ -23,6 +23,7 @@ from dbt.adapters.clickhouse.logger import logger from dbt.adapters.clickhouse.query import quote_identifier from dbt.adapters.clickhouse.relation import ClickHouseRelation +from dbt.adapters.clickhouse.util import compare_versions GET_CATALOG_MACRO_NAME = 'get_catalog' LIST_SCHEMAS_MACRO_NAME = 'list_schemas' @@ -438,18 +439,6 @@ def test(row: agate.Row) -> bool: return test -def compare_versions(v1: str, v2: str) -> int: - v1_parts = v1.split('.') - v2_parts = v2.split('.') - for part1, part2 in zip(v1_parts, v2_parts): - try: - if int(part1) != int(part2): - return 1 if int(part1) > int(part2) else -1 - except ValueError: - raise DbtRuntimeError("Version must consist of only numbers separated by '.'") - return 0 - - COLUMNS_EQUAL_SQL = ''' SELECT row_count_diff.difference as row_count_difference, diff --git a/dbt/adapters/clickhouse/relation.py b/dbt/adapters/clickhouse/relation.py index 818928d8..cc2865f4 100644 --- a/dbt/adapters/clickhouse/relation.py +++ b/dbt/adapters/clickhouse/relation.py @@ -3,16 +3,18 @@ from dbt.adapters.base.relation import BaseRelation, Policy, Self from dbt.contracts.graph.nodes import ManifestNode, SourceDefinition -from dbt.contracts.relation import HasQuoting +from dbt.contracts.relation import HasQuoting, Path, RelationType from dbt.exceptions import DbtRuntimeError from dbt.utils import deep_merge, merge +from dbt.adapters.clickhouse.query import quote_identifier + @dataclass class ClickHouseQuotePolicy(Policy): - database: bool = False - schema: bool = False - identifier: bool = False + database: bool = True + schema: bool = True + identifier: bool = True @dataclass @@ -26,7 +28,7 @@ class ClickHouseIncludePolicy(Policy): class ClickHouseRelation(BaseRelation): quote_policy: Policy = field(default_factory=lambda: ClickHouseQuotePolicy()) include_policy: Policy = field(default_factory=lambda: ClickHouseIncludePolicy()) - quote_character: str = '' + quote_character: str = '`' can_exchange: bool = False can_on_cluster: bool = False @@ -35,13 +37,13 @@ def __post_init__(self): raise DbtRuntimeError(f'Cannot set database {self.database} in clickhouse!') self.path.database = '' - def render(self): - if self.include_policy.database and self.include_policy.schema: - raise DbtRuntimeError( - 'Got a clickhouse relation with schema and database set to ' - 'include, but only one can be set' - ) - return super().render() + def render(self) -> str: + return ".".join(quote_identifier(part) for _, part in self._render_iterator() if part) + + def derivative(self, suffix: str, relation_type: Optional[str] = None) -> BaseRelation: + path = Path(schema=self.path.schema, database='', identifier=self.path.identifier + suffix) + derivative_type = RelationType[relation_type] if relation_type else self.type + return ClickHouseRelation(type=derivative_type, path=path) def matches( self, diff --git a/dbt/adapters/clickhouse/util.py b/dbt/adapters/clickhouse/util.py new file mode 100644 index 00000000..bfe7d239 --- /dev/null +++ b/dbt/adapters/clickhouse/util.py @@ -0,0 +1,13 @@ +from dbt.exceptions import DbtRuntimeError + + +def compare_versions(v1: str, v2: str) -> int: + v1_parts = v1.split('.') + v2_parts = v2.split('.') + for part1, part2 in zip(v1_parts, v2_parts): + try: + if int(part1) != int(part2): + return 1 if int(part1) > int(part2) else -1 + except ValueError: + raise DbtRuntimeError("Version must consist of only numbers separated by '.'") + return 0 diff --git a/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql b/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql index ca15991b..7ab105d7 100644 --- a/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql +++ b/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql @@ -218,7 +218,8 @@ {% call statement('delete_existing_data') %} {% if is_distributed %} - delete from {{ existing_relation }}{{ adapter.get_clickhouse_local_suffix() }} {{ on_cluster_clause(existing_relation) }} where ({{ unique_key }}) in (select {{ unique_key }} + {%- set existing_local = existing_relation.derivative(adapter.get_clickhouse_local_suffix()) %} + delete from {{ existing_local }} {{ on_cluster_clause(existing_relation) }} where ({{ unique_key }}) in (select {{ unique_key }} from {{ inserting_relation }}) {% else %} delete from {{ existing_relation }} where ({{ unique_key }}) in (select {{ unique_key }} diff --git a/dbt/include/clickhouse/macros/materializations/materialized_view.sql b/dbt/include/clickhouse/macros/materializations/materialized_view.sql index 8ba96d02..293cc41b 100644 --- a/dbt/include/clickhouse/macros/materializations/materialized_view.sql +++ b/dbt/include/clickhouse/macros/materializations/materialized_view.sql @@ -6,8 +6,7 @@ {%- materialization materialized_view, adapter='clickhouse' -%} {%- set target_relation = this.incorporate(type='table') -%} - {%- set mv_name = target_relation.name + '_mv' -%} - {%- set target_mv = api.Relation.create(identifier=mv_name, schema=schema, database=database, type='materialized_view') -%} + {%- set mv_relation = target_relation.derivative('_mv', 'MaterializedView') -%} {%- set cluster_clause = on_cluster_clause(target_relation) -%} {# look for an existing relation for the target table and create backup relations if necessary #} @@ -44,14 +43,14 @@ {% elif existing_relation.can_exchange %} {{ log('Replacing existing materialized view' + target_relation.name) }} {% call statement('drop existing materialized view') %} - drop view if exists {{ mv_name }} {{ cluster_clause }} + drop view if exists {{ mv_relation }} {{ cluster_clause }} {% endcall %} {% call statement('main') -%} {{ get_create_table_as_sql(False, backup_relation, sql) }} {%- endcall %} {% do exchange_tables_atomic(backup_relation, existing_relation) %} {% call statement('create new materialized view') %} - {{ clickhouse__create_mv_sql(mv_name, existing_relation.name, cluster_clause, sql) }} + {{ clickhouse__create_mv_sql(mv_relation, existing_relation.name, cluster_clause, sql) }} {% endcall %} {% else %} {{ log('Replacing existing materialized view' + target_relation.name) }} @@ -72,7 +71,7 @@ {{ run_hooks(post_hooks, inside_transaction=False) }} - {{ return({'relations': [target_relation, target_mv]}) }} + {{ return({'relations': [target_relation, mv_relation]}) }} {%- endmaterialization -%} @@ -88,13 +87,13 @@ {{ get_create_table_as_sql(False, relation, sql) }} {% endcall %} {%- set cluster_clause = on_cluster_clause(relation) -%} - {%- set mv_name = relation.name + '_mv' -%} - {{ clickhouse__create_mv_sql(mv_name, relation.name, cluster_clause, sql) }} + {%- set mv_relation = relation.derivative('_mv', 'MaterializedView') -%} + {{ clickhouse__create_mv_sql(mv_relation, relation, cluster_clause, sql) }} {%- endmacro %} -{% macro clickhouse__create_mv_sql(relation_name, target_table, cluster_clause, sql) -%} - create materialized view if not exists {{ relation_name }} {{ cluster_clause }} +{% macro clickhouse__create_mv_sql(mv_relation, target_table, cluster_clause, sql) -%} + create materialized view if not exists {{ mv_relation }} {{ cluster_clause }} to {{ target_table }} as {{ sql }} {%- endmacro %} @@ -103,9 +102,9 @@ {% macro clickhouse__replace_mv(target_relation, existing_relation, intermediate_relation, backup_relation, sql) %} {# drop existing materialized view while we recreate the target table #} {%- set cluster_clause = on_cluster_clause(target_relation) -%} - {%- set mv_name = target_relation.name + '_mv' -%} + {%- set mv_relation = target_relation.derivative('_mv', 'MaterializedView') -%} {% call statement('drop existing mv') -%} - drop view if exists {{ mv_name }} {{ cluster_clause }} + drop view if exists {{ mv_relation }} {{ cluster_clause }} {%- endcall %} {# recreate the target table #} @@ -116,5 +115,5 @@ {{ adapter.rename_relation(intermediate_relation, target_relation) }} {# now that the target table is recreated, we can finally create our new view #} - {{ clickhouse__create_mv_sql(mv_name, target_relation.name, cluster_clause, sql) }} + {{ clickhouse__create_mv_sql(mv_relation, target_relation, cluster_clause, sql) }} {% endmacro %} diff --git a/dbt/include/clickhouse/macros/materializations/snapshot.sql b/dbt/include/clickhouse/macros/materializations/snapshot.sql index 2a317736..71e5acc5 100644 --- a/dbt/include/clickhouse/macros/materializations/snapshot.sql +++ b/dbt/include/clickhouse/macros/materializations/snapshot.sql @@ -25,7 +25,7 @@ {%- set insert_cols_csv = insert_cols | join(', ') -%} {%- set valid_to_col = adapter.quote('dbt_valid_to') -%} - {%- set upsert = target ~ '__snapshot_upsert' -%} + {%- set upsert = target.derivative('__snapshot_upsert') -%} {% call statement('create_upsert_relation') %} create table if not exists {{ upsert }} as {{ target }} {% endcall %} diff --git a/dbt/include/clickhouse/macros/materializations/table.sql b/dbt/include/clickhouse/macros/materializations/table.sql index 22537f5c..72cc72c8 100644 --- a/dbt/include/clickhouse/macros/materializations/table.sql +++ b/dbt/include/clickhouse/macros/materializations/table.sql @@ -141,7 +141,7 @@ {% call statement('create_table_empty') %} {{ create_table }} {% endcall %} - {{ clickhouse__insert_into(relation.include(database=False), sql, has_contract) }} + {{ clickhouse__insert_into(relation, sql, has_contract) }} {%- endif %} {%- endmacro %} @@ -151,7 +151,7 @@ {{ sql_header if sql_header is not none }} {% if temporary -%} - create temporary table {{ relation.name }} + create temporary table {{ relation }} engine Memory {{ order_cols(label="order by") }} {{ partition_cols(label="partition by") }} @@ -160,7 +160,7 @@ {{ sql }} ) {%- else %} - create table {{ relation.include(database=False) }} + create table {{ relation }} {{ on_cluster_clause(relation)}} {%- if has_contract%} {{ get_assert_columns_equivalent(sql) }} diff --git a/tests/integration/adapter/aliases/test_aliases.py b/tests/integration/adapter/aliases/test_aliases.py index 30575aa8..a9a3d585 100644 --- a/tests/integration/adapter/aliases/test_aliases.py +++ b/tests/integration/adapter/aliases/test_aliases.py @@ -83,17 +83,17 @@ def test_alias_model_name(self, project): assert len(results) == 4 cluster = project.test_config['cluster'] - relation = relation_from_name(project.adapter, "foo") + local_relation = relation_from_name(project.adapter, "foo_local") result = project.run_sql( - f"select max(tablename) AS tablename From clusterAllReplicas('{cluster}', {relation}_local) ", + f"select max(tablename) AS tablename From clusterAllReplicas('{cluster}', {local_relation}) ", fetch="one", ) assert result[0] == "foo" - relation = relation_from_name(project.adapter, "ref_foo_alias") + local_relation = relation_from_name(project.adapter, "ref_foo_alias_local") result = project.run_sql( - f"select max(tablename) AS tablename From clusterAllReplicas('{cluster}', {relation}_local) ", + f"select max(tablename) AS tablename From clusterAllReplicas('{cluster}', {local_relation}) ", fetch="one", ) assert result[0] == "ref_foo_alias" diff --git a/tests/integration/adapter/clickhouse/test_clickhouse_table_materializations.py b/tests/integration/adapter/clickhouse/test_clickhouse_table_materializations.py index c7f20e00..ff6e2efb 100644 --- a/tests/integration/adapter/clickhouse/test_clickhouse_table_materializations.py +++ b/tests/integration/adapter/clickhouse/test_clickhouse_table_materializations.py @@ -77,23 +77,23 @@ def seeds(self): } def assert_total_count_correct(self, project): - '''Check if data is properly distributed''' + # Check if data is properly distributed cluster = project.test_config['cluster'] - table_relation = relation_from_name(project.adapter, "distributed") + table_relation = relation_from_name(project.adapter, "distributed_local") cluster_info = project.run_sql( f"select shard_num,max(host_name) as host_name, count(distinct replica_num) as replica_counts " f"from system.clusters where cluster='{cluster}' group by shard_num", fetch="all", ) sum_count = project.run_sql( - f"select count() From clusterAllReplicas('{cluster}',{table_relation}_local)", + f"select count() From clusterAllReplicas('{cluster}',{table_relation})", fetch="one", ) total_count = 0 # total count should be equal to sum(count of each shard * replica_counts) for shard_num, host_name, replica_counts in cluster_info: count = project.run_sql( - f"select count() From remote('{host_name}',{table_relation}_local)", + f"select count() From remote('{host_name}',{table_relation})", fetch="one", ) total_count += count[0] * replica_counts @@ -103,7 +103,7 @@ def assert_total_count_correct(self, project): os.environ.get('DBT_CH_TEST_CLUSTER', '').strip() == '', reason='Not on a cluster' ) def test_base(self, project): - # cluster setting must exists + # cluster setting must exist cluster = project.test_config['cluster'] assert cluster diff --git a/tests/integration/adapter/materialized_view/test_materialized_view.py b/tests/integration/adapter/materialized_view/test_materialized_view.py index b5efb018..9305d064 100644 --- a/tests/integration/adapter/materialized_view/test_materialized_view.py +++ b/tests/integration/adapter/materialized_view/test_materialized_view.py @@ -8,6 +8,8 @@ import pytest from dbt.tests.util import check_relation_types, run_dbt +from dbt.adapters.clickhouse.query import quote_identifier + PEOPLE_SEED_CSV = """ id,name,age,department 1231,Dade,33,engineering @@ -116,7 +118,7 @@ def test_create(self, project): # insert some data and make sure it reaches the target table project.run_sql( f""" - insert into {project.test_schema}.people ("id", "name", "age", "department") + insert into {quote_identifier(project.test_schema)}.people ("id", "name", "age", "department") values (1232,'Dade',16,'engineering'), (9999,'eugene',40,'malware'); """ ) @@ -153,7 +155,7 @@ def test_update(self, project): project.run_sql( f""" - insert into {project.test_schema}.people ("id", "name", "age", "department") + insert into {quote_identifier(project.test_schema)}.people ("id", "name", "age", "department") values (1232,'Dade',11,'engineering'), (9999,'eugene',40,'malware'); """ ) diff --git a/tests/unit/test_adapter.py b/tests/unit/test_util.py similarity index 89% rename from tests/unit/test_adapter.py rename to tests/unit/test_util.py index 0faf9dbe..d87d2e57 100644 --- a/tests/unit/test_adapter.py +++ b/tests/unit/test_util.py @@ -1,4 +1,4 @@ -from dbt.adapters.clickhouse.impl import compare_versions +from dbt.adapters.clickhouse.util import compare_versions def test_is_before_version():