diff --git a/macros/tables/databricks/pit.sql b/macros/tables/databricks/pit.sql index b09e4abb..535946c7 100644 --- a/macros/tables/databricks/pit.sql +++ b/macros/tables/databricks/pit.sql @@ -31,6 +31,23 @@ WITH as_of_dates AS ( SELECT * FROM {{ as_of_table_relation }} ), +{%- for sat_name in satellites -%} + {%- set sat_pk_name = (satellites[sat_name]['pk'].keys() | list)[0] -%} + {%- set sat_ldts_name = (satellites[sat_name]['ldts'].keys() | list)[0] -%} + {%- set sat_pk = satellites[sat_name]['pk'][sat_pk_name] -%} + {%- set sat_ldts = satellites[sat_name]['ldts'][sat_ldts_name] -%} + {%- set column_str = "{}.{}".format(sat_name | lower ~ '_src', sat_ldts) -%} + + {{ sat_name | lower ~ '_src' }} AS ( + SELECT + *, + LEAD({{ sat_ldts }},1,'9999-12-31 23:59:59.999999') OVER (PARTITION BY {{ sat_pk }} ORDER BY {{ sat_ldts }}) as next_{{ sat_ldts }} + FROM + {{ ref(sat_name) }} + ), + +{%- endfor %} + {%- if automate_dv.is_any_incremental() %} {{ automate_dv.as_of_date_window(src_pk, src_ldts, stage_tables_ldts, ref(source_model)) }}, @@ -59,21 +76,21 @@ backfill AS ( {% if enable_ghost_record %} - COALESCE(MAX({{ sat_name | lower ~ '_src' }}.{{ sat_pk }}), + COALESCE({{ sat_name | lower ~ '_src' }}.{{ sat_pk }}, {{ automate_dv.binary_ghost(none, hash) }}) AS {{ sat_name }}_{{ sat_pk_name }}, - COALESCE(MAX({{ sat_name | lower ~ '_src' }}.{{ sat_ldts }}), + COALESCE({{ sat_name | lower ~ '_src' }}.{{ sat_ldts }}, {{ automate_dv.date_ghost(date_type = sat_ldts.dtype, alias=none) }}) AS {{ sat_name }}_{{ sat_ldts_name }} {%- else %} - COALESCE(MAX({{ sat_name | lower ~ '_src' }}.{{ sat_pk }}), + COALESCE({{ sat_name | lower ~ '_src' }}.{{ sat_pk }}, {{ automate_dv.cast_binary(ghost_pk, quote=true) }}) AS {{ sat_name }}_{{ sat_pk_name }}, - COALESCE(MAX({{ sat_name | lower ~ '_src' }}.{{ sat_ldts }}), + COALESCE({{ sat_name | lower ~ '_src' }}.{{ sat_ldts }}, {{ automate_dv.cast_date(ghost_date, as_string=true, datetime=true) }}) AS {{ sat_name }}_{{ sat_ldts_name }} @@ -90,13 +107,11 @@ backfill AS ( {%- set sat_pk = satellites[sat_name]['pk'][sat_pk_name] -%} {%- set sat_ldts = satellites[sat_name]['ldts'][sat_ldts_name] %} - LEFT OUTER JOIN {{ ref(sat_name) }} AS {{ sat_name | lower ~ '_src' }} + LEFT OUTER JOIN {{ sat_name | lower ~ '_src' }} ON a.{{ src_pk }} = {{ sat_name | lower }}_src.{{ sat_pk }} AND {{ sat_name | lower ~ '_src'}}.{{ sat_ldts }} <= a.AS_OF_DATE + AND {{ sat_name | lower ~ '_src'}}.next_{{ sat_ldts }} > a.AS_OF_DATE {% endfor %} - - GROUP BY - {{ automate_dv.prefix([src_pk], 'a') }}, a.AS_OF_DATE ), {%- endif %} @@ -123,21 +138,21 @@ new_rows AS ( {% if enable_ghost_record %} - COALESCE(MAX({{ sat_name | lower ~ '_src' }}.{{ sat_pk }}), + COALESCE({{ sat_name | lower ~ '_src' }}.{{ sat_pk }}, {{ automate_dv.binary_ghost(none, hash) }}) AS {{ sat_name }}_{{ sat_pk_name }}, - COALESCE(MAX({{ sat_name | lower ~ '_src' }}.{{ sat_ldts }}), + COALESCE({{ sat_name | lower ~ '_src' }}.{{ sat_ldts }}, {{ automate_dv.date_ghost(date_type = sat_ldts.dtype, alias=none) }}) AS {{ sat_name }}_{{ sat_ldts_name }} {%- else %} - COALESCE(MAX({{ sat_name | lower ~ '_src' }}.{{ sat_pk }}), + COALESCE({{ sat_name | lower ~ '_src' }}.{{ sat_pk }}, {{ automate_dv.cast_binary(ghost_pk, quote=true) }}) AS {{ sat_name }}_{{ sat_pk_name }}, - COALESCE(MAX({{ sat_name | lower ~ '_src' }}.{{ sat_ldts }}), + COALESCE({{ sat_name | lower ~ '_src' }}.{{ sat_ldts }}, {{ automate_dv.cast_date(ghost_date, as_string=true, datetime=true) }}) AS {{ sat_name }}_{{ sat_ldts_name }} @@ -155,14 +170,11 @@ new_rows AS ( {%- set sat_pk = satellites[sat_name]['pk'][sat_pk_name] -%} {%- set sat_ldts = satellites[sat_name]['ldts'][sat_ldts_name] %} - LEFT OUTER JOIN {{ ref(sat_name) }} AS {{ sat_name | lower ~ '_src' }} + LEFT OUTER JOIN {{ sat_name | lower ~ '_src' }} ON a.{{ src_pk }} = {{ sat_name | lower }}_src.{{ sat_pk }} AND {{ sat_name | lower ~ '_src'}}.{{ sat_ldts }} <= a.AS_OF_DATE + AND {{ sat_name | lower ~ '_src'}}.next_{{ sat_ldts }} > a.AS_OF_DATE {% endfor %} - - GROUP BY - {{ automate_dv.prefix([src_pk], 'a') }}, - a.AS_OF_DATE ), pit AS ( @@ -177,4 +189,4 @@ pit AS ( SELECT DISTINCT * FROM pit -{%- endmacro -%} \ No newline at end of file +{%- endmacro -%}