
update based on review
gfunc committed Oct 12, 2023
1 parent 1533862 commit 4e59222
Showing 6 changed files with 40 additions and 13 deletions.
27 changes: 23 additions & 4 deletions README.md
@@ -65,7 +65,7 @@ your_profile_name:
port: [8123] # If not set, defaults to 8123, 8443, 9000, 9440 depending on the secure and driver settings
user: [default] # User for all database operations
password: [<empty string>] # Password for the user
cluster: [<empty string>] If set, DDL/table operations will be executed with the `ON CLUSTER` clause using this cluster
cluster: [<empty string>] If set, certain DDL/table operations will be executed with the `ON CLUSTER` clause using this cluster. Distributed materializations require this setting to work. See the following ClickHouse Cluster section for more details.
verify: [True] # Validate TLS certificate if using TLS/SSL
secure: [False] # Use TLS (native protocol) or HTTPS (http protocol)
retries: [1] # Number of times to retry a "retriable" database exception (such as a 503 'Service Unavailable' error)
@@ -75,7 +75,7 @@ your_profile_name:
cluster_mode: [False] # Use specific settings designed to improve operation on Replicated databases (recommended for ClickHouse Cloud)
use_lw_deletes: [False] Use the strategy `delete+insert` as the default incremental strategy.
check_exchange: [True] # Validate that clickhouse support the atomic EXCHANGE TABLES command. (Not needed for most ClickHouse versions)
local_suffix [local] # Table suffix of local tables on shards for distributed materializations
local_suffix [_local] # Table suffix of local tables on shards for distributed materializations.
custom_settings: [{}] # A dictionary/mapping of custom ClickHouse settings for the connection - default is empty.
# Native (clickhouse-driver) connection settings
@@ -97,9 +97,29 @@ your_profile_name:
| inserts_only | If set to True for an incremental model, incremental updates will be inserted directly into the target table without creating an intermediate table. It has been deprecated in favor of the `append` incremental `strategy`, which operates in the same way | Optional |
| incremental_strategy | Incremental model update strategy of `delete+insert` or `append`. See the following Incremental Model Strategies section | Optional (default: `default`) |
| incremental_predicates | Additional conditions to be applied to the incremental materialization (only applied to the `delete+insert` strategy) | Optional |
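
As an illustration, these settings can be combined in a model's config block. This is a hypothetical sketch: the model, source, column, and predicate are made up, and the predicate follows dbt's usual list-of-SQL-strings convention.

```sql
-- models/events_incremental.sql (hypothetical model)
{{ config(
    materialized='incremental',
    incremental_strategy='delete+insert',
    incremental_predicates=['toDate(updated_at) >= today() - 3']  -- illustrative predicate
) }}

select *
from {{ source('raw', 'events') }}  -- hypothetical source
{% if is_incremental() %}
  where updated_at > (select max(updated_at) from {{ this }})
{% endif %}
```
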
## ClickHouse Cluster

The `cluster` setting in the profile enables dbt-clickhouse to run against a ClickHouse cluster.

### Effective Scope


If `cluster` is set in the profile, the `on_cluster_clause` macro returns the cluster information for the following (a DDL sketch follows below):
- Database creation
- View materialization
- Distributed materializations
- Models with Replicated engines

Table and incremental materializations with a non-replicated engine are not affected by the `cluster` setting (the model is created only on the connected node).
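
As a rough sketch of what this means in practice, the DDL generated for an in-scope model carries the `ON CLUSTER` clause, along the lines of the statements below. The database, table, column, and cluster names are hypothetical, and the exact statements dbt-clickhouse emits may differ.

```sql
-- View materialization: created on every node of the cluster
CREATE VIEW analytics.my_view ON CLUSTER my_cluster AS
SELECT id, name FROM analytics.source_table;

-- Model with a Replicated engine: each node gets its own replica of the table
CREATE TABLE analytics.my_replicated ON CLUSTER my_cluster
(
    id UInt64,
    name String
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/analytics/my_replicated', '{replica}')
ORDER BY id;
```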

### Compatibility


If a model was previously created without a `cluster` setting, dbt-clickhouse detects this and runs all DDL/DML for that model without the `on cluster` clause.


## Known Limitations

* Replicated tables (combined with the `cluster` profile setting) are available using the `on_cluster_clause` macro but are not included in the test suite and not formally tested.
* Ephemeral models/CTEs don't work if placed before the "INSERT INTO" in a ClickHouse insert statement; see https://github.com/ClickHouse/ClickHouse/issues/30323. This should not affect most models, but care should be taken where an ephemeral model is placed in model definitions and other SQL statements; a minimal illustration of the failing pattern is shown below.
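
A minimal illustration of the failing pattern, with hypothetical table names. dbt inlines ephemeral models as CTEs, so an ephemeral model that ends up in front of the `INSERT INTO` of an insert statement produces SQL of this shape:

```sql
-- Failing pattern: the CTE is defined before INSERT INTO,
-- which ClickHouse does not accept (see the issue linked above)
WITH new_rows AS (
    SELECT * FROM staging.events
)
INSERT INTO analytics.events
SELECT * FROM new_rows;
```
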

@@ -169,7 +189,6 @@ See the [S3 test file](https://github.com/ClickHouse/dbt-clickhouse/blob/main/te

Notes:

- Distributed materializations are experimental and are not currently included in the automated test suite.
- dbt-clickhouse queries now automatically include the setting `insert_distributed_sync = 1` to ensure that downstream incremental materialization operations execute correctly. This could cause some distributed table inserts to run more slowly than expected; see the sketch below.
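
For reference, a sketch of the effect of this setting (not the literal SQL dbt-clickhouse emits; relation names are hypothetical): with `insert_distributed_sync = 1`, an insert into a Distributed table returns only after the rows have been written to the underlying local tables on each shard.

```sql
INSERT INTO analytics.my_model                -- Distributed table
SETTINGS insert_distributed_sync = 1          -- wait for all shards before returning
SELECT *
FROM analytics.my_model_local;                -- hypothetical underlying local table
```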

2 changes: 1 addition & 1 deletion dbt/adapters/clickhouse/relation.py
@@ -64,7 +64,7 @@ def should_on_cluster(self) -> bool:
def get_on_cluster(
cls: Type[Self], cluster: str = '', materialized: str = '', engine: str = ''
) -> bool:
if cluster:
if cluster.strip():
return 'view' == materialized or 'distributed' in materialized or 'Replicated' in engine
else:
return False
4 changes: 2 additions & 2 deletions dbt/include/clickhouse/macros/adapters.sql
@@ -38,14 +38,14 @@
if(engine not in ('MaterializedView', 'View'), 'table', 'view') as type,
db.engine as db_engine,
{%- if adapter.get_clickhouse_cluster_name() -%}
count(DISTINCT _shard_num) > 1 as is_on_cluster
count(distinct _shard_num) > 1 as is_on_cluster
from clusterAllReplicas({{ adapter.get_clickhouse_cluster_name() }}, system.tables) as t
join system.databases as db on t.database = db.name
where schema = '{{ schema_relation.schema }}'
group by name, schema, type, db_engine
{%- else -%}
0 as is_on_cluster
from system.tables as t JOIN system.databases as db on t.database = db.name
from system.tables as t join system.databases as db on t.database = db.name
where schema = '{{ schema_relation.schema }}'
{% endif %}

2 changes: 1 addition & 1 deletion dbt/include/clickhouse/macros/adapters/apply_grants.sql
@@ -1,5 +1,5 @@
{% macro clickhouse__get_show_grant_sql(relation) %}
SELECT access_type as privilege_type, COALESCE(user_name, role_name) as grantee FROM system.grants WHERE table = '{{ relation.name }}'
SELECT access_type as privilege_type, COALESCE(user_name, role_name) as grantee from system.grants where table = '{{ relation.name }}'
AND database = '{{ relation.schema }}'
{%- endmacro %}

@@ -85,7 +85,7 @@
{%- set cluster = cluster[1:-1] -%}
{%- set sharding = config.get('sharding_key') -%}

CREATE TABLE {{ relation }} {{ on_cluster_clause(relation) }} AS {{ local_relation }}
create table {{ relation }} {{ on_cluster_clause(relation) }} as {{ local_relation }}
ENGINE = Distributed('{{ cluster}}', '{{ relation.schema }}', '{{ local_relation.name }}'
{%- if sharding is not none and sharding.strip() != '' -%}
, {{ sharding }}
16 changes: 12 additions & 4 deletions tests/integration/adapter/test_basic.py
@@ -180,6 +180,8 @@ def test_seed(self, project):


class TestDistributedMaterializations(BaseSimpleMaterializations):
'''Test distributed materializations and check if data is properly distributed/replicated'''

@pytest.fixture(scope="class")
def models(self):
config_distributed_table = """
@@ -203,11 +205,12 @@ def seeds(self):
}

def assert_total_count_correct(self, project):
'''Check if data is properly distributed'''
cluster = project.test_config['cluster']
# check if data is properly distributed/replicated
table_relation = relation_from_name(project.adapter, "distributed")
cluster_info = project.run_sql(
f"select shard_num,max(host_name) as host_name,count(distinct replica_num) as replica_counts from system.clusters where cluster='{cluster}' group by shard_num",
f"select shard_num,max(host_name) as host_name, count(distinct replica_num) as replica_counts "
f"from system.clusters where cluster='{cluster}' group by shard_num",
fetch="all",
)
sum_count = project.run_sql(
@@ -285,6 +288,8 @@ def test_no_cluster_setting(self, project):


class TestReplicatedTableMaterialization(BaseSimpleMaterializations):
'''Test ReplicatedMergeTree table with table materialization'''

@pytest.fixture(scope="class")
def models(self):
config_replicated_table = """
@@ -302,18 +307,21 @@ def models(self):
}

def assert_total_count_correct(self, project):
'''Check if table is created on cluster and data is properly replicated'''
cluster = project.test_config['cluster']
# check if data is properly distributed/replicated
table_relation = relation_from_name(project.adapter, "replicated")
# ClickHouse cluster in the docker-compose file under tests/integration is configured with 3 nodes
# ClickHouse cluster in the docker-compose file
# under tests/integration is configured with 3 nodes
host_count = project.run_sql(
f"select count(host_name) as host_count from system.clusters where cluster='{cluster}'",
fetch="one",
)
assert host_count[0] == 3

table_count = project.run_sql(
f"select count() From clusterAllReplicas('{cluster}', system.tables) where database='{table_relation.schema}' and name='{table_relation.identifier}'",
f"select count() From clusterAllReplicas('{cluster}', system.tables) "
f"where database='{table_relation.schema}' and name='{table_relation.identifier}'",
fetch="one",
)
assert table_count[0] == host_count[0]
