Limited fix to completely broken on_schema_change
genzgd committed Dec 6, 2023
1 parent 08bbbf9 commit 7ac7fbe
Showing 8 changed files with 184 additions and 42 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,14 @@
### Release [1.6.2], 2023-12-06
#### Bug Fix
- The dbt `on_schema_change` configuration value for incremental models was effectively being ignored. This has been fixed
with a deliberately limited implementation. Closes https://github.com/ClickHouse/dbt-clickhouse/issues/199. Because of the way
that ORDER BY/PRIMARY KEY/PARTITION BY work in ClickHouse, plus the complexity of correctly transforming ClickHouse data types,
`sync_all_columns` is not currently supported. (An implementation that works for non-key columns is theoretically possible, but
such an enhancement is not currently planned.) Accordingly, only the `ignore`, `fail`, and `append_new_columns` values are
supported for `on_schema_change`. Note that actually appending new columns requires a fallback to the `legacy` incremental
strategy, which is quite inefficient, so `append_new_columns` is not recommended except for very small data volumes.


### Release [1.6.1], 2023-12-04
#### Bug Fixes
- Identifier quoting was disabled for tables/databases etc. This would cause failures for schemas or tables using reserved words
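As context for the entry above, a minimal sketch of a model that opts in to the new behavior (hypothetical model and column names; this mirrors the style of the integration-test fixtures later in this commit):

```python
# Hypothetical incremental model exercising the new on_schema_change handling.
# Only 'ignore', 'fail', and 'append_new_columns' are accepted; passing
# 'sync_all_columns' (or anything else) raises a DbtRuntimeError.
append_model_sql = """
{{ config(
       materialized='incremental',
       unique_key='event_id',
       on_schema_change='append_new_columns'
   )
}}
select number as event_id, number * 10 as amount
from numbers(100)
"""
```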
2 changes: 1 addition & 1 deletion dbt/adapters/clickhouse/__version__.py
@@ -1 +1 @@
-version = '1.6.1'
+version = '1.6.2'
24 changes: 24 additions & 0 deletions dbt/adapters/clickhouse/errors.py
@@ -0,0 +1,24 @@
schema_change_fail_error = """
The source and target schemas on this incremental model are out of sync.
They can be reconciled in several ways:
  - Set the `on_schema_change` config to `append_new_columns`. (ClickHouse does not support `sync_all_columns`.)
  - Re-run the incremental model with `full_refresh: True` to update the target schema.
  - Update the schema manually and re-run the process.
Additional troubleshooting context:
Source columns not in target: {0}
Target columns not in source: {1}
New column types: {2}
"""

schema_change_datatype_error = """
The source and target schemas on this incremental model contain different data types. This is not supported.
Changed column types: {0}
"""

schema_change_missing_source_error = """
The source schema on this incremental model contains a column that is not in the target schema. This is not supported.
Source columns not in target: {0}
"""
40 changes: 39 additions & 1 deletion dbt/adapters/clickhouse/impl.py
@@ -20,10 +20,15 @@
from dbt.adapters.clickhouse.cache import ClickHouseRelationsCache
from dbt.adapters.clickhouse.column import ClickHouseColumn
from dbt.adapters.clickhouse.connections import ClickHouseConnectionManager
from dbt.adapters.clickhouse.errors import (
    schema_change_datatype_error,
    schema_change_fail_error,
    schema_change_missing_source_error,
)
from dbt.adapters.clickhouse.logger import logger
from dbt.adapters.clickhouse.query import quote_identifier
from dbt.adapters.clickhouse.relation import ClickHouseRelation
-from dbt.adapters.clickhouse.util import compare_versions
+from dbt.adapters.clickhouse.util import NewColumnDataType, compare_versions

GET_CATALOG_MACRO_NAME = 'get_catalog'
LIST_SCHEMAS_MACRO_NAME = 'list_schemas'
@@ -151,6 +156,39 @@ def calculate_incremental_strategy(self, strategy: str) -> str:
            strategy = 'legacy'
        return strategy

    @available.parse_none
    def check_incremental_schema_changes(
        self, on_schema_change, existing, target_sql
    ) -> List[ClickHouseColumn]:
        if on_schema_change not in ('fail', 'ignore', 'append_new_columns'):
            raise DbtRuntimeError(
                "Only `fail`, `ignore`, and `append_new_columns` are supported for `on_schema_change`"
            )
        source = self.get_columns_in_relation(existing)
        source_map = {column.name: column for column in source}
        target = self.get_column_schema_from_query(target_sql)
        target_map = {column.name: column for column in target}
        source_not_in_target = [column for column in source if column.name not in target_map]
        target_not_in_source = [column for column in target if column.name not in source_map]
        new_column_data_types = []
        for target_column in target:
            source_column = source_map.get(target_column.name)
            if source_column and source_column.dtype != target_column.dtype:
                new_column_data_types.append(
                    NewColumnDataType(source_column.name, target_column.dtype)
                )
        if new_column_data_types:
            raise DbtRuntimeError(schema_change_datatype_error.format(new_column_data_types))
        if source_not_in_target:
            raise DbtRuntimeError(schema_change_missing_source_error.format(source_not_in_target))
        if target_not_in_source and on_schema_change == 'fail':
            raise DbtRuntimeError(
                schema_change_fail_error.format(
                    source_not_in_target, target_not_in_source, new_column_data_types
                )
            )
        return target_not_in_source

    @available.parse_none
    def s3source_clause(
        self,
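The heart of the new method is two set differences plus a type comparison. A self-contained sketch of that logic, with a simplified stand-in for `ClickHouseColumn`:

```python
from dataclasses import dataclass


@dataclass
class Col:
    """Simplified stand-in for ClickHouseColumn."""
    name: str
    dtype: str


# source: the existing table; target: the columns the model query now produces
source = [Col('col_1', 'UInt64'), Col('col_2', 'UInt64')]
target = [Col('col_1', 'UInt64'), Col('col_2', 'UInt64'), Col('col_3', 'UInt64')]

source_map = {c.name: c for c in source}
target_map = {c.name: c for c in target}

# Dropped columns always raise; changed types always raise; new columns are
# returned so the materialization can fall back to the legacy strategy and
# rebuild the table with the appended columns.
dropped = [c for c in source if c.name not in target_map]
added = [c for c in target if c.name not in source_map]
changed = [c for c in target
           if c.name in source_map and source_map[c.name].dtype != c.dtype]

assert not dropped and not changed
assert [c.name for c in added] == ['col_3']
```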
8 changes: 8 additions & 0 deletions dbt/adapters/clickhouse/util.py
@@ -1,3 +1,5 @@
from dataclasses import dataclass

from dbt.exceptions import DbtRuntimeError


@@ -11,3 +13,9 @@ def compare_versions(v1: str, v2: str) -> int:
        except ValueError:
            raise DbtRuntimeError("Version must consist of only numbers separated by '.'")
    return 0


@dataclass
class NewColumnDataType:
    column_name: str
    new_type: str
68 changes: 29 additions & 39 deletions dbt/include/clickhouse/macros/materializations/incremental/incremental.sql
@@ -50,21 +50,23 @@
{% endcall %}

{% else %}
- {% set schema_changes = none %}
+ {% set column_changes = none %}
{% set incremental_strategy = adapter.calculate_incremental_strategy(config.get('incremental_strategy')) %}
{% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %}
- {% if on_schema_change != 'ignore' %}
- {%- set schema_changes = check_for_schema_changes(existing_relation, target_relation) -%}
- {% if schema_changes['schema_changed'] and incremental_strategy in ('append', 'delete_insert') %}
- {% set incremental_strategy = 'legacy' %}
- {% do log('Schema changes detected, switching to legacy incremental strategy') %}
+ {%- if on_schema_change != 'ignore' %}
+ {%- set column_changes = adapter.check_incremental_schema_changes(on_schema_change, existing_relation, sql) -%}
+ {%- if column_changes %}
+ {%- if incremental_strategy in ('append', 'delete_insert') %}
+ {% set incremental_strategy = 'legacy' %}
+ {{ log('Schema changes detected, switching to legacy incremental strategy') }}
+ {%- endif %}
{% endif %}
{% endif %}
{% if incremental_strategy != 'delete_insert' and incremental_predicates %}
{% do exceptions.raise_compiler_error('Cannot apply incremental predicates with ' + incremental_strategy + ' strategy.') %}
{% endif %}
{% if incremental_strategy == 'legacy' %}
- {% do clickhouse__incremental_legacy(existing_relation, intermediate_relation, schema_changes, unique_key) %}
+ {% do clickhouse__incremental_legacy(existing_relation, intermediate_relation, column_changes, unique_key) %}
{% set need_swap = true %}
{% elif incremental_strategy == 'delete_insert' %}
{% do clickhouse__incremental_delete_insert(existing_relation, unique_key, incremental_predicates) %}
@@ -109,32 +111,7 @@

{%- endmaterialization %}


- {% macro process_schema_changes(on_schema_change, source_relation, target_relation) %}
-
- {%- set schema_changes_dict = check_for_schema_changes(source_relation, target_relation) -%}
- {% if not schema_changes_dict['schema_changed'] %}
- {{ return }}
- {% endif %}
-
- {% if on_schema_change == 'fail' %}
- {% set fail_msg %}
- The source and target schemas on this incremental model are out of sync!
- They can be reconciled in several ways:
- - set the `on_schema_change` config to either append_new_columns or sync_all_columns, depending on your situation.
- - Re-run the incremental model with `full_refresh: True` to update the target schema.
- - update the schema manually and re-run the process.
- {% endset %}
- {% do exceptions.raise_compiler_error(fail_msg) %}
- {{ return }}
- {% endif %}
-
- {% do sync_column_schemas(on_schema_change, target_relation, schema_changes_dict) %}
-
- {% endmacro %}


- {% macro clickhouse__incremental_legacy(existing_relation, intermediate_relation, on_schema_change, unique_key, is_distributed=False) %}
+ {% macro clickhouse__incremental_legacy(existing_relation, intermediate_relation, column_changes, unique_key, is_distributed=False) %}
{% set new_data_relation = existing_relation.incorporate(path={"identifier": existing_relation.identifier + '__dbt_new_data'}) %}
{{ drop_relation_if_exists(new_data_relation) }}

@@ -143,10 +120,17 @@

-- First create a temporary table for all of the new data
{% if is_distributed %}
{% if column_changes %}
{% do exceptions.raise_compiler_error('Schema changes not supported with Distributed tables') %}
{% endif %}
-- Need to use distributed table to have data on all shards
{%- set distributed_new_data_relation = existing_relation.incorporate(path={"identifier": existing_relation.identifier + '__dbt_distributed_new_data'}) -%}
{%- set inserting_relation = distributed_new_data_relation -%}
{{ create_distributed_local_table(distributed_new_data_relation, new_data_relation, existing_relation, sql) }}
{% elif column_changes %}
{% call statement('create_new_data_temp') %}
{{ get_create_table_as_sql(False, new_data_relation, sql) }}
{% endcall %}
{% else %}
{% call statement('create_new_data_temp') %}
{{ get_create_table_as_sql(False, new_data_relation, sql) }}
@@ -168,11 +152,11 @@

-- Insert all the existing rows into the new temporary table, ignoring any rows that have keys in the "new data"
-- table.
- {%- set dest_columns = adapter.get_columns_in_relation(existing_relation) -%}
- {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
+ {%- set source_columns = adapter.get_columns_in_relation(existing_relation) -%}
+ {%- set source_columns_csv = source_columns | map(attribute='quoted') | join(', ') -%}
{% call statement('insert_existing_data') %}
- insert into {{ inserted_relation }} ({{ dest_cols_csv }})
- select {{ dest_cols_csv }}
+ insert into {{ inserted_relation }} ({{ source_columns_csv }})
+ select {{ source_columns_csv }}
from {{ existing_relation }}
where ({{ unique_key }}) not in (
select {{ unique_key }}
@@ -182,9 +166,15 @@
{% endcall %}

-- Insert all of the new data into the temporary table
{% if column_changes %}
{%- set dest_columns = adapter.get_columns_in_relation(new_data_relation) -%}
{%- set dest_columns_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
{% else %}
{%- set dest_columns_csv = source_columns_csv %}
{% endif %}
{% call statement('insert_new_data') %}
- insert into {{ inserted_relation }} ({{ dest_cols_csv }})
- select {{ dest_cols_csv }}
+ insert into {{ inserted_relation }} ({{ dest_columns_csv }})
+ select {{ dest_columns_csv }}
from {{ inserting_relation }}
{{ adapter.get_model_query_settings(model) }}
{% endcall %}
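To make the two insert statements above concrete, here is roughly what they render to for a model `db.model` with `unique_key='col_1'` and one appended column. This is a hand-written approximation: the relation names follow the macro's `__dbt_new_data` convention plus an assumed `__dbt_tmp` intermediate name, not verbatim adapter output.

```python
# Hand-written approximation of the rendered 'insert_existing_data' and
# 'insert_new_data' statements when col_3 has been appended by the model query.
insert_existing_data = """
insert into db.model__dbt_tmp (`col_1`, `col_2`)
select `col_1`, `col_2`
from db.model
where (col_1) not in (
    select col_1
    from db.model__dbt_new_data
)
"""

# With column_changes set, the destination column list comes from the new data
# relation, so the appended col_3 is included here but not above; existing rows
# therefore receive the column's type default.
insert_new_data = """
insert into db.model__dbt_tmp (`col_1`, `col_2`, `col_3`)
select `col_1`, `col_2`, `col_3`
from db.model__dbt_new_data
"""
```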
2 changes: 1 addition & 1 deletion tests/integration/adapter/basic/test_incremental.py
@@ -7,7 +7,7 @@ class TestIncremental(BaseIncremental):


incremental_not_schema_change_sql = """
{{ config(materialized="incremental", unique_key="user_id_current_time",on_schema_change="sync_all_columns") }}
{{ config(materialized="incremental", unique_key="user_id_current_time",on_schema_change="append_new_columns") }}
select
toString(1) || '-' || toString(now64()) as user_id_current_time,
{% if is_incremental() %}
71 changes: 71 additions & 0 deletions tests/integration/adapter/incremental/test_schema_change.py
@@ -0,0 +1,71 @@
import pytest
from dbt.tests.util import run_dbt, run_dbt_and_capture

schema_change_sql = """
{{
    config(
        materialized='incremental',
        unique_key='col_1',
        on_schema_change='%schema_change%'
    )
}}
{% if not is_incremental() %}
select
    number as col_1,
    number + 1 as col_2
from numbers(3)
{% else %}
select
    number as col_1,
    number + 1 as col_2,
    number + 2 as col_3
from numbers(2, 3)
{% endif %}
"""


class TestOnSchemaChange:
    @pytest.fixture(scope="class")
    def models(self):
        return {
            "schema_change_ignore.sql": schema_change_sql.replace("%schema_change%", "ignore"),
            "schema_change_fail.sql": schema_change_sql.replace("%schema_change%", "fail"),
            "schema_change_append.sql": schema_change_sql.replace(
                "%schema_change%", "append_new_columns"
            ),
        }

    def test_ignore(self, project):
        run_dbt(["run", "--select", "schema_change_ignore"])
        result = project.run_sql("select * from schema_change_ignore order by col_1", fetch="all")
        assert len(result) == 3
        assert result[0][1] == 1
        run_dbt(["run", "--select", "schema_change_ignore"])
        result = project.run_sql("select * from schema_change_ignore", fetch="all")
        assert len(result) == 5

    def test_fail(self, project):
        run_dbt(["run", "--select", "schema_change_fail"])
        result = project.run_sql("select * from schema_change_fail order by col_1", fetch="all")
        assert len(result) == 3
        assert result[0][1] == 1
        _, log_output = run_dbt_and_capture(
            [
                "run",
                "--select",
                "schema_change_fail",
            ],
            expect_pass=False,
        )
        assert 'out of sync' in log_output.lower()

    def test_append(self, project):
        run_dbt(["run", "--select", "schema_change_append"])
        result = project.run_sql("select * from schema_change_append order by col_1", fetch="all")
        assert len(result) == 3
        assert result[0][1] == 1
        run_dbt(["--debug", "run", "--select", "schema_change_append"])
        result = project.run_sql("select * from schema_change_append order by col_1", fetch="all")
        assert result[0][2] == 0
        assert result[3][2] == 5
