
Commit

Bug fixes related to model settings.
genzgd committed Nov 28, 2023
1 parent 3fec9a4 commit 57fd28d
Showing 12 changed files with 133 additions and 38 deletions.
21 changes: 19 additions & 2 deletions CHANGELOG.md
@@ -1,9 +1,26 @@
### Release [1.5.2], 2023-11-28
-#### Bug Fix
+#### Bug Fixes
- The `ON CLUSTER` clause was in the incorrect place for legacy incremental materializations. This has been fixed. Thanks to
[Steven Reitsma](https://github.com/StevenReitsma) for the fix!
- The `ON CLUSTER` DDL for drop tables did not include a SYNC modifier, which might be the cause of some "table already exists"
-  errors
+  errors. The `SYNC` modifier has been added to the `on_cluster` macro when dropping relations.
- Fixed a bug where using table settings such as `allow_nullable_key` would break "legacy" incremental materializations. Closes
  https://github.com/ClickHouse/dbt-clickhouse/issues/209. Also see the new model `config` property `query_settings` described
  below.
- Fixed an issue where incremental materializations would incorrectly exclude duplicated inserted rows due to "automatic"
  ClickHouse deduplication on replicated tables. Closes https://github.com/ClickHouse/dbt-clickhouse/issues/213. The fix consists
  of always sending a `replicated_deduplication_window=0` table setting when creating the incremental relations (see the sketch
  below). This behavior can be overridden by setting the new profile parameter `allow_automatic_deduplication` to `True`, although
  for general dbt operations this is probably not necessary and not recommended. Finally, thanks to
  [Andy](https://github.com/andy-miracl) for the report and debugging help!
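
A rough sketch of the combined effect on the generated SQL (the cluster, database, and table names are hypothetical, and the
exact statements dbt emits may differ by version):

```sql
-- Dropping an old relation: ON CLUSTER is now in its documented position, and
-- SYNC waits for the drop to complete on every replica, avoiding
-- "table already exists" races when the relation is re-created.
DROP TABLE IF EXISTS analytics.events__dbt_backup ON CLUSTER my_cluster SYNC;

-- Creating an incremental relation: the injected table setting disables
-- ClickHouse's automatic insert deduplication, so duplicated rows that were
-- legitimately inserted are no longer silently dropped.
CREATE TABLE analytics.events__dbt_new_data ON CLUSTER my_cluster
ENGINE = ReplicatedMergeTree
ORDER BY (id)
SETTINGS replicated_deduplication_window = 0
AS SELECT * FROM analytics.events_staging;
```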

#### Improvements
- Added a new profile property `allow_automatic_deduplication`, which defaults to `False`. Automatic deduplication on ClickHouse
  Replicated tables is now disabled for incremental inserts, but this property can be set to `True` if for some reason the default
  ClickHouse behavior for inserted blocks is desired.
- Added a new model `config` property `query_settings` for any ClickHouse settings that should be sent with the `INSERT INTO`
  or `DELETE FROM` queries used with materializations. Note this is distinct from the existing property `settings`, which is
  used for ClickHouse "table" settings in DDL statements like `CREATE TABLE ... AS` (see the example below).
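
For example, a model might combine both properties; the model body, column names, and setting values below are illustrative
only, not part of this release:

```sql
{# `settings` are rendered into the CREATE TABLE DDL, while `query_settings`
   are attached to the INSERT/DELETE statements dbt runs for this model. #}
{{ config(
    materialized='incremental',
    incremental_strategy='delete+insert',
    engine='MergeTree()',
    order_by='(id)',
    settings={'allow_nullable_key': 1},
    query_settings={'mutations_sync': 2}
) }}

select id, updated_at
from {{ source('raw', 'events') }}
```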

### Release [1.5.1], 2023-11-27
#### Bug Fix
33 changes: 22 additions & 11 deletions README.md
@@ -77,6 +77,7 @@ your_profile_name:
      use_lw_deletes: [False] # Use the strategy `delete+insert` as the default incremental strategy.
      check_exchange: [True] # Validate that ClickHouse supports the atomic EXCHANGE TABLES command. (Not needed for most ClickHouse versions)
      local_suffix: [_local] # Table suffix of local tables on shards for distributed materializations.
      allow_automatic_deduplication: [False] # Enable ClickHouse automatic deduplication for Replicated tables
custom_settings: [{}] # A dictionary/mapping of custom ClickHouse settings for the connection - default is empty.
# Native (clickhouse-driver) connection settings
@@ -87,17 +88,27 @@ your_profile_name:

## Model Configuration

-| Option | Description | Required? |
-|--------|-------------|-----------|
-| engine | The table engine (type of table) to use when creating tables | Optional (default: `MergeTree()`) |
-| order_by | A tuple of column names or arbitrary expressions. This allows you to create a small sparse index that helps find data faster. | Optional (default: `tuple()`) |
-| partition_by | A partition is a logical combination of records in a table by a specified criterion. The partition key can be any expression from the table columns. | Optional |
-| sharding_key | Sharding key determines the destination server when inserting into distributed engine table. The sharding key can be random or as an output of a hash function | Optional (default: `rand()`) |
-| primary_key | Like order_by, a ClickHouse primary key expression. If not specified, ClickHouse will use the order by expression as the primary key |
-| unique_key | A tuple of column names that uniquely identify rows. Used with incremental models for updates. | Optional |
-| inserts_only | If set to True for an incremental model, incremental updates will be inserted directly to the target table without creating intermediate table. It has been deprecated in favor of the `append` incremental `strategy`, which operates in the same way | Optional |
-| incremental_strategy | Incremental model update strategy of `delete+insert` or `append`. See the following Incremental Model Strategies | Optional (default: `default`) |
-| incremental_predicates | Additional conditions to be applied to the incremental materialization (only applied to `delete+insert` strategy |
+| Option | Description | Default if any |
+|--------|-------------|----------------|
+| engine | The table engine (type of table) to use when creating tables | `MergeTree()` |
+| order_by | A tuple of column names or arbitrary expressions. This allows you to create a small sparse index that helps find data faster. | `tuple()` |
+| partition_by | A partition is a logical combination of records in a table by a specified criterion. The partition key can be any expression from the table columns. | |
+| sharding_key | The sharding key determines the destination server when inserting into a distributed engine table. The sharding key can be random or the output of a hash function | `rand()` |
+| primary_key | Like order_by, a ClickHouse primary key expression. If not specified, ClickHouse will use the order by expression as the primary key | |
+| unique_key | A tuple of column names that uniquely identify rows. Used with incremental models for updates. | |
+| inserts_only | If set to True for an incremental model, incremental updates will be inserted directly into the target table without creating an intermediate table. Deprecated in favor of the `append` incremental `strategy`, which operates in the same way | |
+| incremental_strategy | Incremental model update strategy of `delete+insert` or `append`. See the following Incremental Model Strategies | `default` |
+| incremental_predicates | Additional conditions to be applied to the incremental materialization (only applied to the `delete+insert` strategy) | |
+| settings | A map/dictionary of "TABLE" settings applied to DDL statements such as `CREATE TABLE` for this model | |
+| query_settings | A map/dictionary of ClickHouse user-level settings used with `INSERT` or `DELETE` statements in conjunction with this model | |

## A Note on Model Settings
ClickHouse has several types/levels of "settings". In the model configuration above, two of these are configurable. `settings` means the `SETTINGS`
clause used in `CREATE TABLE/VIEW` types of DDL statements, so these are generally settings specific to the ClickHouse table engine. The new
`query_settings` is used to add a `SETTINGS` clause to the `INSERT` and `DELETE` queries used for model materialization (including incremental
materializations). There are hundreds of ClickHouse settings, and it's not always clear which is a "table" setting and which is a "user" setting
(although the latter are generally available in the `system.settings` table). In general the defaults are recommended, and any use of these
properties should be carefully researched and tested.
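
As a hypothetical illustration, a model configured with `settings={'allow_nullable_key': 1}` and
`query_settings={'mutations_sync': 2}` would see the two properties surface in different statements, roughly as follows
(table and column names are invented):

```sql
-- `settings` (table settings) land in the DDL:
CREATE TABLE analytics.my_model
ENGINE = MergeTree()
ORDER BY (id)
SETTINGS allow_nullable_key = 1
AS SELECT id, updated_at FROM analytics.raw_events;

-- `query_settings` (user-level settings) land on the materialization queries:
INSERT INTO analytics.my_model (id, updated_at)
SETTINGS mutations_sync = 2
SELECT id, updated_at FROM analytics.my_model__dbt_new_data;
```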

## ClickHouse Cluster

The `cluster` setting in the profile enables dbt-clickhouse to run against a ClickHouse cluster.
2 changes: 2 additions & 0 deletions dbt/adapters/clickhouse/credentials.py
@@ -33,6 +33,7 @@ class ClickHouseCredentials(Credentials):
custom_settings: Optional[Dict[str, Any]] = None
use_lw_deletes: bool = False
local_suffix: str = 'local'
    allow_automatic_deduplication: bool = False

@property
def type(self):
@@ -73,4 +74,5 @@ def _connection_keys(self):
'check_exchange',
'custom_settings',
'use_lw_deletes',
'allow_automatic_deduplication',
)
10 changes: 10 additions & 0 deletions dbt/adapters/clickhouse/dbclient.py
@@ -1,5 +1,6 @@
import uuid
from abc import ABC, abstractmethod
from typing import Dict

from dbt.exceptions import DbtDatabaseError, FailedToConnectError

@@ -8,6 +9,7 @@

LW_DELETE_SETTING = 'allow_experimental_lightweight_delete'
ND_MUTATION_SETTING = 'allow_nondeterministic_mutations'
DEDUP_WINDOW_SETTING = 'replicated_deduplication_window'


def get_db_client(credentials: ClickHouseCredentials):
@@ -79,6 +81,9 @@ def __init__(self, credentials: ClickHouseCredentials):
except Exception as ex:
self.close()
raise ex
self._model_settings = {}
if not credentials.allow_automatic_deduplication:
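            # Suppress ClickHouse's automatic insert deduplication on Replicated tables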
self._model_settings[DEDUP_WINDOW_SETTING] = '0'

@abstractmethod
def query(self, sql: str, **kwargs):
@@ -115,6 +120,11 @@ def _set_client_database(self):
def _server_version(self):
pass

def update_model_settings(self, model_settings: Dict[str, str]):
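        # Merge connection-level default settings into the model's settings,
        # never overriding a value the model has set explicitly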
for key, value in self._model_settings.items():
if key not in model_settings:
model_settings[key] = value

def _check_lightweight_deletes(self, requested: bool):
lw_deletes = self.get_ch_setting(LW_DELETE_SETTING)
nd_mutations = self.get_ch_setting(ND_MUTATION_SETTING)
12 changes: 11 additions & 1 deletion dbt/adapters/clickhouse/impl.py
@@ -367,7 +367,17 @@ def run_sql_for_tests(self, sql, fetch, conn):

@available
def get_model_settings(self, model):
-        settings = model['config'].get('settings', dict())
+        settings = model['config'].get('settings', {})
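        # Merge in connection-level defaults such as replicated_deduplication_window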
conn = self.connections.get_if_exists()
conn.handle.update_model_settings(settings)
res = []
for key in settings:
res.append(f' {key}={settings[key]}')
return '' if len(res) == 0 else 'SETTINGS ' + ', '.join(res) + '\n'

@available
def get_model_query_settings(self, model):
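        # Build a SETTINGS clause for INSERT/DELETE queries from the model's `query_settings`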
settings = model['config'].get('query_settings', {})
res = []
for key in settings:
res.append(f' {key}={settings[key]}')
@@ -178,15 +178,15 @@
select {{ unique_key }}
from {{ inserting_relation }}
)
-    {{ adapter.get_model_settings(model) }}
+    {{ adapter.get_model_query_settings(model) }}
{% endcall %}

-- Insert all of the new data into the temporary table
{% call statement('insert_new_data') %}
insert into {{ inserted_relation }} ({{ dest_cols_csv }})
select {{ dest_cols_csv }}
from {{ inserting_relation }}
-    {{ adapter.get_model_settings(model) }}
+    {{ adapter.get_model_query_settings(model) }}
{% endcall %}

{% do adapter.drop_relation(new_data_relation) %}
@@ -228,13 +228,14 @@
{% for predicate in incremental_predicates %}
and {{ predicate }}
{% endfor %}
-    {%- endif -%};
+    {%- endif -%}
+    {{ adapter.get_model_query_settings(model) }}
{% endcall %}

{%- set dest_columns = adapter.get_columns_in_relation(existing_relation) -%}
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
{% call statement('insert_new_data') %}
-    insert into {{ existing_relation }} select {{ dest_cols_csv }} from {{ inserting_relation }}
+    insert into {{ existing_relation }} {{ adapter.get_model_query_settings(model) }} select {{ dest_cols_csv }} from {{ inserting_relation }}
{% endcall %}
{% do adapter.drop_relation(new_data_relation) %}
{{ drop_relation_if_exists(distributed_new_data_relation) }}
2 changes: 1 addition & 1 deletion dbt/include/clickhouse/macros/materializations/seed.sql
@@ -4,7 +4,7 @@

{% set sql -%}
insert into {{ this.render() }} ({{ cols_sql }})
-    {{ adapter.get_model_settings(model) }}
+    {{ adapter.get_model_query_settings(model) }}
format CSV
{{ data_sql }}
{%- endset %}
4 changes: 3 additions & 1 deletion dbt/include/clickhouse/macros/materializations/table.sql
@@ -188,11 +188,13 @@
{%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}

-  insert into {{ target_relation }} ({{ dest_cols_csv }})
+  insert into {{ target_relation }}
+  ({{ dest_cols_csv }})
{%- if has_contract -%}
-- Use a subquery to get columns in the right order
SELECT {{ dest_cols_csv }} FROM ( {{ sql }} )
{%- else -%}
{{ sql }}
{{ adapter.get_model_query_settings(model) }}
{%- endif -%}
{%- endmacro %}
2 changes: 2 additions & 0 deletions tests/integration/adapter/basic/test_basic.py
@@ -33,6 +33,8 @@
column_types:
val2: Nullable(UInt32)
str1: Nullable(String)
settings:
allow_nullable_key: 1
"""

replicated_seeds_schema_yml = """
@@ -18,8 +18,13 @@ class TestMergeTreeTableMaterialization(BaseSimpleMaterializations):
@pytest.fixture(scope="class")
def models(self):
config_materialized_table = """
-        {{ config(order_by='(some_date, id, name)', engine='MergeTree()', materialized='table',
-           settings={'allow_nullable_key': 1}) }}
+        {{ config(
+            order_by='(some_date, id, name)',
+            engine='MergeTree()',
+            materialized='table',
+            settings={'allow_nullable_key': 1},
+            query_settings={'allow_nondeterministic_mutations': 1})
+        }}
"""
base_table_sql = config_materialized_table + model_base
return {
@@ -204,7 +209,7 @@ def assert_total_count_correct(self, project):
os.environ.get('DBT_CH_TEST_CLUSTER', '').strip() == '', reason='Not on a cluster'
)
def test_base(self, project):
-        # cluster setting must exists
+        # cluster setting must exist
cluster = project.test_config['cluster']
assert cluster

Expand Down
4 changes: 2 additions & 2 deletions tests/integration/adapter/constraints/test_constraints.py
@@ -60,7 +60,7 @@ def test__contract_wrong_column_names(self, project):
assert all([(exp in log_output or exp.upper() in log_output) for exp in expected])

def test__contract_wrong_column_data_types(self, project, data_types):
-        for (sql_column_value, schema_data_type, error_data_type) in data_types:
+        for sql_column_value, schema_data_type, error_data_type in data_types:
# Write parametrized data_type to sql file
write_file(
my_model_data_type_sql.format(sql_value=sql_column_value),
@@ -91,7 +91,7 @@ def test__contract_wrong_column_data_types(self, project, data_types):
assert all([(exp in log_output or exp.upper() in log_output) for exp in expected])

def test__contract_correct_column_data_types(self, project, data_types):
-        for (sql_column_value, schema_data_type, _) in data_types:
+        for sql_column_value, schema_data_type, _ in data_types:
# Write parametrized data_type to sql file
write_file(
my_model_data_type_sql.format(sql_value=sql_column_value),
