From e6e74e494ae09f6ff56ff106f166933e2114bb34 Mon Sep 17 00:00:00 2001
From: Geoff Genz
Date: Wed, 6 Dec 2023 13:39:46 -0700
Subject: [PATCH] Release 1 6 2 (#219)

* Limited fix to completely broken `on_schema_change`

* Tweak changelog
---
 CHANGELOG.md                                  | 12 ++++
 dbt/adapters/clickhouse/__version__.py        |  2 +-
 dbt/adapters/clickhouse/errors.py             | 24 +++++++
 dbt/adapters/clickhouse/impl.py               | 40 ++++++++++-
 dbt/adapters/clickhouse/util.py               |  8 +++
 .../incremental/incremental.sql               | 68 ++++++++----------
 .../adapter/basic/test_incremental.py         |  2 +-
 .../adapter/incremental/test_schema_change.py | 71 +++++++++++++++++++
 8 files changed, 185 insertions(+), 42 deletions(-)
 create mode 100644 dbt/adapters/clickhouse/errors.py
 create mode 100644 tests/integration/adapter/incremental/test_schema_change.py

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1db44176..ec8ddc32 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,15 @@
+### Release [1.6.2], 2023-12-06
+#### Bug Fix
+- The dbt `on_schema_change` configuration value for incremental models was effectively being ignored. This has been fixed
+with a very limited implementation. Closes https://github.com/ClickHouse/dbt-clickhouse/issues/199. Because of the way that
+ORDER BY/SORT BY/PARTITION BY/PRIMARY KEY work in ClickHouse, plus the complexities of correctly transforming ClickHouse data types,
+`sync_all_columns` is not currently supported (although an implementation that works for non-key columns is theoretically possible,
+such an enhancement is not currently planned). Accordingly, only the `ignore`, `fail`, and `append_new_columns` values are supported
+for `on_schema_change`. `on_schema_change` is also not currently supported for Distributed tables.
+
+Note that actually appending new columns requires a fallback to the `legacy` incremental strategy, which is quite inefficient,
+so while theoretically possible, using `append_new_columns` is not recommended except for very small data volumes.
+
 ### Release [1.6.1], 2023-12-04
 #### Bug Fixes
 - Identifier quoting was disabled for tables/databases etc. This would cause failures for schemas or tables using reserved words
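For context, a minimal incremental model using the newly supported configuration might look like the sketch below (the model and column names are hypothetical, not taken from this patch). If a later revision of the model adds a column to its select list, the adapter now falls back to the `legacy` strategy and rebuilds the table with the appended column, as described above.

```sql
-- models/events.sql (hypothetical example)
{{
    config(
        materialized='incremental',
        unique_key='event_id',
        on_schema_change='append_new_columns'
    )
}}

select
    event_id,
    event_type
from {{ source('raw', 'events') }}
```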
diff --git a/dbt/adapters/clickhouse/__version__.py b/dbt/adapters/clickhouse/__version__.py
index 43239b87..5ccf9d1c 100644
--- a/dbt/adapters/clickhouse/__version__.py
+++ b/dbt/adapters/clickhouse/__version__.py
@@ -1 +1 @@
-version = '1.6.1'
+version = '1.6.2'
diff --git a/dbt/adapters/clickhouse/errors.py b/dbt/adapters/clickhouse/errors.py
new file mode 100644
index 00000000..1d3b5c69
--- /dev/null
+++ b/dbt/adapters/clickhouse/errors.py
@@ -0,0 +1,24 @@
+schema_change_fail_error = """
+The source and target schemas on this incremental model are out of sync.
+  They can be reconciled in several ways:
+    - Set the `on_schema_change` config to `append_new_columns`. (ClickHouse does not support `sync_all_columns`.)
+    - Re-run the incremental model with `full_refresh: True` to update the target schema.
+    - Update the schema manually and re-run the process.
+
+  Additional troubleshooting context:
+    Source columns not in target: {0}
+    Target columns not in source: {1}
+    New column types: {2}
+"""
+
+schema_change_datatype_error = """
+The source and target schemas on this incremental model contain different data types. This is not supported.
+
+  Changed column types: {0}
+"""
+
+schema_change_missing_source_error = """
+The target schema on this incremental model contains a column not in the source schema. This is not supported.
+
+  Source columns not in target: {0}
+"""
diff --git a/dbt/adapters/clickhouse/impl.py b/dbt/adapters/clickhouse/impl.py
index bd20fb03..ca0c3a44 100644
--- a/dbt/adapters/clickhouse/impl.py
+++ b/dbt/adapters/clickhouse/impl.py
@@ -20,10 +20,15 @@
 from dbt.adapters.clickhouse.cache import ClickHouseRelationsCache
 from dbt.adapters.clickhouse.column import ClickHouseColumn
 from dbt.adapters.clickhouse.connections import ClickHouseConnectionManager
+from dbt.adapters.clickhouse.errors import (
+    schema_change_datatype_error,
+    schema_change_fail_error,
+    schema_change_missing_source_error,
+)
 from dbt.adapters.clickhouse.logger import logger
 from dbt.adapters.clickhouse.query import quote_identifier
 from dbt.adapters.clickhouse.relation import ClickHouseRelation
-from dbt.adapters.clickhouse.util import compare_versions
+from dbt.adapters.clickhouse.util import NewColumnDataType, compare_versions
 
 GET_CATALOG_MACRO_NAME = 'get_catalog'
 LIST_SCHEMAS_MACRO_NAME = 'list_schemas'
@@ -151,6 +156,39 @@ def calculate_incremental_strategy(self, strategy: str) -> str:
             strategy = 'legacy'
         return strategy
 
+    @available.parse_none
+    def check_incremental_schema_changes(
+        self, on_schema_change, existing, target_sql
+    ) -> List[ClickHouseColumn]:
+        if on_schema_change not in ('fail', 'ignore', 'append_new_columns'):
+            raise DbtRuntimeError(
+                "Only `fail`, `ignore`, and `append_new_columns` are supported for `on_schema_change`"
+            )
+        source = self.get_columns_in_relation(existing)
+        source_map = {column.name: column for column in source}
+        target = self.get_column_schema_from_query(target_sql)
+        target_map = {column.name: column for column in target}
+        source_not_in_target = [column for column in source if column.name not in target_map.keys()]
+        target_not_in_source = [column for column in target if column.name not in source_map.keys()]
+        new_column_data_types = []
+        for target_column in target:
+            source_column = source_map.get(target_column.name)
+            if source_column and source_column.dtype != target_column.dtype:
+                new_column_data_types.append(
+                    NewColumnDataType(source_column.name, target_column.dtype)
+                )
+        if new_column_data_types:
+            raise DbtRuntimeError(schema_change_datatype_error.format(new_column_data_types))
+        if source_not_in_target:
+            raise DbtRuntimeError(schema_change_missing_source_error.format(source_not_in_target))
+        if target_not_in_source and on_schema_change == 'fail':
+            raise DbtRuntimeError(
+                schema_change_fail_error.format(
+                    source_not_in_target, target_not_in_source, new_column_data_types
+                )
+            )
+        return target_not_in_source
+
     @available.parse_none
     def s3source_clause(
         self,
diff --git a/dbt/adapters/clickhouse/util.py b/dbt/adapters/clickhouse/util.py
index bfe7d239..7114dbde 100644
--- a/dbt/adapters/clickhouse/util.py
+++ b/dbt/adapters/clickhouse/util.py
@@ -1,3 +1,5 @@
+from dataclasses import dataclass
+
 from dbt.exceptions import DbtRuntimeError
 
 
@@ -11,3 +13,9 @@ def compare_versions(v1: str, v2: str) -> int:
         except ValueError:
             raise DbtRuntimeError("Version must consist of only numbers separated by '.'")
     return 0
+
+
+@dataclass
+class NewColumnDataType:
+    column_name: str
+    new_type: str
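As a rough illustration of what `check_incremental_schema_changes` compares: `get_columns_in_relation` describes the existing table, while `get_column_schema_from_query` describes the output of the model's SELECT. The same two column sets can be inspected by hand in ClickHouse (the table name and query here are hypothetical):

```sql
-- Columns of the existing incremental table (`source` in the method above)
DESCRIBE TABLE my_db.my_incremental_model;

-- Columns produced by the model query (`target` in the method above)
DESCRIBE (
    select
        number as col_1,
        number + 1 as col_2,
        number + 2 as col_3
    from numbers(3)
);
```

Changed data types and columns that have disappeared from the query raise errors; columns that are new in the query are returned so they can be appended.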
diff --git a/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql b/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql
index 7ab105d7..742642d2 100644
--- a/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql
+++ b/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql
@@ -50,21 +50,23 @@
         {% endcall %}
     {% else %}
-        {% set schema_changes = none %}
+        {% set column_changes = none %}
         {% set incremental_strategy = adapter.calculate_incremental_strategy(config.get('incremental_strategy')) %}
         {% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %}
-        {% if on_schema_change != 'ignore' %}
-            {%- set schema_changes = check_for_schema_changes(existing_relation, target_relation) -%}
-            {% if schema_changes['schema_changed'] and incremental_strategy in ('append', 'delete_insert') %}
-                {% set incremental_strategy = 'legacy' %}
-                {% do log('Schema changes detected, switching to legacy incremental strategy') %}
+        {%- if on_schema_change != 'ignore' %}
+            {%- set column_changes = adapter.check_incremental_schema_changes(on_schema_change, existing_relation, sql) -%}
+            {%- if column_changes %}
+                {%- if incremental_strategy in ('append', 'delete_insert') %}
+                    {% set incremental_strategy = 'legacy' %}
+                    {{ log('Schema changes detected, switching to legacy incremental strategy') }}
+                {%- endif %}
             {% endif %}
         {% endif %}
         {% if incremental_strategy != 'delete_insert' and incremental_predicates %}
             {% do exceptions.raise_compiler_error('Cannot apply incremental predicates with ' + incremental_strategy + ' strategy.') %}
         {% endif %}
         {% if incremental_strategy == 'legacy' %}
-            {% do clickhouse__incremental_legacy(existing_relation, intermediate_relation, schema_changes, unique_key) %}
+            {% do clickhouse__incremental_legacy(existing_relation, intermediate_relation, column_changes, unique_key) %}
             {% set need_swap = true %}
         {% elif incremental_strategy == 'delete_insert' %}
             {% do clickhouse__incremental_delete_insert(existing_relation, unique_key, incremental_predicates) %}
@@ -109,32 +111,7 @@
 
 {%- endmaterialization %}
 
-
-{% macro process_schema_changes(on_schema_change, source_relation, target_relation) %}
-
-    {%- set schema_changes_dict = check_for_schema_changes(source_relation, target_relation) -%}
-    {% if not schema_changes_dict['schema_changed'] %}
-        {{ return }}
-    {% endif %}
-
-    {% if on_schema_change == 'fail' %}
-        {% set fail_msg %}
-            The source and target schemas on this incremental model are out of sync!
-            They can be reconciled in several ways:
-                - set the `on_schema_change` config to either append_new_columns or sync_all_columns, depending on your situation.
-                - Re-run the incremental model with `full_refresh: True` to update the target schema.
-                - update the schema manually and re-run the process.
-        {% endset %}
-        {% do exceptions.raise_compiler_error(fail_msg) %}
-        {{ return }}
-    {% endif %}
-
-    {% do sync_column_schemas(on_schema_change, target_relation, schema_changes_dict) %}
-
-{% endmacro %}
-
-
-{% macro clickhouse__incremental_legacy(existing_relation, intermediate_relation, on_schema_change, unique_key, is_distributed=False) %}
+{% macro clickhouse__incremental_legacy(existing_relation, intermediate_relation, column_changes, unique_key, is_distributed=False) %}
     {% set new_data_relation = existing_relation.incorporate(path={"identifier": existing_relation.identifier + '__dbt_new_data'}) %}
     {{ drop_relation_if_exists(new_data_relation) }}
@@ -143,10 +120,17 @@
     -- First create a temporary table for all of the new data
     {% if is_distributed %}
+        {% if column_changes %}
+            {% do exceptions.raise_compiler_error('Schema changes not supported with Distributed tables') %}
+        {% endif %}
         -- Need to use distributed table to have data on all shards
         {%- set distributed_new_data_relation = existing_relation.incorporate(path={"identifier": existing_relation.identifier + '__dbt_distributed_new_data'}) -%}
         {%- set inserting_relation = distributed_new_data_relation -%}
         {{ create_distributed_local_table(distributed_new_data_relation, new_data_relation, existing_relation, sql) }}
+    {% elif column_changes %}
+        {% call statement('create_new_data_temp') %}
+            {{ get_create_table_as_sql(False, new_data_relation, sql) }}
+        {% endcall %}
     {% else %}
         {% call statement('create_new_data_temp') %}
             {{ get_create_table_as_sql(False, new_data_relation, sql) }}
         {% endcall %}
     {% endif %}
@@ -168,11 +152,11 @@
 
     -- Insert all the existing rows into the new temporary table, ignoring any rows that have keys in the "new data"
     -- table.
-    {%- set dest_columns = adapter.get_columns_in_relation(existing_relation) -%}
-    {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
+    {%- set source_columns = adapter.get_columns_in_relation(existing_relation) -%}
+    {%- set source_columns_csv = source_columns | map(attribute='quoted') | join(', ') -%}
     {% call statement('insert_existing_data') %}
-        insert into {{ inserted_relation }} ({{ dest_cols_csv }})
-        select {{ dest_cols_csv }}
+        insert into {{ inserted_relation }} ({{ source_columns_csv }})
+        select {{ source_columns_csv }}
         from {{ existing_relation }}
           where ({{ unique_key }}) not in (
             select {{ unique_key }}
             from {{ inserting_relation }}
           )
        {{ adapter.get_model_query_settings(model) }}
     {% endcall %}
@@ -182,9 +166,15 @@
 
     -- Insert all of the new data into the temporary table
+    {% if column_changes %}
+        {%- set dest_columns = adapter.get_columns_in_relation(new_data_relation) -%}
+        {%- set dest_columns_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
+    {% else %}
+        {%- set dest_columns_csv = source_columns_csv %}
+    {% endif %}
     {% call statement('insert_new_data') %}
-        insert into {{ inserted_relation }} ({{ dest_cols_csv }})
-        select {{ dest_cols_csv }}
+        insert into {{ inserted_relation }} ({{ dest_columns_csv }})
+        select {{ dest_columns_csv }}
         from {{ inserting_relation }}
         {{ adapter.get_model_query_settings(model) }}
     {% endcall %}
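To make the cost of the `legacy` fallback concrete, the following is a rough sketch of the statement sequence when a column has been appended. The table names are illustrative only; the real macro works through dbt relations and the intermediate/backup tables the materialization manages:

```sql
-- 1. Materialize the new data, including the appended column
create table my_model__dbt_new_data engine = MergeTree order by col_1 as
select number as col_1, number + 1 as col_2, number + 2 as col_3
from numbers(2, 3);

-- 2. Create an empty intermediate table with the new (wider) schema
create table my_model__dbt_tmp as my_model__dbt_new_data;

-- 3. Copy the surviving existing rows using the old column list;
--    the appended column is filled with its type default
insert into my_model__dbt_tmp (col_1, col_2)
select col_1, col_2
from my_model
where (col_1) not in (select col_1 from my_model__dbt_new_data);

-- 4. Insert the new rows using the full column list of the new data table
insert into my_model__dbt_tmp (col_1, col_2, col_3)
select col_1, col_2, col_3
from my_model__dbt_new_data;

-- 5. Swap the rebuilt table into place
exchange tables my_model__dbt_tmp and my_model;
```

Every surviving row is rewritten in step 3, which is why the changelog entry above recommends `append_new_columns` only for small data volumes.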
unique_key="user_id_current_time",on_schema_change="append_new_columns") }} select toString(1) || '-' || toString(now64()) as user_id_current_time, {% if is_incremental() %} diff --git a/tests/integration/adapter/incremental/test_schema_change.py b/tests/integration/adapter/incremental/test_schema_change.py new file mode 100644 index 00000000..9bccaf4e --- /dev/null +++ b/tests/integration/adapter/incremental/test_schema_change.py @@ -0,0 +1,71 @@ +import pytest +from dbt.tests.util import run_dbt, run_dbt_and_capture + +schema_change_sql = """ +{{ + config( + materialized='incremental', + unique_key='col_1', + on_schema_change='%schema_change%' + ) +}} + +{% if not is_incremental() %} +select + number as col_1, + number + 1 as col_2 +from numbers(3) +{% else %} +select + number as col_1, + number + 1 as col_2, + number + 2 as col_3 +from numbers(2, 3) +{% endif %} +""" + + +class TestOnSchemaChange: + @pytest.fixture(scope="class") + def models(self): + return { + "schema_change_ignore.sql": schema_change_sql.replace("%schema_change%", "ignore"), + "schema_change_fail.sql": schema_change_sql.replace("%schema_change%", "fail"), + "schema_change_append.sql": schema_change_sql.replace( + "%schema_change%", "append_new_columns" + ), + } + + def test_ignore(self, project): + run_dbt(["run", "--select", "schema_change_ignore"]) + result = project.run_sql("select * from schema_change_ignore order by col_1", fetch="all") + assert len(result) == 3 + assert result[0][1] == 1 + run_dbt(["run", "--select", "schema_change_ignore"]) + result = project.run_sql("select * from schema_change_ignore", fetch="all") + assert len(result) == 5 + + def test_fail(self, project): + run_dbt(["run", "--select", "schema_change_fail"]) + result = project.run_sql("select * from schema_change_fail order by col_1", fetch="all") + assert len(result) == 3 + assert result[0][1] == 1 + _, log_output = run_dbt_and_capture( + [ + "run", + "--select", + "schema_change_fail", + ], + expect_pass=False, + ) + assert 'out of sync' in log_output.lower() + + def test_append(self, project): + run_dbt(["run", "--select", "schema_change_append"]) + result = project.run_sql("select * from schema_change_append order by col_1", fetch="all") + assert len(result) == 3 + assert result[0][1] == 1 + run_dbt(["--debug", "run", "--select", "schema_change_append"]) + result = project.run_sql("select * from schema_change_append order by col_1", fetch="all") + assert result[0][2] == 0 + assert result[3][2] == 5