Skip to content

Commit

Permalink
feat(incremental): distributed append
Browse files Browse the repository at this point in the history
  • Loading branch information
Savid committed Jan 17, 2024
1 parent 6125272 commit 665f184
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,26 +36,40 @@
{% if existing_relation is none %}
-- No existing table, simply create a new one
{% call statement('main') %}
{{ get_create_table_as_sql(False, target_relation, sql) }}
{{ create_empty_table(target_relation, sql) }}
{% endcall %}
{% if config.get('distributed') %}
{% do clickhouse__incremental_create_distributed(target_relation) %}
{% endif %}

{% call statement('main') %}
{{ clickhouse__insert_into(target_relation, sql, model['name']) }}
{% endcall %}

{% elif full_refresh_mode %}
-- Completely replacing the old table, so create a temporary table and then swap it
{% call statement('main') %}
{{ get_create_table_as_sql(False, intermediate_relation, sql) }}
{% endcall %}
{% if config.get('distributed') %}
{% do clickhouse__incremental_create_distributed(target_relation) %}
{% endif %}
{% set need_swap = true %}

{% elif inserts_only or unique_key is none -%}
-- There are no updates/deletes or duplicate keys are allowed. Simply add all of the new rows to the existing
-- table. It is the user's responsibility to avoid duplicates. Note that "inserts_only" is a ClickHouse adapter
-- specific configurable that is used to avoid creating an expensive intermediate table.
{% call statement('main') %}
{{ clickhouse__insert_into(target_relation, sql) }}
{{ clickhouse__insert_into(target_relation, sql, model['name']) }}
{% endcall %}

{% else %}
{% set column_changes = none %}
{% if config.get('distributed') %}
{% do clickhouse__incremental_create_distributed(target_relation) %}
{% endif %}
{% set schema_changes = none %}
{% set incremental_strategy = adapter.calculate_incremental_strategy(config.get('incremental_strategy')) %}
{% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %}
{%- if on_schema_change != 'ignore' %}
Expand All @@ -77,7 +91,7 @@
{% do clickhouse__incremental_delete_insert(existing_relation, unique_key, incremental_predicates) %}
{% elif incremental_strategy == 'append' %}
{% call statement('main') %}
{{ clickhouse__insert_into(target_relation, sql) }}
{{ clickhouse__insert_into(target_relation, sql, model['name']) }}
{% endcall %}
{% endif %}
{% endif %}
Expand All @@ -93,10 +107,6 @@
{% do to_drop.append(backup_relation) %}
{% endif %}

{% if config.get('distributed') %}
{% do clickhouse__incremental_create_distributed(target_relation) %}
{% endif %}

{% set should_revoke = should_revoke(existing_relation, full_refresh_mode) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

Expand Down Expand Up @@ -256,17 +266,31 @@
{% for predicate in incremental_predicates %}
and {{ predicate }}
{% endfor %}
<<<<<<< HEAD:dbt/include/clickhouse/macros/materializations/incremental/incremental.sql
{%- endif -%}
{{ adapter.get_model_query_settings(model) }}
=======
{%- endif %}
SETTINGS mutations_sync = 2, allow_nondeterministic_mutations = 1
>>>>>>> bd0556f (feat(incremental): distrubted append):dbt/include/clickhouse/macros/materializations/incremental.sql
{% endcall %}

{%- set dest_columns = adapter.get_columns_in_relation(existing_relation) -%}
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
{% call statement('insert_new_data') %}
<<<<<<< HEAD:dbt/include/clickhouse/macros/materializations/incremental/incremental.sql
insert into {{ existing_relation }} {{ adapter.get_model_query_settings(model) }} select {{ dest_cols_csv }} from {{ inserting_relation }}
{% endcall %}
{% do adapter.drop_relation(new_data_relation) %}
{{ drop_relation_if_exists(distributed_new_data_relation) }}
=======
insert into {{ model['name'] }} select {{ dest_cols_csv}} from {{ new_data_relation }} SETTINGS mutations_sync = 2, insert_distributed_sync = 1
{% endcall %}
{% do adapter.drop_relation(new_data_relation) %}
{% call statement('optimize_table') %}
optimize table {{ existing_relation }} {{ on_cluster_clause() }} FINAL DEDUPLICATE
{% endcall %}
>>>>>>> bd0556f (feat(incremental): distrubted append):dbt/include/clickhouse/macros/materializations/incremental.sql
{% endmacro %}

{% macro clickhouse__incremental_create_distributed(relation) %}
Expand All @@ -283,21 +307,25 @@
{% endif %}
)
{% endcall %}
{% else %}
{% call statement('create_temp_distributed_table') %}
CREATE TABLE {{ model['name'] }}_dbt_temp {{ on_cluster_clause() }} AS {{ relation }}
ENGINE = Distributed('{{ cluster}}', '{{ relation.schema }}', '{{ relation.name }}'
{% if sharding is not none %}
, {{ sharding }}
{% endif %}
)
{% endcall %}
{% call statement('exchange_distributed_table') %}
EXCHANGE TABLES {{ model['name'] }}_dbt_temp AND {{ model['name'] }} {{ on_cluster_clause() }}
{% endcall %}
{% call statement('drop_temp_distributed_table') %}
DROP TABLE {{ model['name'] }}_dbt_temp {{ on_cluster_clause() }}
{% endcall %}
{% endif %}

{% endmacro %}


{#
    Create the target table with the model's full DDL configuration
    (engine, ordering, primary key, partitioning, settings) but with NO
    rows: ClickHouse's `EMPTY ... AS (select)` form evaluates the SELECT
    only to derive the column schema and skips the data load.

    NOTE(review): per the surrounding diff, rows are inserted in a separate
    `clickhouse__insert_into` step afterwards, presumably so the insert can
    be routed through a Distributed table when `distributed` is configured
    — confirm against the caller in incremental.sql.

    Args:
        relation: the target dbt Relation to create.
        sql:      the compiled model SELECT used to derive the schema.
#}
{% macro create_empty_table(relation, sql) -%}
{#- Optional user-supplied header (e.g. SET statements) emitted before the DDL. -#}
{%- set sql_header = config.get('sql_header', none) -%}

{{ sql_header if sql_header is not none }}

{#- database=False: ClickHouse addresses tables as schema.table only. -#}
create table {{ relation.include(database=False) }}
{{ on_cluster_clause() }}
{{ engine_clause() }}
{{ order_cols(label="order by") }}
{{ primary_key_clause(label="primary key") }}
{{ partition_cols(label="partition by") }}
{{ adapter.get_model_settings(model) }}
{#- `empty` keyword: schema from the SELECT, zero rows inserted. -#}
empty
as (
{{ sql }}
)
{%- endmacro %}
3 changes: 2 additions & 1 deletion dbt/include/clickhouse/macros/materializations/table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,10 @@

{%- endmacro %}

{% macro clickhouse__insert_into(target_relation, sql, has_contract) %}
{% macro clickhouse__insert_into(target_relation, sql, override_name) %}
{%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
{%- set target_name = override_name or target_relation.name -%}

insert into {{ target_relation }}
({{ dest_cols_csv }})
Expand Down

0 comments on commit 665f184

Please sign in to comment.