diff --git a/macros/materialisations/incremental_bridge_helpers.sql b/macros/materialisations/incremental_bridge_helpers.sql deleted file mode 100644 index f725e0390..000000000 --- a/macros/materialisations/incremental_bridge_helpers.sql +++ /dev/null @@ -1,26 +0,0 @@ -{% macro is_bridge_incremental() %} - {#-- do not run introspective queries in parsing #} - {% if not execute %} - {{ return(False) }} - {% else %} - {% set relation = adapter.get_relation(this.database, this.schema, this.table) %} - - {{ return(relation is not none - and relation.type == 'table' - and model.config.materialized == 'bridge_incremental' - and not flags.FULL_REFRESH) }} - {% endif %} -{% endmacro %} - -{% macro incremental_bridge_replace(tmp_relation, target_relation, statement_name="main") %} - {%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%} - {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%} - - TRUNCATE TABLE {{ target_relation }}; - - INSERT INTO {{ target_relation }} ({{ dest_cols_csv }}) - ( - SELECT {{ dest_cols_csv }} - FROM {{ tmp_relation }} - ); -{%- endmacro %} diff --git a/macros/materialisations/incremental_bridge_materialization.sql b/macros/materialisations/incremental_bridge_materialization.sql deleted file mode 100644 index 71ebc6543..000000000 --- a/macros/materialisations/incremental_bridge_materialization.sql +++ /dev/null @@ -1,53 +0,0 @@ -{% materialization bridge_incremental, default -%} - - {% set full_refresh_mode = flags.FULL_REFRESH %} - - {% set target_relation = this %} - {% set existing_relation = load_relation(this) %} - {% set tmp_relation = make_temp_relation(this) %} - - {{ run_hooks(pre_hooks, inside_transaction=False) }} - - -- `BEGIN` happens here: - {{ run_hooks(pre_hooks, inside_transaction=True) }} - - {% set to_drop = [] %} - {% if existing_relation is none %} - {% set build_sql = create_table_as(False, target_relation, sql) %} - {% elif existing_relation.is_view or full_refresh_mode %} - {#-- Make sure the backup doesn't exist so we don't encounter issues with the rename below #} - {% set backup_identifier = existing_relation.identifier ~ "__dbt_backup" %} - {% set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) %} - {% do adapter.drop_relation(backup_relation) %} - - {% do adapter.rename_relation(target_relation, backup_relation) %} - {% set build_sql = create_table_as(False, target_relation, sql) %} - {% do to_drop.append(backup_relation) %} - {% else %} - - {% set tmp_relation = make_temp_relation(target_relation) %} - {% do run_query(create_table_as(True, tmp_relation, sql)) %} - {% do adapter.expand_target_column_types( - from_relation=tmp_relation, - to_relation=target_relation) %} - {% set build_sql = dbtvault.incremental_bridge_replace(tmp_relation, target_relation) %} -{% endif %} - - {% call statement("main") %} - {{ build_sql }} - {% endcall %} - - {{ run_hooks(post_hooks, inside_transaction=True) }} - - -- `COMMIT` happens here - {% do adapter.commit() %} - - {% for rel in to_drop %} - {% do adapter.drop_relation(rel) %} - {% endfor %} - - {{ run_hooks(post_hooks, inside_transaction=False) }} - - {{ return({'relations': [target_relation]}) }} - -{%- endmaterialization %} diff --git a/macros/materialisations/incremental_pit_helper.sql b/macros/materialisations/incremental_pit_helper.sql deleted file mode 100644 index 0b985acec..000000000 --- a/macros/materialisations/incremental_pit_helper.sql +++ /dev/null @@ -1,26 +0,0 @@ -{% macro is_pit_incremental() %} - {#-- do not run introspective queries in parsing #} - {% if not execute %} - {{ return(False) }} - {% else %} - {% set relation = adapter.get_relation(this.database, this.schema, this.table) %} - - {{ return(relation is not none - and relation.type == 'table' - and model.config.materialized == 'pit_incremental' - and not flags.FULL_REFRESH) }} - {% endif %} -{% endmacro %} - -{% macro incremental_pit_replace(tmp_relation, target_relation, statement_name="main") %} - {%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%} - {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%} - - TRUNCATE TABLE {{ target_relation }}; - - INSERT INTO {{ target_relation }} ({{ dest_cols_csv }}) - ( - SELECT {{ dest_cols_csv }} - FROM {{ tmp_relation }} - ); -{%- endmacro %} \ No newline at end of file diff --git a/macros/materialisations/incremental_pit_materialization.sql b/macros/materialisations/incremental_pit_materialization.sql deleted file mode 100644 index 8c49165c0..000000000 --- a/macros/materialisations/incremental_pit_materialization.sql +++ /dev/null @@ -1,53 +0,0 @@ -{% materialization pit_incremental, default -%} - - {% set full_refresh_mode = flags.FULL_REFRESH %} - - {% set target_relation = this %} - {% set existing_relation = load_relation(this) %} - {% set tmp_relation = make_temp_relation(this) %} - - {{ run_hooks(pre_hooks, inside_transaction=False) }} - - -- `BEGIN` happens here: - {{ run_hooks(pre_hooks, inside_transaction=True) }} - - {% set to_drop = [] %} - {% if existing_relation is none %} - {% set build_sql = create_table_as(False, target_relation, sql) %} - {% elif existing_relation.is_view or full_refresh_mode %} - {#-- Make sure the backup doesn't exist so we don't encounter issues with the rename below #} - {% set backup_identifier = existing_relation.identifier ~ "__dbt_backup" %} - {% set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) %} - {% do adapter.drop_relation(backup_relation) %} - - {% do adapter.rename_relation(target_relation, backup_relation) %} - {% set build_sql = create_table_as(False, target_relation, sql) %} - {% do to_drop.append(backup_relation) %} - {% else %} - - {% set tmp_relation = make_temp_relation(target_relation) %} - {% do run_query(create_table_as(True, tmp_relation, sql)) %} - {% do adapter.expand_target_column_types( - from_relation=tmp_relation, - to_relation=target_relation) %} - {% set build_sql = dbtvault.incremental_pit_replace(tmp_relation, target_relation) %} -{% endif %} - - {% call statement("main") %} - {{ build_sql }} - {% endcall %} - - {{ run_hooks(post_hooks, inside_transaction=True) }} - - -- `COMMIT` happens here - {% do adapter.commit() %} - - {% for rel in to_drop %} - {% do adapter.drop_relation(rel) %} - {% endfor %} - - {{ run_hooks(post_hooks, inside_transaction=False) }} - - {{ return({'relations': [target_relation]}) }} - -{%- endmaterialization %} \ No newline at end of file diff --git a/macros/materialisations/shared_helpers.sql b/macros/materialisations/shared_helpers.sql index 2e65940c8..ed6b1971f 100644 --- a/macros/materialisations/shared_helpers.sql +++ b/macros/materialisations/shared_helpers.sql @@ -11,7 +11,7 @@ {%- macro is_any_incremental() -%} - {%- if dbtvault.is_vault_insert_by_period() or dbtvault.is_vault_insert_by_rank() or dbtvault.is_pit_incremental() or dbtvault.is_bridge_incremental() or is_incremental() -%} + {%- if dbtvault.is_vault_insert_by_period() or dbtvault.is_vault_insert_by_rank() or is_incremental() -%} {%- do return(true) -%} {%- else -%} {%- do return(false) -%} diff --git a/macros/tables/bridge.sql b/macros/tables/bridge.sql deleted file mode 100644 index 24d916ccf..000000000 --- a/macros/tables/bridge.sql +++ /dev/null @@ -1,276 +0,0 @@ -{%- macro bridge(src_pk, as_of_dates_table, bridge_walk, stage_tables_ldts, src_ldts, source_model) -%} - - {{- adapter.dispatch('bridge', packages = dbtvault.get_dbtvault_namespaces())(source_model=source_model, src_pk=src_pk, - bridge_walk=bridge_walk, - as_of_dates_table=as_of_dates_table, - stage_tables_ldts=stage_tables_ldts, - src_ldts=src_ldts) -}} -{%- endmacro -%} - -{%- macro default__bridge(src_pk, as_of_dates_table, bridge_walk, stage_tables_ldts, src_ldts, source_model) -%} - -{{ dbtvault.prepend_generated_by() }} - -{%- if (as_of_dates_table is none) and execute -%} - {%- set error_message -%} - "Bridge error: Missing as_of_dates table configuration. A as_of_dates_table must be provided." - {%- endset -%} - {{- exceptions.raise_compiler_error(error_message) -}} -{%- endif -%} - -{#- Acquiring the source relation for the AS_OF table -#} -{%- if as_of_dates_table is mapping and as_of_dates_table is not none -%} - {%- set source_name = as_of_dates_table | first -%} - {%- set source_table_name = as_of_dates_table[source_name] -%} - {%- set source_relation = source(source_name, source_table_name) -%} -{%- elif as_of_dates_table is not mapping and as_of_dates_table is not none -%} - {%- set source_relation = ref(as_of_dates_table) -%} -{%- endif -%} - -{%- set max_date = '9999-12-31 23:59:59.999' -%} -{%- set ghost_pk = '0000000000000000' -%} -{%- set ghost_date = '1990-01-01 00:00:00.000' -%} - -{#- Stating the dependencies on the stage tables outside of the If STATEMENT -#} -{% for stg in stage_tables_ldts -%} - {{- "-- depends_on: " ~ ref(stg) -}} -{%- endfor %} - -{#- Setting the new AS_OF dates CTE name -#} -{%- if dbtvault.is_any_incremental() -%} - {%- set new_as_of_dates_cte = 'NEW_ROWS_AS_OF' -%} -{%- else -%} - {%- set new_as_of_dates_cte = 'AS_OF' -%} -{%- endif %} - -WITH as_of AS ( - SELECT a.AS_OF_DATE - FROM {{ source_relation }} AS a - WHERE a.AS_OF_DATE <= CURRENT_DATE() -), - -{%- if dbtvault.is_any_incremental() %} - -last_safe_load_datetime AS ( - SELECT MIN(LOAD_DATETIME) AS LAST_SAFE_LOAD_DATETIME - FROM ( - {%- filter indent(width=8) -%} - {%- for stg in stage_tables_ldts -%} - {%- set stage_ldts =(stage_tables_ldts[stg]) -%} - {{ "SELECT MIN(" ~ stage_ldts ~ ") AS LOAD_DATETIME FROM " ~ ref(stg) }} - {{ "UNION ALL" if not loop.last }} - {% endfor -%} - {%- endfilter -%} - ) -), - -as_of_grain_old_entries AS ( - SELECT DISTINCT AS_OF_DATE - FROM {{ this }} -), - -as_of_grain_lost_entries AS ( - SELECT a.AS_OF_DATE - FROM as_of_grain_old_entries AS a - LEFT OUTER JOIN as_of AS b - ON a.AS_OF_DATE = b.AS_OF_DATE - WHERE b.AS_OF_DATE IS NULL -), - -as_of_grain_new_entries AS ( - SELECT a.AS_OF_DATE - FROM as_of AS a - LEFT OUTER JOIN as_of_grain_old_entries AS b - ON a.AS_OF_DATE = b.AS_OF_DATE - WHERE b.AS_OF_DATE IS NULL -), - -min_date AS ( - SELECT min(AS_OF_DATE) AS MIN_DATE - FROM as_of -), - -new_rows_pks AS ( - SELECT h.{{ src_pk }} - FROM {{ ref(source_model) }} AS h - WHERE h.{{ src_ldts }} >= (SELECT LAST_SAFE_LOAD_DATETIME FROM last_safe_load_datetime) -), - -new_rows_as_of AS ( - SELECT AS_OF_DATE - FROM as_of - WHERE as_of.AS_OF_DATE >= (SELECT LAST_SAFE_LOAD_DATETIME FROM last_safe_load_datetime) - UNION - SELECT as_of_date - FROM as_of_grain_new_entries -), - -overlap_pks AS ( - SELECT p.{{ src_pk }} - FROM {{ this }} AS p - INNER JOIN {{ ref(source_model) }} as h - ON p.{{ src_pk }} = h.{{ src_pk }} - WHERE p.AS_OF_DATE >= (SELECT MIN_DATE FROM min_date) - AND p.AS_OF_DATE < (SELECT LAST_SAFE_LOAD_DATETIME FROM last_safe_load_datetime) - AND p.AS_OF_DATE NOT IN (SELECT AS_OF_DATE FROM as_of_grain_lost_entries) -), - -overlap_as_of AS ( - SELECT AS_OF_DATE - FROM as_of AS p - WHERE p.AS_OF_DATE >= (SELECT MIN_DATE FROM min_date) - AND p.AS_OF_DATE < (SELECT LAST_SAFE_LOAD_DATETIME FROM last_safe_load_datetime) - AND p.AS_OF_DATE NOT IN (SELECT AS_OF_DATE FROM as_of_grain_lost_entries) -), - -overlap AS ( - SELECT - a.{{ src_pk }}, - b.AS_OF_DATE - {%- for bridge_step in bridge_walk.keys() -%} - {%- set link_table = bridge_walk[bridge_step]['link_table'] -%} - {%- set link_pk = bridge_walk[bridge_step]['link_pk'] -%} - {%- set bridge_link_pk = bridge_walk[bridge_step]['bridge_link_pk'] -%} - {%- set eff_sat_table = bridge_walk[bridge_step]['eff_sat_table'] -%} - {%- set bridge_end_date = bridge_walk[bridge_step]['bridge_end_date'] -%} - {%- set bridge_load_date = bridge_walk[bridge_step]['bridge_load_date'] -%} - {%- set eff_sat_end_date = bridge_walk[bridge_step]['eff_sat_end_date'] -%} - {%- set eff_sat_load_date = bridge_walk[bridge_step]['eff_sat_load_date'] -%} - {%- filter indent(width=8) %} - {{ ',' ~ link_table ~ '.' ~ link_pk ~ ' AS ' ~ bridge_link_pk }} - {{ ',' ~ eff_sat_table ~ '.' ~ eff_sat_end_date ~ ' AS ' ~ bridge_end_date }} - {{ ',' ~ eff_sat_table ~ '.' ~ eff_sat_load_date ~' AS ' ~ bridge_load_date }} - {%- endfilter -%} - {% endfor %} - FROM overlap_pks AS a - INNER JOIN overlap_as_of AS b - ON (1=1) - {%- set loop_vars = namespace(lastlink = '', last_link_fk = '') -%} - {%- for bridge_step in bridge_walk.keys() -%} - {%- set current_link = bridge_walk[bridge_step]['link_table'] -%} - {%- set current_eff_sat = bridge_walk[bridge_step]['eff_sat_table'] -%} - {%- set link_pk = bridge_walk[bridge_step]['link_pk'] -%} - {%- set link_fk1 = bridge_walk[bridge_step]['link_fk1'] -%} - {%- set link_fk2 = bridge_walk[bridge_step]['link_fk2'] -%} - {%- set eff_sat_pk = bridge_walk[bridge_step]['eff_sat_pk'] -%} - {%- set eff_sat_load_date = bridge_walk[bridge_step]['eff_sat_load_date'] -%} - {%- if loop.first %} - LEFT JOIN {{ ref(current_link) }} AS {{ current_link }} - ON a.{{ src_pk }} = {{ current_link }}.{{ link_fk1 }} - {%- else %} - LEFT JOIN {{ ref(current_link) }} AS {{ current_link }} - ON {{ loop_vars.last_link }}.{{ loop_vars.last_link_fk2 }} = {{ current_link }}.{{ link_fk1 }} - {%- endif %} - INNER JOIN {{ ref(current_eff_sat) }} AS {{ current_eff_sat }} - ON {{ current_eff_sat }}.{{ eff_sat_pk }} = {{ current_link }}.{{ link_pk }} - AND {{ current_eff_sat }}.{{ eff_sat_load_date }} <= b.AS_OF_DATE - {%- set loop_vars.last_link = current_link -%} - {%- set loop_vars.last_link_fk2 = link_fk2 -%} - {% endfor %} -), -{%- endif %} - -new_rows AS ( - SELECT - a.{{ src_pk }}, - b.AS_OF_DATE - {%- for bridge_step in bridge_walk.keys() -%} - {%- set link_table = bridge_walk[bridge_step]['link_table'] -%} - {%- set link_pk = bridge_walk[bridge_step]['link_pk'] -%} - {%- set bridge_link_pk = bridge_walk[bridge_step]['bridge_link_pk'] -%} - {%- set eff_sat_table = bridge_walk[bridge_step]['eff_sat_table'] -%} - {%- set bridge_end_date = bridge_walk[bridge_step]['bridge_end_date'] -%} - {%- set bridge_load_date = bridge_walk[bridge_step]['bridge_load_date'] -%} - {%- set eff_sat_end_date = bridge_walk[bridge_step]['eff_sat_end_date'] -%} - {%- set eff_sat_load_date = bridge_walk[bridge_step]['eff_sat_load_date'] -%} - {%- filter indent(width=8) -%} - {{ ',' ~ link_table ~'.'~ link_pk ~' AS '~ bridge_link_pk }} - {{ ',' ~ eff_sat_table ~ '.' ~ eff_sat_end_date ~ ' AS ' ~ bridge_end_date }} - {{ ',' ~ eff_sat_table ~ '.' ~ eff_sat_load_date ~ ' AS ' ~ bridge_load_date }} - {%- endfilter -%} - {% endfor %} - FROM {{ ref(source_model) }} AS a - INNER JOIN {{ new_as_of_dates_cte }} AS b - ON (1=1) - {%- set loop_vars = namespace(lastlink = '', last_link_fk = '') %} - {%- for bridge_step in bridge_walk.keys() -%} - {%- set current_link = bridge_walk[bridge_step]['link_table'] -%} - {%- set current_eff_sat = bridge_walk[bridge_step]['eff_sat_table'] -%} - {%- set link_pk = bridge_walk[bridge_step]['link_pk'] -%} - {%- set link_fk1 = bridge_walk[bridge_step]['link_fk1'] -%} - {%- set link_fk2 = bridge_walk[bridge_step]['link_fk2'] -%} - {%- set eff_sat_pk = bridge_walk[bridge_step]['eff_sat_pk'] -%} - {%- set eff_sat_load_date = bridge_walk[bridge_step]['eff_sat_load_date'] -%} - {%- if loop.first %} - LEFT JOIN {{ ref(current_link) }} AS {{ current_link }} - ON a.{{ src_pk }} = {{ current_link }}.{{ link_fk1 }} - {%- else %} - LEFT JOIN {{ ref(current_link) }} AS {{ current_link }} - ON {{ loop_vars.last_link }}.{{ loop_vars.last_link_fk2 }} = {{ current_link }}.{{ link_fk1 }} - {%- endif %} - INNER JOIN {{ ref(current_eff_sat) }} AS {{ current_eff_sat }} - ON {{ current_eff_sat }}.{{ eff_sat_pk }} = {{ current_link }}.{{ link_pk }} - AND {{ current_eff_sat }}.{{ eff_sat_load_date }} <= b.AS_OF_DATE - {%- set loop_vars.last_link = current_link -%} - {%- set loop_vars.last_link_fk2 = link_fk2 -%} - {% endfor %} -), - -{# Full data from bridge walk(s) -#} -all_rows AS ( - SELECT * FROM new_rows - {%- if dbtvault.is_any_incremental() %} - UNION ALL - SELECT * FROM overlap - {%- endif %} -), - -{# Select most recent set of relationship key(s) for each as of date -#} -candidate_rows AS ( - SELECT *, - ROW_NUMBER() OVER ( - PARTITION BY AS_OF_DATE, - {%- for bridge_step in bridge_walk.keys() -%} - {% set bridge_link_pk = bridge_walk[bridge_step]['bridge_link_pk'] -%} - {%- if loop.first %} - {{ bridge_link_pk }} - {%- else %} - {{ ','~ bridge_link_pk }} - {%- endif -%} - {%- endfor %} - ORDER BY - {%- for bridge_step in bridge_walk.keys() -%} - {% set bridge_load_date = bridge_walk[bridge_step]['bridge_load_date'] %} - {%- if loop.first %} - {{ bridge_load_date ~' DESC' }} - {%- else %} - {{ ','~ bridge_load_date ~' DESC' }} - {%- endif -%} - {%- endfor %} - ) AS row_num - FROM all_rows - QUALIFY row_num = 1 -), - -bridge AS ( - SELECT - {{ src_pk }}, - AS_OF_DATE - {%- for bridge_step in bridge_walk.keys() -%} - {% set bridge_link_pk = bridge_walk[bridge_step]['bridge_link_pk'] -%} - {{ ',' ~ bridge_link_pk }} - {%- endfor %} - FROM candidate_rows - {%- for bridge_step in bridge_walk.keys() -%} - {%- set bridge_end_date = bridge_walk[bridge_step]['bridge_end_date'] -%} - {%- if loop.first %} - WHERE TO_DATE({{ bridge_end_date }}) = TO_DATE('{{ max_date }}') - {%- else %} - AND TO_DATE({{ bridge_end_date }}) = TO_DATE('{{ max_date }}') - {%- endif -%} - {%- endfor %} -) - -SELECT * FROM bridge - -{%- endmacro -%} diff --git a/macros/tables/pit.sql b/macros/tables/pit.sql deleted file mode 100644 index 84a7936d5..000000000 --- a/macros/tables/pit.sql +++ /dev/null @@ -1,211 +0,0 @@ -{%- macro pit(src_pk, as_of_dates_table, satellites, stage_tables, src_ldts, source_model ) -%} - - {{- adapter.dispatch('pit', packages = dbtvault.get_dbtvault_namespaces())(source_model=source_model, src_pk=src_pk, - as_of_dates_table=as_of_dates_table, - satellites=satellites, - stage_tables=stage_tables, - src_ldts=src_ldts) -}} -{%- endmacro -%} - -{%- macro default__pit(src_pk, as_of_dates_table, satellites, stage_tables, src_ldts, source_model) -%} - -{{ dbtvault.prepend_generated_by() }} - -{%- if (as_of_dates_table is none) and execute -%} - {%- set error_message -%} - "pit error: Missing as_of_dates table configuration. A as_of_dates_table must be provided." - {%- endset -%} - {{- exceptions.raise_compiler_error(error_message) -}} -{%- endif -%} - -{#- Acquiring the source relation for the AS_OF table -#} -{%- if as_of_dates_table is mapping and as_of_dates_table is not none -%} - {%- set source_name = as_of_dates_table | first -%} - {%- set source_table_name = as_of_dates_table[source_name] -%} - {%- set as_of_table_relation = source(source_name, source_table_name) -%} -{%- elif as_of_dates_table is not mapping and as_of_dates_table is not none -%} - {%- set as_of_table_relation = ref(as_of_dates_table) -%} -{%- endif -%} - -{#- Setting ghost values to replace NULLS -#} -{%- set ghost_pk = '0000000000000000' -%} -{%- set ghost_date = '1900-01-01 00:00:00.000000' %} - -{# Stating the dependancies on the stage tables outside of the If STATEMENT #} -{% for stg in stage_tables -%} - {{ "-- depends_on: " ~ ref(stg) }} -{% endfor %} - -{#- Setting the new AS_OF dates CTE name -#} -{%- if dbtvault.is_any_incremental() -%} -{%- set new_as_of_dates_cte = 'new_rows_as_of' -%} -{%- else -%} -{%- set new_as_of_dates_cte = 'as_of_dates' -%} -{%- endif %} - -WITH as_of_dates AS ( - SELECT * FROM {{ as_of_table_relation }} -), - -{%- if dbtvault.is_any_incremental() %} - -last_safe_load_datetime AS ( - SELECT MIN(LOAD_DATETIME) AS LAST_SAFE_LOAD_DATETIME FROM ( - {%- for stg in stage_tables -%} - {%- set stage_ldts = stage_tables[stg] %} - {{ "SELECT MIN({}) AS LOAD_DATETIME FROM {}".format(stage_ldts, ref(stg)) }} - {{ "UNION ALL" if not loop.last }} - {%- endfor %} - ) -), - -as_of_grain_old_entries AS ( - SELECT DISTINCT AS_OF_DATE FROM {{ this }} -), - -as_of_grain_lost_entries AS ( - SELECT a.AS_OF_DATE - FROM as_of_grain_old_entries AS a - LEFT OUTER JOIN as_of_dates AS b - ON a.AS_OF_DATE = b.AS_OF_DATE - WHERE b.AS_OF_DATE IS NULL -), - -as_of_grain_new_entries AS ( - SELECT a.AS_OF_DATE - FROM as_of_dates AS a - LEFT OUTER JOIN as_of_grain_old_entries AS b - ON a.AS_OF_DATE = b.AS_OF_DATE - WHERE b.AS_OF_DATE IS NULL -), - -min_date AS ( - SELECT min(AS_OF_DATE) AS MIN_DATE - FROM as_of_dates -), - -backfill_as_of AS ( - SELECT AS_OF_DATE - FROM as_of_dates AS a - WHERE a.AS_OF_DATE < (SELECT LAST_SAFE_LOAD_DATETIME FROM last_safe_load_datetime) -), - -new_rows_pks AS ( - SELECT a.{{ src_pk }} - FROM {{ ref(source_model) }} AS a - WHERE a.{{ src_ldts }} >= (SELECT LAST_SAFE_LOAD_DATETIME FROM last_safe_load_datetime) -), - -new_rows_as_of AS ( - SELECT AS_OF_DATE - FROM as_of_dates AS a - WHERE a.AS_OF_DATE >= (SELECT LAST_SAFE_LOAD_DATETIME FROM last_safe_load_datetime) - UNION - SELECT AS_OF_DATE - FROM as_of_grain_new_entries -), - -overlap AS ( - SELECT a.* - FROM {{ this }} AS a - INNER JOIN {{ ref(source_model) }} as b - ON a.{{ src_pk }} = b.{{ src_pk }} - WHERE a.AS_OF_DATE >= (SELECT MIN_DATE FROM min_date) - AND a.AS_OF_DATE < (SELECT LAST_SAFE_LOAD_DATETIME FROM last_safe_load_datetime) - AND a.AS_OF_DATE NOT IN (SELECT AS_OF_DATE FROM as_of_grain_lost_entries) -), - --- Back-fill any newly arrived hubs, set all historical pit dates to ghost records - -backfill_rows_as_of_dates AS ( - SELECT - a.{{ src_pk }}, - b.AS_OF_DATE - FROM new_rows_pks AS a - INNER JOIN backfill_as_of AS b - ON (1=1 ) -), - -backfill AS ( - SELECT - a.{{ src_pk }}, - a.AS_OF_DATE, - {%- for sat_name in satellites -%} - {%- set sat_key_name = (satellites[sat_name]['pk'].keys() | list )[0] | upper -%} - {%- set sat_ldts_name = (satellites[sat_name]['ldts'].keys() | list )[0] | upper -%} - {%- set sat_name = sat_name | upper %} - {{ "'{}'::BINARY(16) AS {}".format(ghost_pk, sat_name, sat_key_name) }}, - {{ "'{}'::TIMESTAMP_NTZ AS {}_{}".format(ghost_date, sat_name, sat_ldts_name) }} - {{- ',' if not loop.last -}} - {%- endfor %} - FROM backfill_rows_as_of_dates AS a - - {% for sat_name in satellites -%} - {%- set sat_pk_name = (satellites[sat_name]['pk'].keys() | list )[0] -%} - {%- set sat_ldts_name = (satellites[sat_name]['ldts'].keys() | list )[0] -%} - {%- set sat_pk = satellites[sat_name]['pk'][sat_pk_name] -%} - {%- set sat_ldts = satellites[sat_name]['ldts'][sat_ldts_name] -%} - LEFT JOIN {{ ref(sat_name) }} AS {{ "{}_src".format(sat_name | lower) }} - {{ "ON" | indent(4) }} a.{{ src_pk }} = {{ "{}_src.{}".format(sat_name | lower, sat_pk) }} - {{ "AND" | indent(4) }} {{ "{}_src.{}".format(sat_name | lower, sat_ldts) }} <= a.AS_OF_DATE - {% endfor -%} - - GROUP BY - a.{{- src_pk }}, a.AS_OF_DATE - ORDER BY (1, 2) -), -{%- endif %} - -new_rows_as_of_dates AS ( - SELECT - a.{{ src_pk }}, - b.AS_OF_DATE - FROM {{ ref(source_model) }} AS a - INNER JOIN {{ new_as_of_dates_cte }} AS b - ON (1=1) -), - -new_rows AS ( - SELECT - a.{{ src_pk }}, - a.AS_OF_DATE, - {%- for sat_name in satellites -%} - {%- set sat_pk_name = (satellites[sat_name]['pk'].keys() | list )[0] -%} - {%- set sat_ldts_name = (satellites[sat_name]['ldts'].keys() | list )[0] -%} - {%- set sat_pk = satellites[sat_name]['pk'][sat_pk_name] -%} - {%- set sat_ldts = satellites[sat_name]['ldts'][sat_ldts_name] %} - {{ ("COALESCE(MAX({}_src.{}), '{}'::BINARY(16)) AS {}_{}".format(sat_name | lower, sat_pk, ghost_pk, sat_name | upper, sat_pk_name | upper )) }}, - {{ ("COALESCE(MAX({}_src.{}), '{}'::TIMESTAMP_NTZ) AS {}_{}".format(sat_name | lower, sat_ldts, ghost_date, sat_name | upper, sat_ldts_name | upper)) }} - {{- "," if not loop.last }} - {%- endfor %} - FROM new_rows_as_of_dates AS a - - {% for sat_name in satellites -%} - {%- set sat_pk_name = (satellites[sat_name]['pk'].keys() | list )[0] -%} - {%- set sat_ldts_name = (satellites[sat_name]['ldts'].keys() | list )[0] -%} - {%- set sat_pk = satellites[sat_name]['pk'][sat_pk_name] -%} - {%- set sat_ldts = satellites[sat_name]['ldts'][sat_ldts_name] -%} - LEFT JOIN {{ ref(sat_name) }} AS {{ "{}_src".format(sat_name | lower) }} - {{ "ON" | indent(4) }} a.{{ src_pk }} = {{ "{}_src.{}".format(sat_name | lower, sat_pk) }} - {{ "AND" | indent(4) }} {{ "{}_src.{}".format(sat_name | lower, sat_ldts) }} <= a.AS_OF_DATE - {% endfor -%} - - GROUP BY - a.{{- src_pk }}, a.AS_OF_DATE - ORDER BY (1, 2) -), - -pit AS ( - SELECT * FROM new_rows -{%- if dbtvault.is_any_incremental() %} - UNION ALL - SELECT * FROM overlap - UNION ALL - SELECT * FROM backfill - -{%- endif %} -) - -SELECT DISTINCT * FROM pit - -{%- endmacro -%} diff --git a/macros/tables/xts.sql b/macros/tables/xts.sql deleted file mode 100644 index 7845c9cab..000000000 --- a/macros/tables/xts.sql +++ /dev/null @@ -1,61 +0,0 @@ -{%- macro xts(src_pk, src_satellite, src_ldts, src_source, source_model) -%} - {{- adapter.dispatch('xts', packages = dbtvault.get_dbtvault_namespaces())(src_pk=src_pk, - src_satellite=src_satellite, - src_ldts=src_ldts, - src_source=src_source, - source_model=source_model) -}} -{%- endmacro -%} - -{%- macro default__xts(src_pk, src_satellite, src_ldts, src_source, source_model) -%} - -{{ dbtvault.prepend_generated_by() }} - -{%- if not (source_model is iterable and source_model is not string) -%} - {%- set source_model = [source_model] -%} -{%- endif %} - -{{ 'WITH ' }} -{%- for src in source_model %} - {%- for satellite in src_satellite.items() -%} - {%- set satellite_name = (satellite[1]['sat_name'].values() | list) [0] -%} - {%- set hashdiff = (satellite[1]['hashdiff'].values() | list) [0] %} - - satellite_{{ satellite_name }}_from_{{ src }} AS ( - SELECT {{ src_pk }}, {{ hashdiff }} AS HASHDIFF, {{ satellite_name }} AS SATELLITE_NAME, {{ src_ldts }}, {{ src_source }} - FROM {{ ref(src) }} - WHERE {{ src_pk }} IS NOT NULL - ), - {%- endfor %} -{%- endfor %} - -union_satellites AS ( - {%- for src in source_model %} - {%- for satellite in src_satellite.items() %} - SELECT * FROM satellite_{{ (satellite[1]['sat_name'].values() | list) [0] }}_from_{{ src }} - {%- if not loop.last %} - UNION ALL - {%- endif %} - {%- endfor %} - {%- if not loop.last %} - UNION ALL - {%- endif %} - {%- endfor %} -), - -records_to_insert AS ( - SELECT DISTINCT union_satellites.* FROM union_satellites - {%- if dbtvault.is_vault_insert_by_period() or is_incremental() %} - LEFT JOIN {{ this }} AS d - ON (union_satellites.{{ 'HASHDIFF' }} = d.{{ 'HASHDIFF' }} - AND union_satellites.{{ src_ldts }} = d.{{ src_ldts }} - AND union_satellites.{{ 'SATELLITE_NAME' }} = d.{{ 'SATELLITE_NAME' }} - ) - WHERE {{ dbtvault.prefix(['HASHDIFF'], 'd') }} IS NULL - AND {{ dbtvault.prefix([ src_ldts ], 'd') }} IS NULL - AND {{ dbtvault.prefix([ 'SATELLITE_NAME' ], 'd') }} IS NULL - {%- endif %} -) - -SELECT * FROM records_to_insert - -{%- endmacro -%} \ No newline at end of file