From 4e592222e12fa40aab35241bccf7c51466662782 Mon Sep 17 00:00:00 2001
From: gfunc
Date: Thu, 12 Oct 2023 07:47:41 +0000
Subject: [PATCH] update based on review

---
 README.md                                  | 36 +++++++++++++++++++++++++---
 dbt/adapters/clickhouse/relation.py        |  2 +-
 dbt/include/clickhouse/macros/adapters.sql |  4 +--
 .../macros/adapters/apply_grants.sql       |  2 +-
 .../materializations/distributed_table.sql |  2 +-
 tests/integration/adapter/test_basic.py    | 16 ++++++++---
 6 files changed, 49 insertions(+), 13 deletions(-)

diff --git a/README.md b/README.md
index e12fca39..083d38ac 100644
--- a/README.md
+++ b/README.md
@@ -65,7 +65,7 @@ your_profile_name:
     port: [8123]  # If not set, defaults to 8123, 8443, 9000, 9440 depending on the secure and driver settings
     user: [default] # User for all database operations
     password: [] # Password for the user
-    cluster: [] If set, DDL/table operations will be executed with the `ON CLUSTER` clause using this cluster
+    cluster: [] # If set, certain DDL/table operations will be executed with the `ON CLUSTER` clause using this cluster. Distributed materializations require this setting to work. See the following ClickHouse Cluster section for more details.
     verify: [True] # Validate TLS certificate if using TLS/SSL
     secure: [False] # Use TLS (native protocol) or HTTPS (http protocol)
     retries: [1] # Number of times to retry a "retriable" database exception (such as a 503 'Service Unavailable' error)
@@ -75,7 +75,7 @@ your_profile_name:
     cluster_mode: [False] # Use specific settings designed to improve operation on Replicated databases (recommended for ClickHouse Cloud)
     use_lw_deletes: [False] Use the strategy `delete+insert` as the default incremental strategy.
     check_exchange: [True] # Validate that clickhouse support the atomic EXCHANGE TABLES command.  (Not needed for most ClickHouse versions)
-    local_suffix [local] # Table suffix of local tables on shards for distributed materializations
+    local_suffix: [_local] # Table suffix of local tables on shards for distributed materializations.
     custom_settings: [{}] # A dictionary/mapping of custom ClickHouse settings for the connection - default is empty.
 
     # Native (clickhouse-driver) connection settings
@@ -97,9 +97,38 @@ your_profile_name:
 | inserts_only             | If set to True for an incremental model, incremental updates will be inserted directly to the target table without creating intermediate table. It has been deprecated in favor of the `append` incremental `strategy`, which operates in the same way | Optional                       |
 | incremental_strategy     | Incremental model update strategy of `delete+insert` or `append`.  See the following Incremental Model Strategies | Optional (default: `default`)  |
 | incremental_predicates   | Additional conditions to be applied to the incremental materialization (only applied to `delete+insert` strategy  |
+## ClickHouse Cluster
+
+The `cluster` setting in the profile enables dbt-clickhouse to run against a ClickHouse cluster.
+
+### Effective Scope
+
+If `cluster` is set in the profile, the `on_cluster_clause` macro will add an `ON CLUSTER` clause for:
+- Database creation
+- View materialization
+- Distributed materializations
+- Models with Replicated engines
+
+Table and incremental materializations with a non-replicated engine are not affected by the `cluster` setting (the model is created only on the connected node).
+
+### Compatibility
+
+If a model was previously created without a `cluster` setting, dbt-clickhouse will detect this and run all DDL/DML for that model without the `on cluster` clause.
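+
+For example (a minimal sketch; the cluster name `my_cluster` and the model below are illustrative), with `cluster: my_cluster` set in the profile, a model configured with a Replicated engine falls in scope and is created with `ON CLUSTER`:
+
+```sql
+-- models/replicated_example.sql (illustrative model name)
+{{ config(
+    materialized='table',
+    engine='ReplicatedMergeTree',
+    order_by='id'
+) }}
+select 1 as id
+```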
+
 ## Known Limitations
 
-* Replicated tables (combined with the `cluster` profile setting) are available using the `on_cluster_clause` macro but are not included in the test suite and not formally tested.
 * Ephemeral models/CTEs don't work if placed before the "INSERT INTO" in a ClickHouse insert statement, see https://github.com/ClickHouse/ClickHouse/issues/30323.  This
 should not affect most models, but care should be taken where an ephemeral model is placed in model definitions and other SQL statements.
 
@@ -169,7 +198,6 @@
 See the [S3 test file](https://github.com/ClickHouse/dbt-clickhouse/blob/main/tests/integration/adapter/test_s3.py) for examples of how to use this macro.
 
 Notes:
 
-- Distributed materializations are experimental and are not currently included in the automated test suite.
 - dbt-clickhouse queries now automatically include the setting `insert_distributed_sync = 1` in order to ensure that downstream incremental materialization operations execute correctly.  This could cause some distributed table inserts to run more slowly than expected.
 
diff --git a/dbt/adapters/clickhouse/relation.py b/dbt/adapters/clickhouse/relation.py
index f655e02c..3fc91437 100644
--- a/dbt/adapters/clickhouse/relation.py
+++ b/dbt/adapters/clickhouse/relation.py
@@ -64,7 +64,7 @@ def should_on_cluster(self) -> bool:
     def get_on_cluster(
         cls: Type[Self], cluster: str = '', materialized: str = '', engine: str = ''
     ) -> bool:
-        if cluster:
+        if cluster.strip():
             return 'view' == materialized or 'distributed' in materialized or 'Replicated' in engine
         else:
             return False

diff --git a/dbt/include/clickhouse/macros/adapters.sql b/dbt/include/clickhouse/macros/adapters.sql
index 0326e5f3..8d52a2c1 100644
--- a/dbt/include/clickhouse/macros/adapters.sql
+++ b/dbt/include/clickhouse/macros/adapters.sql
@@ -38,14 +38,14 @@
     if(engine not in ('MaterializedView', 'View'), 'table', 'view') as type,
     db.engine as db_engine,
     {%- if adapter.get_clickhouse_cluster_name() -%}
-    count(DISTINCT _shard_num) > 1 as is_on_cluster
+    count(distinct _shard_num) > 1 as is_on_cluster
     from clusterAllReplicas({{ adapter.get_clickhouse_cluster_name() }}, system.tables) as t
     join system.databases as db on t.database = db.name
     where schema = '{{ schema_relation.schema }}'
     group by name, schema, type, db_engine
     {%- else -%}
     0 as is_on_cluster
-    from system.tables as t JOIN system.databases as db on t.database = db.name
+    from system.tables as t join system.databases as db on t.database = db.name
     where schema = '{{ schema_relation.schema }}'
   {% endif %}

diff --git a/dbt/include/clickhouse/macros/adapters/apply_grants.sql b/dbt/include/clickhouse/macros/adapters/apply_grants.sql
index 9f0a0520..387b333b 100644
--- a/dbt/include/clickhouse/macros/adapters/apply_grants.sql
+++ b/dbt/include/clickhouse/macros/adapters/apply_grants.sql
@@ -1,5 +1,5 @@
 {% macro clickhouse__get_show_grant_sql(relation) %}
-    SELECT access_type as privilege_type, COALESCE(user_name, role_name) as grantee FROM system.grants WHERE table = '{{ relation.name }}'
+    SELECT access_type as privilege_type, COALESCE(user_name, role_name) as grantee from system.grants where table = '{{ relation.name }}'
       AND database = '{{ relation.schema }}'
 {%- endmacro %}

diff --git a/dbt/include/clickhouse/macros/materializations/distributed_table.sql b/dbt/include/clickhouse/macros/materializations/distributed_table.sql
index d9bda1c5..9f920ad9 100644
--- a/dbt/include/clickhouse/macros/materializations/distributed_table.sql
+++ b/dbt/include/clickhouse/macros/materializations/distributed_table.sql
@@ -85,7 +85,7 @@
   {%- set cluster = cluster[1:-1] -%}
   {%- set sharding = config.get('sharding_key') -%}
-    CREATE TABLE {{ relation }} {{ on_cluster_clause(relation) }} AS {{ local_relation }}
+    create table {{ relation }} {{ on_cluster_clause(relation) }} as {{ local_relation }}
     ENGINE = Distributed('{{ cluster}}', '{{ relation.schema }}', '{{ local_relation.name }}'
     {%- if sharding is not none and sharding.strip() != '' -%}
         , {{ sharding }}

diff --git a/tests/integration/adapter/test_basic.py b/tests/integration/adapter/test_basic.py
index 69db05c4..645c1181 100644
--- a/tests/integration/adapter/test_basic.py
+++ b/tests/integration/adapter/test_basic.py
@@ -180,6 +180,8 @@ def test_seed(self, project):
 
 
 class TestDistributedMaterializations(BaseSimpleMaterializations):
+    """Test distributed materializations and check that data is properly distributed/replicated."""
+
     @pytest.fixture(scope="class")
     def models(self):
         config_distributed_table = """
@@ -203,11 +205,12 @@ def seeds(self):
         }
 
     def assert_total_count_correct(self, project):
+        """Check that data is properly distributed."""
         cluster = project.test_config['cluster']
-        # check if data is properly distributed/replicated
         table_relation = relation_from_name(project.adapter, "distributed")
         cluster_info = project.run_sql(
-            f"select shard_num,max(host_name) as host_name,count(distinct replica_num) as replica_counts from system.clusters where cluster='{cluster}' group by shard_num",
+            f"select shard_num, max(host_name) as host_name, count(distinct replica_num) as replica_counts "
+            f"from system.clusters where cluster='{cluster}' group by shard_num",
             fetch="all",
         )
         sum_count = project.run_sql(
@@ -285,6 +288,8 @@ def test_no_cluster_setting(self, project):
 
 
 class TestReplicatedTableMaterialization(BaseSimpleMaterializations):
+    """Test a ReplicatedMergeTree table with the table materialization."""
+
     @pytest.fixture(scope="class")
     def models(self):
         config_replicated_table = """
@@ -302,18 +307,21 @@ def models(self):
         }
 
     def assert_total_count_correct(self, project):
+        """Check that the table is created on the cluster and data is properly replicated."""
         cluster = project.test_config['cluster']
         # check if data is properly distributed/replicated
         table_relation = relation_from_name(project.adapter, "replicated")
-        # ClickHouse cluster in the docker-compose file under tests/integration is configured with 3 nodes
+        # ClickHouse cluster in the docker-compose file
+        # under tests/integration is configured with 3 nodes
         host_count = project.run_sql(
             f"select count(host_name) as host_count from system.clusters where cluster='{cluster}'",
             fetch="one",
         )
         assert host_count[0] == 3
 
         table_count = project.run_sql(
-            f"select count() From clusterAllReplicas('{cluster}', system.tables) where database='{table_relation.schema}' and name='{table_relation.identifier}'",
+            f"select count() from clusterAllReplicas('{cluster}', system.tables) "
+            f"where database='{table_relation.schema}' and name='{table_relation.identifier}'",
             fetch="one",
         )
         assert table_count[0] == host_count[0]
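
For reference, a model using the `distributed_table` materialization exercised by the tests above might look like the following minimal sketch (the model name, `order_by`, and sharding key are illustrative, not taken from this patch):

```sql
-- models/distributed_example.sql (illustrative)
-- requires the `cluster` profile setting; dbt-clickhouse creates the
-- sharded *_local tables plus a Distributed table over them
{{ config(
    materialized='distributed_table',
    engine='MergeTree',
    order_by='id',
    sharding_key='cityHash64(id)'
) }}
select 1 as id
```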