Skip to content

Commit

Permalink
feat(incremental): distrubted append
Browse files Browse the repository at this point in the history
  • Loading branch information
Savid committed Jan 17, 2024
1 parent 0255cec commit cf5369c
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@

{% else %}
{% set column_changes = none %}
{% if config.get('distributed') %}
{% do clickhouse__incremental_create_distributed(target_relation) %}
{% endif %}
{% set schema_changes = none %}
{% set incremental_strategy = adapter.calculate_incremental_strategy(config.get('incremental_strategy')) %}
{% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %}
{%- if on_schema_change != 'ignore' %}
Expand Down Expand Up @@ -103,10 +107,6 @@
{% do to_drop.append(backup_relation) %}
{% endif %}

{% if config.get('distributed') %}
{% do clickhouse__incremental_create_distributed(target_relation) %}
{% endif %}

{% set should_revoke = should_revoke(existing_relation, full_refresh_mode) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

Expand Down Expand Up @@ -266,17 +266,31 @@
{% for predicate in incremental_predicates %}
and {{ predicate }}
{% endfor %}
<<<<<<< HEAD:dbt/include/clickhouse/macros/materializations/incremental/incremental.sql
{%- endif -%}
{{ adapter.get_model_query_settings(model) }}
=======
{%- endif %}
SETTINGS mutations_sync = 2, allow_nondeterministic_mutations = 1
>>>>>>> bd0556f (feat(incremental): distrubted append):dbt/include/clickhouse/macros/materializations/incremental.sql
{% endcall %}

{%- set dest_columns = adapter.get_columns_in_relation(existing_relation) -%}
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
{% call statement('insert_new_data') %}
<<<<<<< HEAD:dbt/include/clickhouse/macros/materializations/incremental/incremental.sql
insert into {{ existing_relation }} {{ adapter.get_model_query_settings(model) }} select {{ dest_cols_csv }} from {{ inserting_relation }}
{% endcall %}
{% do adapter.drop_relation(new_data_relation) %}
{{ drop_relation_if_exists(distributed_new_data_relation) }}
=======
insert into {{ model['name'] }} select {{ dest_cols_csv}} from {{ new_data_relation }} SETTINGS mutations_sync = 2, insert_distributed_sync = 1
{% endcall %}
{% do adapter.drop_relation(new_data_relation) %}
{% call statement('optimize_table') %}
optimize table {{ existing_relation }} {{ on_cluster_clause() }} FINAL DEDUPLICATE
{% endcall %}
>>>>>>> bd0556f (feat(incremental): distrubted append):dbt/include/clickhouse/macros/materializations/incremental.sql
{% endmacro %}

{% macro clickhouse__incremental_create_distributed(relation) %}
Expand All @@ -293,21 +307,25 @@
{% endif %}
)
{% endcall %}
{% else %}
{% call statement('create_temp_distributed_table') %}
CREATE TABLE {{ model['name'] }}_dbt_temp {{ on_cluster_clause() }} AS {{ relation }}
ENGINE = Distributed('{{ cluster}}', '{{ relation.schema }}', '{{ relation.name }}'
{% if sharding is not none %}
, {{ sharding }}
{% endif %}
)
{% endcall %}
{% call statement('exchange_distributed_table') %}
EXCHANGE TABLES {{ model['name'] }}_dbt_temp AND {{ model['name'] }} {{ on_cluster_clause() }}
{% endcall %}
{% call statement('drop_temp_distributed_table') %}
DROP TABLE {{ model['name'] }}_dbt_temp {{ on_cluster_clause() }}
{% endcall %}
{% endif %}

{% endmacro %}


{% macro create_empty_table(relation, sql) -%}
{%- set sql_header = config.get('sql_header', none) -%}

{{ sql_header if sql_header is not none }}

create table {{ relation.include(database=False) }}
{{ on_cluster_clause() }}
{{ engine_clause() }}
{{ order_cols(label="order by") }}
{{ primary_key_clause(label="primary key") }}
{{ partition_cols(label="partition by") }}
{{ adapter.get_model_settings(model) }}
empty
as (
{{ sql }}
)
{%- endmacro %}
2 changes: 1 addition & 1 deletion dbt/include/clickhouse/macros/materializations/table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@

{%- endmacro %}

{% macro clickhouse__insert_into(target_relation, sql, has_contract) %}
{% macro clickhouse__insert_into(target_relation, sql, override_name) %}
{%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
{%- set target_name = override_name or target_relation.name -%}
Expand Down

0 comments on commit cf5369c

Please sign in to comment.