From b0d8ff0c9f84974551f90350abfd9e347d2b3e9d Mon Sep 17 00:00:00 2001 From: Alex Higgs Date: Mon, 21 Feb 2022 17:16:17 +0000 Subject: [PATCH] dbtvault 0.8 --- README.md | 25 +-- macros/internal/{ => helpers}/is_checks.sql | 20 ++ .../{ => helpers}/prepend_generated_by.sql | 0 .../{ => helpers}/stage_processing_macros.sql | 0 .../{ => metadata_processing}/alias.sql | 0 .../{ => metadata_processing}/alias_all.sql | 0 .../{ => metadata_processing}/as_constant.sql | 12 +- .../check_required_parameters.sql | 0 .../metadata_processing/concat_ws.sql | 22 +++ .../escape_column_names.sql | 133 +++++++++++++ .../expand_column_list.sql | 0 .../{ => metadata_processing}/multikey.sql | 0 .../incremental_bridge_helpers.sql | 26 --- .../incremental_bridge_materialization.sql | 66 ++++--- .../incremental_pit_bridge_replace.sql | 30 +++ .../incremental_pit_helper.sql | 26 --- .../incremental_pit_materialization.sql | 66 ++++--- macros/materialisations/mat_is_checks.sql | 70 +++++++ .../materialisations/period_mat_helpers.sql | 186 ------------------ .../get_period_boundaries.sql | 109 ++++++++++ .../get_period_filter_sql.sql | 43 ++++ .../period_mat_helpers/get_period_of_load.sql | 59 ++++++ .../get_start_stop_dates.sql | 40 ++++ ...replace_placeholder_with_period_filter.sql | 62 ++++++ macros/materialisations/rank_mat_helpers.sql | 77 -------- .../rank_mat_helpers/get_min_max_ranks.sql | 34 ++++ .../replace_placeholder_with_rank_filter.sql | 46 +++++ macros/materialisations/shared_helpers.sql | 13 +- ...vault_insert_by_period_materialization.sql | 35 ++-- .../vault_insert_by_rank_materialization.sql | 39 ++-- macros/staging/derive_columns.sql | 9 +- macros/staging/rank_columns.sql | 12 +- macros/staging/stage.sql | 14 +- macros/supporting/current_timestamp.sql | 7 + macros/supporting/datatypes.sql | 8 +- macros/supporting/hash.sql | 166 +++++++++++++++- macros/supporting/max_datetime.sql | 12 ++ macros/tables/bigquery/hub.sql | 102 ++++++++++ macros/tables/bigquery/link.sql | 102 ++++++++++ macros/tables/bigquery/sat.sql | 74 +++++++ macros/tables/{ => snowflake}/bridge.sql | 93 ++++----- macros/tables/{ => snowflake}/eff_sat.sql | 9 + macros/tables/{ => snowflake}/hub.sql | 7 +- macros/tables/{ => snowflake}/link.sql | 7 +- macros/tables/{ => snowflake}/ma_sat.sql | 17 +- macros/tables/{ => snowflake}/pit.sql | 48 +++-- macros/tables/{ => snowflake}/sat.sql | 15 +- macros/tables/{ => snowflake}/t_link.sql | 9 +- macros/tables/{ => snowflake}/xts.sql | 8 +- macros/tables/sqlserver/hub.sql | 112 +++++++++++ macros/tables/sqlserver/link.sql | 113 +++++++++++ macros/tables/sqlserver/sat.sql | 74 +++++++ 52 files changed, 1723 insertions(+), 534 deletions(-) rename macros/internal/{ => helpers}/is_checks.sql (67%) rename macros/internal/{ => helpers}/prepend_generated_by.sql (100%) rename macros/internal/{ => helpers}/stage_processing_macros.sql (100%) rename macros/internal/{ => metadata_processing}/alias.sql (100%) rename macros/internal/{ => metadata_processing}/alias_all.sql (100%) rename macros/internal/{ => metadata_processing}/as_constant.sql (74%) rename macros/internal/{ => metadata_processing}/check_required_parameters.sql (100%) create mode 100644 macros/internal/metadata_processing/concat_ws.sql create mode 100644 macros/internal/metadata_processing/escape_column_names.sql rename macros/internal/{ => metadata_processing}/expand_column_list.sql (100%) rename macros/internal/{ => metadata_processing}/multikey.sql (100%) delete mode 100644 macros/materialisations/incremental_bridge_helpers.sql create mode 100644 macros/materialisations/incremental_pit_bridge_replace.sql delete mode 100644 macros/materialisations/incremental_pit_helper.sql create mode 100644 macros/materialisations/mat_is_checks.sql delete mode 100644 macros/materialisations/period_mat_helpers.sql create mode 100644 macros/materialisations/period_mat_helpers/get_period_boundaries.sql create mode 100644 macros/materialisations/period_mat_helpers/get_period_filter_sql.sql create mode 100644 macros/materialisations/period_mat_helpers/get_period_of_load.sql create mode 100644 macros/materialisations/period_mat_helpers/get_start_stop_dates.sql create mode 100644 macros/materialisations/period_mat_helpers/replace_placeholder_with_period_filter.sql delete mode 100644 macros/materialisations/rank_mat_helpers.sql create mode 100644 macros/materialisations/rank_mat_helpers/get_min_max_ranks.sql create mode 100644 macros/materialisations/rank_mat_helpers/replace_placeholder_with_rank_filter.sql create mode 100644 macros/tables/bigquery/hub.sql create mode 100644 macros/tables/bigquery/link.sql create mode 100644 macros/tables/bigquery/sat.sql rename macros/tables/{ => snowflake}/bridge.sql (63%) rename macros/tables/{ => snowflake}/eff_sat.sql (92%) rename macros/tables/{ => snowflake}/hub.sql (90%) rename macros/tables/{ => snowflake}/link.sql (91%) rename macros/tables/{ => snowflake}/ma_sat.sql (85%) rename macros/tables/{ => snowflake}/pit.sql (70%) rename macros/tables/{ => snowflake}/sat.sql (82%) rename macros/tables/{ => snowflake}/t_link.sql (84%) rename macros/tables/{ => snowflake}/xts.sql (83%) create mode 100644 macros/tables/sqlserver/hub.sql create mode 100644 macros/tables/sqlserver/link.sql create mode 100644 macros/tables/sqlserver/sat.sql diff --git a/README.md b/README.md index a2ed24379..768b5b990 100644 --- a/README.md +++ b/README.md @@ -1,24 +1,17 @@ -

+

dbtvault -

-

- Documentation - Join our slack -

+ [![Documentation Status](https://readthedocs.org/projects/dbtvault/badge/?version=stable)](https://dbtvault.readthedocs.io/en/stable/?badge=stable) + [![Slack](https://img.shields.io/badge/Slack-Join-yellow?style=flat&logo=slack)](https://join.slack.com/t/dbtvault/shared_invite/enQtODY5MTY3OTIyMzg2LWJlZDMyNzM4YzAzYjgzYTY0MTMzNTNjN2EyZDRjOTljYjY0NDYyYzEwMTlhODMzNGY3MmU2ODNhYWUxYmM2NjA) + +
[Changelog and past doc versions](https://dbtvault.readthedocs.io/en/latest/changelog/stable) # dbtvault by [Datavault](https://www.data-vault.co.uk) -Build your own Data Vault data warehouse! dbtvault is a free to use dbt package that generates & executes the ETL you need to run a Data Vault 2.0 Data Warehouse on a Snowflake database. +Build your own Data Vault data warehouse! dbtvault is a free to use dbt package that generates & executes the ETL you need to run a Data Vault 2.0 Data Warehouse on your data platform. What does dbtvault offer? - productivity gains, fewer errors @@ -37,9 +30,9 @@ Learn quickly with our worked example: - [Project Repository](https://github.com/Datavault-UK/snowflakeDemo) -## Supported databases: +## Supported platforms: -- [snowflake](https://www.snowflake.com/about/) +[Platform support matrix](https://dbtvault.readthedocs.io/en/latest/macros/#platform-support) ## Installation @@ -56,7 +49,7 @@ or [read the docs](https://docs.getdbt.com/docs/building-a-dbt-project/package-m # Configure model {{- config(...) -}} -# Set metadata +# Provide metadata {%- set src_pk = ... -%} ... diff --git a/macros/internal/is_checks.sql b/macros/internal/helpers/is_checks.sql similarity index 67% rename from macros/internal/is_checks.sql rename to macros/internal/helpers/is_checks.sql index a3f3c1228..f657a2d2b 100644 --- a/macros/internal/is_checks.sql +++ b/macros/internal/helpers/is_checks.sql @@ -12,6 +12,8 @@ {%- endmacro -%} + + {%- macro is_nothing(obj) -%} {%- if obj is none or obj is undefined or not obj -%} @@ -22,6 +24,8 @@ {%- endmacro -%} + + {%- macro is_something(obj) -%} {%- if obj is not none and obj is defined and obj -%} @@ -30,4 +34,20 @@ {%- do return(false) -%} {%- endif -%} +{%- endmacro -%} + + + +{%- macro is_expression(obj) -%} + + {%- if obj is string -%} + {%- if (obj | first == "'" and obj | last == "'") or ("(" in obj and ")" in obj) or "::" in obj -%} + {%- do return(true) -%} + {%- else -%} + {%- do return(false) -%} + {%- endif -%} + {%- else -%} + {%- do return(false) -%} + {%- endif -%} + {%- endmacro -%} \ No newline at end of file diff --git a/macros/internal/prepend_generated_by.sql b/macros/internal/helpers/prepend_generated_by.sql similarity index 100% rename from macros/internal/prepend_generated_by.sql rename to macros/internal/helpers/prepend_generated_by.sql diff --git a/macros/internal/stage_processing_macros.sql b/macros/internal/helpers/stage_processing_macros.sql similarity index 100% rename from macros/internal/stage_processing_macros.sql rename to macros/internal/helpers/stage_processing_macros.sql diff --git a/macros/internal/alias.sql b/macros/internal/metadata_processing/alias.sql similarity index 100% rename from macros/internal/alias.sql rename to macros/internal/metadata_processing/alias.sql diff --git a/macros/internal/alias_all.sql b/macros/internal/metadata_processing/alias_all.sql similarity index 100% rename from macros/internal/alias_all.sql rename to macros/internal/metadata_processing/alias_all.sql diff --git a/macros/internal/as_constant.sql b/macros/internal/metadata_processing/as_constant.sql similarity index 74% rename from macros/internal/as_constant.sql rename to macros/internal/metadata_processing/as_constant.sql index 19e06b289..868b1b918 100644 --- a/macros/internal/as_constant.sql +++ b/macros/internal/metadata_processing/as_constant.sql @@ -14,8 +14,16 @@ {%- else -%} - {{- return(column_str) -}} - + {%- if dbtvault.is_expression(column_str) -%} + + {{- return(column_str) -}} + + {%- else -%} + + {{- return(dbtvault.escape_column_names(column_str)) -}} + + {%- endif -%} + {%- endif -%} {%- else -%} {%- if execute -%} diff --git a/macros/internal/check_required_parameters.sql b/macros/internal/metadata_processing/check_required_parameters.sql similarity index 100% rename from macros/internal/check_required_parameters.sql rename to macros/internal/metadata_processing/check_required_parameters.sql diff --git a/macros/internal/metadata_processing/concat_ws.sql b/macros/internal/metadata_processing/concat_ws.sql new file mode 100644 index 000000000..f8f1aed35 --- /dev/null +++ b/macros/internal/metadata_processing/concat_ws.sql @@ -0,0 +1,22 @@ +{%- macro concat_ws(string_list, separator="||") -%} + + {{- adapter.dispatch('concat_ws', 'dbtvault')(string_list=string_list, separator=separator) -}} + +{%- endmacro %} + +{%- macro default__concat_ws(string_list, separator="||") -%} + + {{ "CONCAT_WS('" ~ separator ~ "', " ~ string_list | join(", ") ~ ")" }} + +{%- endmacro -%} + +{%- macro bigquery__concat_ws(string_list, separator="||") -%} + + {{- 'CONCAT(' -}} + {%- for str in string_list -%} + {{- "{}".format(str) -}} + {{- ",'{}',".format(separator) if not loop.last -}} + {%- endfor -%} + {{- '\n)' -}} + +{%- endmacro -%} \ No newline at end of file diff --git a/macros/internal/metadata_processing/escape_column_names.sql b/macros/internal/metadata_processing/escape_column_names.sql new file mode 100644 index 000000000..4c8d3063d --- /dev/null +++ b/macros/internal/metadata_processing/escape_column_names.sql @@ -0,0 +1,133 @@ +{%- macro escape_column_names(columns=none) -%} + +{# Different platforms use different escape characters, the default below is for Snowflake which uses double quotes #} + + {%- if dbtvault.is_something(columns) -%} + + {%- set col_string = '' -%} + {%- set col_list = [] -%} + {%- set col_mapping = {} -%} + + {%- if columns is string -%} + + {%- set col_string = dbtvault.escape_column_name(columns) -%} + + {%- elif dbtvault.is_list(columns) -%} + + {%- for col in columns -%} + + {%- if col is string -%} + + {%- set escaped_col = dbtvault.escape_column_name(col) -%} + + {%- do col_list.append(escaped_col) -%} + + {%- else -%} + + {%- if execute -%} + {{- exceptions.raise_compiler_error("Invalid column name(s) provided. Must be a string.") -}} + {%- endif -%} + + {%- endif -%} + + {%- endfor -%} + + {%- elif columns is mapping -%} + + {%- if columns['source_column'] and columns['alias'] -%} + + {%- set escaped_source_col = dbtvault.escape_column_name(columns['source_column']) -%} + {%- set escaped_alias_col = dbtvault.escape_column_name(columns['alias']) -%} + {%- set col_mapping = {"source_column": escaped_source_col, "alias": escaped_alias_col} -%} + + {%- else -%} + + {%- if execute -%} + {{- exceptions.raise_compiler_error("Invalid column name(s) provided. Must be a string, a list of strings, or a dictionary of hashdiff metadata.") -}} + {%- endif %} + + {%- endif -%} + + {%- else -%} + + {%- if execute -%} + {{- exceptions.raise_compiler_error("Invalid column name(s) provided. Must be a string, a list of strings, or a dictionary of hashdiff metadata.") -}} + {%- endif %} + + {%- endif -%} + + {%- elif columns == '' -%} + + {%- if execute -%} + {{- exceptions.raise_compiler_error("Expected a column name or a list of column names, got an empty string") -}} + {%- endif -%} + + {%- endif -%} + +{%- if columns is none -%} + + {%- do return(none) -%} + +{%- elif columns == [] -%} + + {%- do return([]) -%} + +{%- elif columns == {} -%} + + {%- do return({}) -%} + +{%- elif columns is string -%} + + {%- do return(col_string) -%} + +{%- elif dbtvault.is_list(columns) -%} + + {%- do return(col_list) -%} + +{%- elif columns is mapping -%} + + {%- do return(col_mapping) -%} + +{%- endif -%} + +{%- endmacro -%} + + +{%- macro escape_column_name(column) -%} + + {{- adapter.dispatch('escape_column_name', 'dbtvault')(column=column) -}} + +{%- endmacro %} + +{%- macro default__escape_column_name(column) -%} + + {%- set escape_char_left = var('escape_char_left', '"') -%} + {%- set escape_char_right = var('escape_char_right', '"') -%} + + {%- set escaped_column_name = escape_char_left ~ column | replace(escape_char_left, '') | replace(escape_char_right, '') | trim ~ escape_char_right -%} + + {%- do return(escaped_column_name) -%} + +{%- endmacro -%} + +{%- macro sqlserver__escape_column_name(column) -%} + + {%- set escape_char_left = var('escape_char_left', '"') -%} + {%- set escape_char_right = var('escape_char_right', '"') -%} + + {%- set escaped_column_name = escape_char_left ~ column | replace(escape_char_left, '') | replace(escape_char_right, '') | trim ~ escape_char_right -%} + + {%- do return(escaped_column_name) -%} + +{%- endmacro -%} + +{%- macro bigquery__escape_column_name(column) -%} + + {%- set escape_char_left = var('escape_char_left', '`') -%} + {%- set escape_char_right = var('escape_char_right', '`') -%} + + {%- set escaped_column_name = escape_char_left ~ column | replace(escape_char_left, '') | replace(escape_char_right, '') | trim ~ escape_char_right -%} + + {%- do return(escaped_column_name) -%} + +{%- endmacro -%} \ No newline at end of file diff --git a/macros/internal/expand_column_list.sql b/macros/internal/metadata_processing/expand_column_list.sql similarity index 100% rename from macros/internal/expand_column_list.sql rename to macros/internal/metadata_processing/expand_column_list.sql diff --git a/macros/internal/multikey.sql b/macros/internal/metadata_processing/multikey.sql similarity index 100% rename from macros/internal/multikey.sql rename to macros/internal/metadata_processing/multikey.sql diff --git a/macros/materialisations/incremental_bridge_helpers.sql b/macros/materialisations/incremental_bridge_helpers.sql deleted file mode 100644 index f725e0390..000000000 --- a/macros/materialisations/incremental_bridge_helpers.sql +++ /dev/null @@ -1,26 +0,0 @@ -{% macro is_bridge_incremental() %} - {#-- do not run introspective queries in parsing #} - {% if not execute %} - {{ return(False) }} - {% else %} - {% set relation = adapter.get_relation(this.database, this.schema, this.table) %} - - {{ return(relation is not none - and relation.type == 'table' - and model.config.materialized == 'bridge_incremental' - and not flags.FULL_REFRESH) }} - {% endif %} -{% endmacro %} - -{% macro incremental_bridge_replace(tmp_relation, target_relation, statement_name="main") %} - {%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%} - {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%} - - TRUNCATE TABLE {{ target_relation }}; - - INSERT INTO {{ target_relation }} ({{ dest_cols_csv }}) - ( - SELECT {{ dest_cols_csv }} - FROM {{ tmp_relation }} - ); -{%- endmacro %} diff --git a/macros/materialisations/incremental_bridge_materialization.sql b/macros/materialisations/incremental_bridge_materialization.sql index 71ebc6543..5f036cb10 100644 --- a/macros/materialisations/incremental_bridge_materialization.sql +++ b/macros/materialisations/incremental_bridge_materialization.sql @@ -1,53 +1,57 @@ -{% materialization bridge_incremental, default -%} +{%- materialization bridge_incremental, default -%} - {% set full_refresh_mode = flags.FULL_REFRESH %} + {%- set full_refresh_mode = should_full_refresh() -%} - {% set target_relation = this %} - {% set existing_relation = load_relation(this) %} - {% set tmp_relation = make_temp_relation(this) %} + {% if target.type == "sqlserver" %} + {%- set target_relation = this.incorporate(type='table') -%} + {% else %} + {%- set target_relation = this -%} + {% endif %} + {%- set existing_relation = load_relation(this) -%} + {%- set tmp_relation = make_temp_relation(target_relation) -%} {{ run_hooks(pre_hooks, inside_transaction=False) }} -- `BEGIN` happens here: {{ run_hooks(pre_hooks, inside_transaction=True) }} - {% set to_drop = [] %} - {% if existing_relation is none %} - {% set build_sql = create_table_as(False, target_relation, sql) %} - {% elif existing_relation.is_view or full_refresh_mode %} + {%- set to_drop = [] -%} + {%- if existing_relation is none -%} + {%- set build_sql = create_table_as(False, target_relation, sql) -%} + {%- elif existing_relation.is_view or full_refresh_mode -%} {#-- Make sure the backup doesn't exist so we don't encounter issues with the rename below #} - {% set backup_identifier = existing_relation.identifier ~ "__dbt_backup" %} - {% set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) %} - {% do adapter.drop_relation(backup_relation) %} - - {% do adapter.rename_relation(target_relation, backup_relation) %} - {% set build_sql = create_table_as(False, target_relation, sql) %} - {% do to_drop.append(backup_relation) %} - {% else %} - - {% set tmp_relation = make_temp_relation(target_relation) %} - {% do run_query(create_table_as(True, tmp_relation, sql)) %} - {% do adapter.expand_target_column_types( + {%- set backup_identifier = existing_relation.identifier ~ "__dbt_backup" -%} + {%- set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) -%} + {%- do adapter.drop_relation(backup_relation) -%} + + {%- do adapter.rename_relation(target_relation, backup_relation) -%} + {%- set build_sql = create_table_as(False, target_relation, sql) -%} + {%- do to_drop.append(backup_relation) -%} + {%- else -%} + + {%- set tmp_relation = make_temp_relation(target_relation) -%} + {%- do run_query(create_table_as(True, tmp_relation, sql)) -%} + {%- do adapter.expand_target_column_types( from_relation=tmp_relation, - to_relation=target_relation) %} - {% set build_sql = dbtvault.incremental_bridge_replace(tmp_relation, target_relation) %} -{% endif %} + to_relation=target_relation) -%} + {%- set build_sql = dbtvault.incremental_bridge_replace(tmp_relation, target_relation) -%} +{%- endif -%} - {% call statement("main") %} + {%- call statement("main") -%} {{ build_sql }} - {% endcall %} + {%- endcall -%} {{ run_hooks(post_hooks, inside_transaction=True) }} -- `COMMIT` happens here - {% do adapter.commit() %} + {%- do adapter.commit() -%} - {% for rel in to_drop %} - {% do adapter.drop_relation(rel) %} - {% endfor %} + {%- for rel in to_drop -%} + {%- do adapter.drop_relation(rel) -%} + {%- endfor -%} {{ run_hooks(post_hooks, inside_transaction=False) }} {{ return({'relations': [target_relation]}) }} -{%- endmaterialization %} +{%- endmaterialization -%} diff --git a/macros/materialisations/incremental_pit_bridge_replace.sql b/macros/materialisations/incremental_pit_bridge_replace.sql new file mode 100644 index 000000000..7369297ba --- /dev/null +++ b/macros/materialisations/incremental_pit_bridge_replace.sql @@ -0,0 +1,30 @@ +{% macro incremental_pit_replace(tmp_relation, target_relation, statement_name="main") %} + {%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%} + {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%} + + TRUNCATE TABLE {{ target_relation }}; + + INSERT INTO {{ target_relation }} ({{ dest_cols_csv }}) + ( + SELECT {{ dest_cols_csv }} + FROM {{ tmp_relation }} + ); +{%- endmacro %} + + + +{% macro incremental_bridge_replace(tmp_relation, target_relation, statement_name="main") %} + {%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%} + {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%} + + TRUNCATE TABLE {{ target_relation }}; + + INSERT INTO {{ target_relation }} ({{ dest_cols_csv }}) + ( + SELECT {{ dest_cols_csv }} + FROM {{ tmp_relation }} + ); +{%- endmacro %} + + + diff --git a/macros/materialisations/incremental_pit_helper.sql b/macros/materialisations/incremental_pit_helper.sql deleted file mode 100644 index 0b985acec..000000000 --- a/macros/materialisations/incremental_pit_helper.sql +++ /dev/null @@ -1,26 +0,0 @@ -{% macro is_pit_incremental() %} - {#-- do not run introspective queries in parsing #} - {% if not execute %} - {{ return(False) }} - {% else %} - {% set relation = adapter.get_relation(this.database, this.schema, this.table) %} - - {{ return(relation is not none - and relation.type == 'table' - and model.config.materialized == 'pit_incremental' - and not flags.FULL_REFRESH) }} - {% endif %} -{% endmacro %} - -{% macro incremental_pit_replace(tmp_relation, target_relation, statement_name="main") %} - {%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%} - {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%} - - TRUNCATE TABLE {{ target_relation }}; - - INSERT INTO {{ target_relation }} ({{ dest_cols_csv }}) - ( - SELECT {{ dest_cols_csv }} - FROM {{ tmp_relation }} - ); -{%- endmacro %} \ No newline at end of file diff --git a/macros/materialisations/incremental_pit_materialization.sql b/macros/materialisations/incremental_pit_materialization.sql index 8c49165c0..7baef1f2d 100644 --- a/macros/materialisations/incremental_pit_materialization.sql +++ b/macros/materialisations/incremental_pit_materialization.sql @@ -1,53 +1,57 @@ -{% materialization pit_incremental, default -%} +{%- materialization pit_incremental, default -%} - {% set full_refresh_mode = flags.FULL_REFRESH %} + {%- set full_refresh_mode = should_full_refresh() -%} - {% set target_relation = this %} - {% set existing_relation = load_relation(this) %} - {% set tmp_relation = make_temp_relation(this) %} + {% if target.type == "sqlserver" %} + {%- set target_relation = this.incorporate(type='table') -%} + {% else %} + {%- set target_relation = this -%} + {% endif %} + {%- set existing_relation = load_relation(this) -%} + {%- set tmp_relation = make_temp_relation(target_relation) -%} {{ run_hooks(pre_hooks, inside_transaction=False) }} -- `BEGIN` happens here: {{ run_hooks(pre_hooks, inside_transaction=True) }} - {% set to_drop = [] %} - {% if existing_relation is none %} - {% set build_sql = create_table_as(False, target_relation, sql) %} - {% elif existing_relation.is_view or full_refresh_mode %} + {%- set to_drop = [] -%} + {%- if existing_relation is none -%} + {%- set build_sql = create_table_as(False, target_relation, sql) -%} + {%- elif existing_relation.is_view or full_refresh_mode -%} {#-- Make sure the backup doesn't exist so we don't encounter issues with the rename below #} - {% set backup_identifier = existing_relation.identifier ~ "__dbt_backup" %} - {% set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) %} - {% do adapter.drop_relation(backup_relation) %} - - {% do adapter.rename_relation(target_relation, backup_relation) %} - {% set build_sql = create_table_as(False, target_relation, sql) %} - {% do to_drop.append(backup_relation) %} - {% else %} - - {% set tmp_relation = make_temp_relation(target_relation) %} - {% do run_query(create_table_as(True, tmp_relation, sql)) %} - {% do adapter.expand_target_column_types( + {%- set backup_identifier = existing_relation.identifier ~ "__dbt_backup" -%} + {%- set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) -%} + {%- do adapter.drop_relation(backup_relation) -%} + + {%- do adapter.rename_relation(target_relation, backup_relation) -%} + {%- set build_sql = create_table_as(False, target_relation, sql) -%} + {%- do to_drop.append(backup_relation) -%} + {%- else -%} + + {%- set tmp_relation = make_temp_relation(target_relation) -%} + {%- do run_query(create_table_as(True, tmp_relation, sql)) -%} + {%- do adapter.expand_target_column_types( from_relation=tmp_relation, - to_relation=target_relation) %} - {% set build_sql = dbtvault.incremental_pit_replace(tmp_relation, target_relation) %} -{% endif %} + to_relation=target_relation) -%} + {%- set build_sql = dbtvault.incremental_pit_replace(tmp_relation, target_relation) -%} +{%- endif -%} - {% call statement("main") %} + {%- call statement("main") -%} {{ build_sql }} - {% endcall %} + {%- endcall -%} {{ run_hooks(post_hooks, inside_transaction=True) }} -- `COMMIT` happens here - {% do adapter.commit() %} + {%- do adapter.commit() -%} - {% for rel in to_drop %} - {% do adapter.drop_relation(rel) %} - {% endfor %} + {%- for rel in to_drop -%} + {%- do adapter.drop_relation(rel) -%} + {%- endfor -%} {{ run_hooks(post_hooks, inside_transaction=False) }} {{ return({'relations': [target_relation]}) }} -{%- endmaterialization %} \ No newline at end of file +{%- endmaterialization -%} \ No newline at end of file diff --git a/macros/materialisations/mat_is_checks.sql b/macros/materialisations/mat_is_checks.sql new file mode 100644 index 000000000..95778ffc3 --- /dev/null +++ b/macros/materialisations/mat_is_checks.sql @@ -0,0 +1,70 @@ +{%- macro is_any_incremental() -%} + {%- if dbtvault.is_vault_insert_by_period() or dbtvault.is_vault_insert_by_rank() or dbtvault.is_pit_incremental() or dbtvault.is_bridge_incremental() or is_incremental() -%} + {%- do return(true) -%} + {%- else -%} + {%- do return(false) -%} + {%- endif -%} +{%- endmacro -%} + + + +{% macro is_vault_insert_by_period() %} + {% if not execute %} + {{ return(False) }} + {% else %} + {% set relation = adapter.get_relation(this.database, this.schema, this.table) %} + + {{ return(relation is not none + and relation.type == 'table' + and model.config.materialized == 'vault_insert_by_period' + and not flags.FULL_REFRESH) }} + {% endif %} +{% endmacro %} + + + +{% macro is_vault_insert_by_rank() %} + {#-- do not run introspective queries in parsing #} + {% if not execute %} + {{ return(False) }} + {% else %} + {% set relation = adapter.get_relation(this.database, this.schema, this.table) %} + + {{ return(relation is not none + and relation.type == 'table' + and model.config.materialized == 'vault_insert_by_rank' + and not flags.FULL_REFRESH) }} + {% endif %} +{% endmacro %} + + + +{% macro is_bridge_incremental() %} + {#-- do not run introspective queries in parsing #} + {% if not execute %} + {{ return(False) }} + {% else %} + {% set relation = adapter.get_relation(this.database, this.schema, this.table) %} + + {{ return(relation is not none + and relation.type == 'table' + and model.config.materialized == 'bridge_incremental' + and not flags.FULL_REFRESH) }} + {% endif %} +{% endmacro %} + + + +{% macro is_pit_incremental() %} + {#-- do not run introspective queries in parsing #} + {% if not execute %} + {{ return(False) }} + {% else %} + {% set relation = adapter.get_relation(this.database, this.schema, this.table) %} + + {{ return(relation is not none + and relation.type == 'table' + and model.config.materialized == 'pit_incremental' + and not flags.FULL_REFRESH) }} + {% endif %} +{% endmacro %} \ No newline at end of file diff --git a/macros/materialisations/period_mat_helpers.sql b/macros/materialisations/period_mat_helpers.sql deleted file mode 100644 index b10ae5e46..000000000 --- a/macros/materialisations/period_mat_helpers.sql +++ /dev/null @@ -1,186 +0,0 @@ -{#-- Helper macros for period materializations #} - -{#-- MULTI-DISPATCH MACROS #} - -{#-- REPLACE_PLACEHOLDER_WITH_PERIOD_FILTER #} - -{%- macro replace_placeholder_with_period_filter(core_sql, timestamp_field, start_timestamp, stop_timestamp, offset, period) -%} - - {% set macro = adapter.dispatch('replace_placeholder_with_period_filter', - 'dbtvault')(core_sql=core_sql, - timestamp_field=timestamp_field, - start_timestamp=start_timestamp, - stop_timestamp=stop_timestamp, - offset=offset, - period=period) %} - {% do return(macro) %} -{%- endmacro %} - -{% macro default__replace_placeholder_with_period_filter(core_sql, timestamp_field, start_timestamp, stop_timestamp, offset, period) %} - - {%- set period_filter -%} - (TO_DATE({{ timestamp_field }}) >= DATE_TRUNC('{{ period }}', TO_DATE('{{ start_timestamp }}') + INTERVAL '{{ offset }} {{ period }}') AND - TO_DATE({{ timestamp_field }}) < DATE_TRUNC('{{ period }}', TO_DATE('{{ start_timestamp }}') + INTERVAL '{{ offset }} {{ period }}' + INTERVAL '1 {{ period }}')) - AND (TO_DATE({{ timestamp_field }}) >= TO_DATE('{{ start_timestamp }}')) - {%- endset -%} - {%- set filtered_sql = core_sql | replace("__PERIOD_FILTER__", period_filter) -%} - - {% do return(filtered_sql) %} -{% endmacro %} - -{#-- GET_PERIOD_FILTER_SQL #} - -{%- macro get_period_filter_sql(target_cols_csv, base_sql, timestamp_field, period, start_timestamp, stop_timestamp, offset) -%} - - {% set macro = adapter.dispatch('get_period_filter_sql', - 'dbtvault')(target_cols_csv=target_cols_csv, - base_sql=base_sql, - timestamp_field=timestamp_field, - period=period, - start_timestamp=start_timestamp, - stop_timestamp=stop_timestamp, - offset=offset) %} - {% do return(macro) %} -{%- endmacro %} - -{% macro default__get_period_filter_sql(target_cols_csv, base_sql, timestamp_field, period, start_timestamp, stop_timestamp, offset) -%} - - {%- set filtered_sql = {'sql': base_sql} -%} - - {%- do filtered_sql.update({'sql': dbtvault.replace_placeholder_with_period_filter(filtered_sql.sql, - timestamp_field, - start_timestamp, - stop_timestamp, - offset, period)}) -%} - select {{ target_cols_csv }} from ({{ filtered_sql.sql }}) -{%- endmacro %} - - -{#-- GET_PERIOD_BOUNDARIES #} - -{%- macro get_period_boundaries(target_schema, target_table, timestamp_field, start_date, stop_date, period) -%} - - {% set macro = adapter.dispatch('get_period_boundaries', - 'dbtvault')(target_schema=target_schema, - target_table=target_table, - timestamp_field=timestamp_field, - start_date=start_date, - stop_date=stop_date, - period=period) %} - - {% do return(macro) %} -{%- endmacro %} - - - -{% macro default__get_period_boundaries(target_schema, target_table, timestamp_field, start_date, stop_date, period) -%} - - {% set period_boundary_sql -%} - WITH period_data AS ( - SELECT - COALESCE(MAX({{ timestamp_field }}), '{{ start_date }}')::TIMESTAMP AS start_timestamp, - COALESCE({{ dbt_utils.dateadd('millisecond', 86399999, "NULLIF('" ~ stop_date | lower ~ "','none')::TIMESTAMP") }}, - {{ dbtvault.current_timestamp() }} ) AS stop_timestamp - FROM {{ target_schema }}.{{ target_table }} - ) - SELECT - start_timestamp, - stop_timestamp, - {{ dbt_utils.datediff('start_timestamp', - 'stop_timestamp', - period) }} + 1 AS num_periods - FROM period_data - {%- endset %} - - {% set period_boundaries_dict = dbt_utils.get_query_results_as_dict(period_boundary_sql) %} - - {% set period_boundaries = {'start_timestamp': period_boundaries_dict['START_TIMESTAMP'][0] | string, - 'stop_timestamp': period_boundaries_dict['STOP_TIMESTAMP'][0] | string, - 'num_periods': period_boundaries_dict['NUM_PERIODS'][0] | int} %} - - {% do return(period_boundaries) %} -{%- endmacro %} - -{#-- GET_PERIOD_OF_LOAD #} - -{%- macro get_period_of_load(period, offset, start_timestamp) -%} - - {% set macro = adapter.dispatch('get_period_of_load', - 'dbtvault')(period=period, - offset=offset, - start_timestamp=start_timestamp) %} - - {% do return(macro) %} -{%- endmacro %} - - -{%- macro default__get_period_of_load(period, offset, start_timestamp) -%} - - {% set period_of_load_sql -%} - SELECT DATE_TRUNC('{{ period }}', DATEADD({{ period }}, {{ offset }}, TO_DATE('{{ start_timestamp }}'))) AS period_of_load - {%- endset %} - - {% set period_of_load_dict = dbt_utils.get_query_results_as_dict(period_of_load_sql) %} - - {% set period_of_load = period_of_load_dict['PERIOD_OF_LOAD'][0] | string %} - - {% do return(period_of_load) %} -{%- endmacro -%} - -{#-- OTHER MACROS #} - -{% macro is_vault_insert_by_period() %} - {#-- do not run introspective queries in parsing #} - {% if not execute %} - {{ return(False) }} - {% else %} - {% set relation = adapter.get_relation(this.database, this.schema, this.table) %} - - {{ return(relation is not none - and relation.type == 'table' - and model.config.materialized == 'vault_insert_by_period' - and not flags.FULL_REFRESH) }} - {% endif %} -{% endmacro %} - - -{% macro get_start_stop_dates(timestamp_field, date_source_models) %} - - {% if config.get('start_date', default=none) is not none %} - - {%- set start_date = config.get('start_date') -%} - {%- set stop_date = config.get('stop_date', default=none) -%} - - {% do return({'start_date': start_date,'stop_date': stop_date}) %} - - {% elif date_source_models is not none %} - - {% if date_source_models is string %} - {% set date_source_models = [date_source_models] %} - {% endif %} - {% set query_sql %} - WITH stage AS ( - {% for source_model in date_source_models %} - SELECT {{ timestamp_field }} FROM {{ ref(source_model) }} - {% if not loop.last %} UNION ALL {% endif %} - {% endfor %}) - - SELECT MIN({{ timestamp_field }}) AS MIN, MAX({{ timestamp_field }}) AS MAX - FROM stage - {% endset %} - - {% set min_max_dict = dbt_utils.get_query_results_as_dict(query_sql) %} - - {% set start_date = min_max_dict['MIN'][0] | string %} - {% set stop_date = min_max_dict['MAX'][0] | string %} - {% set min_max_dates = {"start_date": start_date, "stop_date": stop_date} %} - - {% do return(min_max_dates) %} - - {% else %} - {%- if execute -%} - {{ exceptions.raise_compiler_error("Invalid 'vault_insert_by_period' configuration. Must provide 'start_date' and 'stop_date', just 'stop_date', and/or 'date_source_models' options.") }} - {%- endif -%} - {% endif %} - -{% endmacro %} \ No newline at end of file diff --git a/macros/materialisations/period_mat_helpers/get_period_boundaries.sql b/macros/materialisations/period_mat_helpers/get_period_boundaries.sql new file mode 100644 index 000000000..09e37853c --- /dev/null +++ b/macros/materialisations/period_mat_helpers/get_period_boundaries.sql @@ -0,0 +1,109 @@ +{%- macro get_period_boundaries(target_schema, target_table, timestamp_field, start_date, stop_date, period) -%} + + {% set macro = adapter.dispatch('get_period_boundaries', + 'dbtvault')(target_schema=target_schema, + target_table=target_table, + timestamp_field=timestamp_field, + start_date=start_date, + stop_date=stop_date, + period=period) %} + + {% do return(macro) %} +{%- endmacro %} + + + +{% macro default__get_period_boundaries(target_schema, target_table, timestamp_field, start_date, stop_date, period) -%} + + {% set period_boundary_sql -%} + WITH period_data AS ( + SELECT + COALESCE(MAX({{ timestamp_field }}), '{{ start_date }}')::TIMESTAMP AS start_timestamp, + COALESCE({{ dbt_utils.dateadd('millisecond', 86399999, "NULLIF('" ~ stop_date | lower ~ "','none')::TIMESTAMP") }}, + {{ dbtvault.current_timestamp() }} ) AS stop_timestamp + FROM {{ target_schema }}.{{ target_table }} + ) + SELECT + start_timestamp, + stop_timestamp, + {{ dbt_utils.datediff('start_timestamp', + 'stop_timestamp', + period) }} + 1 AS num_periods + FROM period_data + {%- endset %} + + {% set period_boundaries_dict = dbtvault.get_query_results_as_dict(period_boundary_sql) %} + + {% set period_boundaries = {'start_timestamp': period_boundaries_dict['START_TIMESTAMP'][0] | string, + 'stop_timestamp': period_boundaries_dict['STOP_TIMESTAMP'][0] | string, + 'num_periods': period_boundaries_dict['NUM_PERIODS'][0] | int} %} + + {% do return(period_boundaries) %} +{%- endmacro %} + + + + +{% macro bigquery__get_period_boundaries(target_schema, target_table, timestamp_field, start_date, stop_date, period) -%} + + {% set period_boundary_sql -%} + with data as ( + select + coalesce(CAST(max({{ timestamp_field }}) AS DATETIME), CAST('{{ start_date }}' AS DATETIME)) as START_TIMESTAMP, + coalesce({{ dbt_utils.dateadd('millisecond', 86399999, "nullif('" ~ stop_date | lower ~ "','none')") }}, + CAST(CURRENT_TIMESTAMP() AS DATETIME) ) as STOP_TIMESTAMP + from {{ target_schema }}.{{ target_table }} + ) + select + START_TIMESTAMP, + STOP_TIMESTAMP, + {{ dbt_utils.datediff('start_timestamp', + 'stop_timestamp', + period) }} + 1 as NUM_PERIODS + from data + {%- endset %} + + + {% set period_boundaries_dict = dbt_utils.get_query_results_as_dict(period_boundary_sql) %} + + {% set period_boundaries = {'start_timestamp': period_boundaries_dict['START_TIMESTAMP'][0] | string, + 'stop_timestamp': period_boundaries_dict['STOP_TIMESTAMP'][0] | string, + 'num_periods': period_boundaries_dict['NUM_PERIODS'][0] | int} %} + + {% do return(period_boundaries) %} +{%- endmacro %} + + + + +{% macro sqlserver__get_period_boundaries(target_schema, target_table, timestamp_field, start_date, stop_date, period) -%} + + {# MSSQL cannot CAST datetime2 strings with more than 7 decimal places #} + {% set start_date_mssql = start_date[0:27] %} + {% set stop_date_mssql = stop_date[0:27] %} + + {% set period_boundary_sql -%} + WITH period_data AS ( + SELECT + CAST(COALESCE(MAX({{ timestamp_field }}), CAST('{{ start_date_mssql }}' AS DATETIME2)) AS DATETIME2) AS start_timestamp, + COALESCE({{ dbt_utils.dateadd('millisecond', 86399999, "CAST(NULLIF('" ~ stop_date_mssql | lower ~ "','none') AS DATETIME2)") }}, + {{ dbtvault.current_timestamp() }} ) AS stop_timestamp + FROM {{ target_schema }}.{{ target_table }} + ) + SELECT + start_timestamp, + stop_timestamp, + {{ dbt_utils.datediff('start_timestamp', + 'stop_timestamp', + period) }} + 1 AS num_periods + FROM period_data + {%- endset %} + + {% set period_boundaries_dict = dbtvault.get_query_results_as_dict(period_boundary_sql) %} + + {% set period_boundaries = {'start_timestamp': period_boundaries_dict['START_TIMESTAMP'][0] | string, + 'stop_timestamp': period_boundaries_dict['STOP_TIMESTAMP'][0] | string, + 'num_periods': period_boundaries_dict['NUM_PERIODS'][0] | int} %} + + {% do return(period_boundaries) %} +{%- endmacro %} \ No newline at end of file diff --git a/macros/materialisations/period_mat_helpers/get_period_filter_sql.sql b/macros/materialisations/period_mat_helpers/get_period_filter_sql.sql new file mode 100644 index 000000000..245eedef6 --- /dev/null +++ b/macros/materialisations/period_mat_helpers/get_period_filter_sql.sql @@ -0,0 +1,43 @@ +{%- macro get_period_filter_sql(target_cols_csv, base_sql, timestamp_field, period, start_timestamp, stop_timestamp, offset) -%} + + {% set macro = adapter.dispatch('get_period_filter_sql', + 'dbtvault')(target_cols_csv=target_cols_csv, + base_sql=base_sql, + timestamp_field=timestamp_field, + period=period, + start_timestamp=start_timestamp, + stop_timestamp=stop_timestamp, + offset=offset) %} + {% do return(macro) %} +{%- endmacro %} + + + + +{% macro default__get_period_filter_sql(target_cols_csv, base_sql, timestamp_field, period, start_timestamp, stop_timestamp, offset) -%} + + {%- set filtered_sql = {'sql': base_sql} -%} + + {%- do filtered_sql.update({'sql': dbtvault.replace_placeholder_with_period_filter(filtered_sql.sql, + timestamp_field, + start_timestamp, + stop_timestamp, + offset, period)}) -%} + select {{ target_cols_csv }} from ({{ filtered_sql.sql }}) +{%- endmacro %} + + + + +{% macro sqlserver__get_period_filter_sql(target_cols_csv, base_sql, timestamp_field, period, start_timestamp, stop_timestamp, offset) -%} + + {%- set filtered_sql = {'sql': base_sql} -%} + + {%- do filtered_sql.update({'sql': dbtvault.replace_placeholder_with_period_filter(filtered_sql.sql, + timestamp_field, + start_timestamp, + stop_timestamp, + offset, period)}) -%} + {# MSSQL does not allow CTEs in a subquery #} + {{ filtered_sql.sql }} +{%- endmacro %} \ No newline at end of file diff --git a/macros/materialisations/period_mat_helpers/get_period_of_load.sql b/macros/materialisations/period_mat_helpers/get_period_of_load.sql new file mode 100644 index 000000000..99a8a1024 --- /dev/null +++ b/macros/materialisations/period_mat_helpers/get_period_of_load.sql @@ -0,0 +1,59 @@ +{%- macro get_period_of_load(period, offset, start_timestamp) -%} + + {% set macro = adapter.dispatch('get_period_of_load', + 'dbtvault')(period=period, + offset=offset, + start_timestamp=start_timestamp) %} + + {% do return(macro) %} +{%- endmacro %} + + + + +{%- macro default__get_period_of_load(period, offset, start_timestamp) -%} + + {% set period_of_load_sql -%} + SELECT DATE_TRUNC('{{ period }}', DATEADD({{ period }}, {{ offset }}, TO_DATE('{{ start_timestamp }}'))) AS period_of_load + {%- endset %} + + {% set period_of_load_dict = dbtvault.get_query_results_as_dict(period_of_load_sql) %} + + {% set period_of_load = period_of_load_dict['PERIOD_OF_LOAD'][0] | string %} + + {% do return(period_of_load) %} +{%- endmacro -%} + + + + +{%- macro bigquery__get_period_of_load(period, offset, start_timestamp) -%} + + {% set period_of_load_sql -%} + SELECT DATE_TRUNC(DATE_ADD( DATE('{{start_timestamp}}'), INTERVAL {{ offset }} {{ period }}), {{ period }} ) AS PERIOD_OF_LOAD + {%- endset %} + + {% set period_of_load_dict = dbt_utils.get_query_results_as_dict(period_of_load_sql) %} + + {% set period_of_load = period_of_load_dict['PERIOD_OF_LOAD'][0] | string %} + + {% do return(period_of_load) %} +{%- endmacro -%} + + + + +{%- macro sqlserver__get_period_of_load(period, offset, start_timestamp) -%} + {# MSSQL cannot CAST datetime2 strings with more than 7 decimal places #} + {% set start_timestamp_mssql = start_timestamp[0:23] %} + + {% set period_of_load_sql -%} + SELECT DATEADD({{ period }}, DATEDIFF({{period}}, 0, DATEADD({{ period }}, {{ offset }}, CAST('{{ start_timestamp_mssql }}' AS DATETIME2))), 0) AS period_of_load + {%- endset %} + + {% set period_of_load_dict = dbtvault.get_query_results_as_dict(period_of_load_sql) %} + + {% set period_of_load = period_of_load_dict['PERIOD_OF_LOAD'][0] | string %} + + {% do return(period_of_load) %} +{%- endmacro -%} \ No newline at end of file diff --git a/macros/materialisations/period_mat_helpers/get_start_stop_dates.sql b/macros/materialisations/period_mat_helpers/get_start_stop_dates.sql new file mode 100644 index 000000000..016afe46d --- /dev/null +++ b/macros/materialisations/period_mat_helpers/get_start_stop_dates.sql @@ -0,0 +1,40 @@ +{% macro get_start_stop_dates(timestamp_field, date_source_models) %} + + {% if config.get('start_date', default=none) is not none %} + + {%- set start_date = config.get('start_date') -%} + {%- set stop_date = config.get('stop_date', default=none) -%} + + {% do return({'start_date': start_date,'stop_date': stop_date}) %} + + {% elif date_source_models is not none %} + + {% if date_source_models is string %} + {% set date_source_models = [date_source_models] %} + {% endif %} + {% set query_sql %} + WITH stage AS ( + {% for source_model in date_source_models %} + SELECT {{ timestamp_field }} FROM {{ ref(source_model) }} + {% if not loop.last %} UNION ALL {% endif %} + {% endfor %}) + + SELECT MIN({{ timestamp_field }}) AS MIN, MAX({{ timestamp_field }}) AS MAX + FROM stage + {% endset %} + + {% set min_max_dict = dbtvault.get_query_results_as_dict(query_sql) %} + + {% set start_date = min_max_dict['MIN'][0] | string %} + {% set stop_date = min_max_dict['MAX'][0] | string %} + {% set min_max_dates = {"start_date": start_date, "stop_date": stop_date} %} + + {% do return(min_max_dates) %} + + {% else %} + {%- if execute -%} + {{ exceptions.raise_compiler_error("Invalid 'vault_insert_by_period' configuration. Must provide 'start_date' and 'stop_date', just 'stop_date', and/or 'date_source_models' options.") }} + {%- endif -%} + {% endif %} + +{% endmacro %} \ No newline at end of file diff --git a/macros/materialisations/period_mat_helpers/replace_placeholder_with_period_filter.sql b/macros/materialisations/period_mat_helpers/replace_placeholder_with_period_filter.sql new file mode 100644 index 000000000..216bc106a --- /dev/null +++ b/macros/materialisations/period_mat_helpers/replace_placeholder_with_period_filter.sql @@ -0,0 +1,62 @@ +{%- macro replace_placeholder_with_period_filter(core_sql, timestamp_field, start_timestamp, stop_timestamp, offset, period) -%} + + {% set macro = adapter.dispatch('replace_placeholder_with_period_filter', + 'dbtvault')(core_sql=core_sql, + timestamp_field=timestamp_field, + start_timestamp=start_timestamp, + stop_timestamp=stop_timestamp, + offset=offset, + period=period) %} + {% do return(macro) %} +{%- endmacro %} + + + + +{% macro default__replace_placeholder_with_period_filter(core_sql, timestamp_field, start_timestamp, stop_timestamp, offset, period) %} + + {%- set period_filter -%} + (TO_DATE({{ timestamp_field }}) + >= DATE_TRUNC('{{ period }}', TO_DATE('{{ start_timestamp }}') + INTERVAL '{{ offset }} {{ period }}') AND + TO_DATE({{ timestamp_field }}) < DATE_TRUNC('{{ period }}', TO_DATE('{{ start_timestamp }}') + INTERVAL '{{ offset }} {{ period }}' + INTERVAL '1 {{ period }}')) + AND (TO_DATE({{ timestamp_field }}) >= TO_DATE('{{ start_timestamp }}')) + {%- endset -%} + {%- set filtered_sql = core_sql | replace("__PERIOD_FILTER__", period_filter) -%} + + {% do return(filtered_sql) %} +{% endmacro %} + + + + +{% macro bigquery__replace_placeholder_with_period_filter(core_sql, timestamp_field, start_timestamp, stop_timestamp, offset, period) %} + + {%- set period_filter -%} + (DATE({{ timestamp_field }}) >= DATE_TRUNC(DATE_ADD( DATE('{{ start_timestamp }}'), INTERVAL {{ offset }} {{ period }}), {{ period }} ) AND + DATE({{ timestamp_field }}) < DATE_TRUNC(DATE_ADD(DATE_ADD( DATE('{{ start_timestamp }}'), INTERVAL {{ offset }} {{ period }}), INTERVAL 1 {{ period }}), {{ period }} ) + AND DATE({{ timestamp_field }}) >= DATE('{{ start_timestamp }}')) + {%- endset -%} + + {%- set filtered_sql = core_sql | replace("__PERIOD_FILTER__", period_filter) -%} + + {% do return(filtered_sql) %} +{% endmacro %} + + + + +{% macro sqlserver__replace_placeholder_with_period_filter(core_sql, timestamp_field, start_timestamp, stop_timestamp, offset, period) %} + + {# MSSQL cannot CAST datetime2 strings with more than 7 decimal places #} + {% set start_timestamp_mssql = start_timestamp[0:27] %} + + {%- set period_filter -%} + (CAST({{ timestamp_field }} AS DATE) >= DATEADD({{ period }}, DATEDIFF({{ period }}, 0, DATEADD({{ period }}, {{ offset }}, CAST('{{ start_timestamp_mssql }}' AS DATETIME2))), 0) AND + CAST({{ timestamp_field }} AS DATE) < DATEADD({{ period }}, 1, DATEADD({{ period }}, {{ offset }}, CAST('{{ start_timestamp_mssql }}' AS DATETIME2))) + AND (CAST({{ timestamp_field }} AS DATE) >= CAST('{{ start_timestamp_mssql }}' AS DATE))) + {%- endset -%} + + {%- set filtered_sql = core_sql | replace("__PERIOD_FILTER__", period_filter) -%} + + {% do return(filtered_sql) %} +{% endmacro %} diff --git a/macros/materialisations/rank_mat_helpers.sql b/macros/materialisations/rank_mat_helpers.sql deleted file mode 100644 index 1aaac7b34..000000000 --- a/macros/materialisations/rank_mat_helpers.sql +++ /dev/null @@ -1,77 +0,0 @@ -{#-- Helper macros for rank materializations #} - -{#-- MULTI-DISPATCH MACROS #} - -{#-- REPLACE_PLACEHOLDER_WITH_RANK_FILTER #} - -{%- macro replace_placeholder_with_rank_filter(core_sql, rank_column, rank_iteration) -%} - - {% set macro = adapter.dispatch('replace_placeholder_with_rank_filter', - 'dbtvault')(core_sql=core_sql, - rank_column=rank_column, - rank_iteration=rank_iteration) %} - {% do return(macro) %} -{%- endmacro %} - -{% macro default__replace_placeholder_with_rank_filter(core_sql, rank_column, rank_iteration) %} - - {%- set rank_filter -%} - {{ rank_column }}::INTEGER = {{ rank_iteration }}::INTEGER - {%- endset -%} - - {%- set filtered_sql = core_sql | replace("__RANK_FILTER__", rank_filter) -%} - - {% do return(filtered_sql) %} -{% endmacro %} - -{#-- OTHER MACROS #} - -{% macro get_min_max_ranks(rank_column, rank_source_models) %} - - {% if rank_source_models is not none %} - - {% if rank_source_models is string %} - {% set rank_source_models = [rank_source_models] %} - {% endif %} - - {% set query_sql %} - WITH stage AS ( - {% for source_model in rank_source_models %} - SELECT {{ rank_column }} FROM {{ ref(source_model) }} - {% if not loop.last %} UNION ALL {% endif %} - {% endfor %}) - - SELECT MIN({{ rank_column }}) AS MIN, MAX({{ rank_column }}) AS MAX - FROM stage - {% endset %} - - {% set min_max_dict = dbt_utils.get_query_results_as_dict(query_sql) %} - - {% set min_rank = min_max_dict['MIN'][0] | string %} - {% set max_rank = min_max_dict['MAX'][0] | string %} - {% set min_max_ranks = {"min_rank": min_rank, "max_rank": max_rank} %} - - {% do return(min_max_ranks) %} - - {% else %} - {%- if execute -%} - {{ exceptions.raise_compiler_error("Invalid 'vault_insert_by_rank' configuration. Must provide 'rank_column', and 'rank_source_models' options.") }} - {%- endif -%} - {% endif %} - -{% endmacro %} - - -{% macro is_vault_insert_by_rank() %} - {#-- do not run introspective queries in parsing #} - {% if not execute %} - {{ return(False) }} - {% else %} - {% set relation = adapter.get_relation(this.database, this.schema, this.table) %} - - {{ return(relation is not none - and relation.type == 'table' - and model.config.materialized == 'vault_insert_by_rank' - and not flags.FULL_REFRESH) }} - {% endif %} -{% endmacro %} diff --git a/macros/materialisations/rank_mat_helpers/get_min_max_ranks.sql b/macros/materialisations/rank_mat_helpers/get_min_max_ranks.sql new file mode 100644 index 000000000..7c5abba3a --- /dev/null +++ b/macros/materialisations/rank_mat_helpers/get_min_max_ranks.sql @@ -0,0 +1,34 @@ +{% macro get_min_max_ranks(rank_column, rank_source_models) %} + + {% if rank_source_models is not none %} + + {% if rank_source_models is string %} + {% set rank_source_models = [rank_source_models] %} + {% endif %} + + {% set query_sql %} + WITH stage AS ( + {% for source_model in rank_source_models %} + SELECT {{ rank_column }} FROM {{ ref(source_model) }} + {% if not loop.last %} UNION ALL {% endif %} + {% endfor %}) + + SELECT MIN({{ rank_column }}) AS MIN, MAX({{ rank_column }}) AS MAX + FROM stage + {% endset %} + + {% set min_max_dict = dbtvault.get_query_results_as_dict(query_sql) %} + + {% set min_rank = min_max_dict['MIN'][0] | string %} + {% set max_rank = min_max_dict['MAX'][0] | string %} + {% set min_max_ranks = {"min_rank": min_rank, "max_rank": max_rank} %} + + {% do return(min_max_ranks) %} + + {% else %} + {%- if execute -%} + {{ exceptions.raise_compiler_error("Invalid 'vault_insert_by_rank' configuration. Must provide 'rank_column', and 'rank_source_models' options.") }} + {%- endif -%} + {% endif %} + +{% endmacro %} diff --git a/macros/materialisations/rank_mat_helpers/replace_placeholder_with_rank_filter.sql b/macros/materialisations/rank_mat_helpers/replace_placeholder_with_rank_filter.sql new file mode 100644 index 000000000..05e4ffbbc --- /dev/null +++ b/macros/materialisations/rank_mat_helpers/replace_placeholder_with_rank_filter.sql @@ -0,0 +1,46 @@ +{%- macro replace_placeholder_with_rank_filter(core_sql, rank_column, rank_iteration) -%} + + {% set macro = adapter.dispatch('replace_placeholder_with_rank_filter', + 'dbtvault')(core_sql=core_sql, + rank_column=rank_column, + rank_iteration=rank_iteration) %} + {% do return(macro) %} + {%- endmacro %} + + {% macro default__replace_placeholder_with_rank_filter(core_sql, rank_column, rank_iteration) %} + + {%- set rank_filter -%} + {{ rank_column }}:: INTEGER = {{ rank_iteration }}::INTEGER + {%- endset -%} + + {%- set filtered_sql = core_sql | replace("__RANK_FILTER__", rank_filter) -%} + + {% do return(filtered_sql) %} +{% endmacro %} + + + + +{% macro sqlserver__replace_placeholder_with_rank_filter(core_sql, rank_column, rank_iteration) %} + + {%- set rank_filter -%} + CAST({{ rank_column }} AS INT) = CAST({{ rank_iteration }} AS INT) + {%- endset -%} + + {%- set filtered_sql = core_sql | replace("__RANK_FILTER__", rank_filter) -%} + + {% do return(filtered_sql) %} +{% endmacro %} + + + + +{% macro bigquery__replace_placeholder_with_rank_filter(core_sql, rank_column, rank_iteration) %} + {%- set rank_filter -%} + CAST({{ rank_column }} AS INTEGER) = CAST({{ rank_iteration }} AS INTEGER) + {%- endset -%} + + {%- set filtered_sql = core_sql | replace("__RANK_FILTER__", rank_filter) -%} + + {% do return(filtered_sql) %} +{% endmacro %} \ No newline at end of file diff --git a/macros/materialisations/shared_helpers.sql b/macros/materialisations/shared_helpers.sql index 2e65940c8..19ca0582c 100644 --- a/macros/materialisations/shared_helpers.sql +++ b/macros/materialisations/shared_helpers.sql @@ -1,19 +1,10 @@ {%- macro check_placeholder(model_sql, placeholder='__PERIOD_FILTER__') -%} {%- if model_sql.find(placeholder) == -1 -%} - {%- set error_message -%} - Model '{{ model.unique_id }}' does not include the required string '{{ placeholder }}' in its sql + {%- set error_message -%} + Model '{{ model.unique_id }}' does not include the required string '{{ placeholder }}' in its sql {%- endset -%} {{- exceptions.raise_compiler_error(error_message) -}} {%- endif -%} -{%- endmacro -%} - - -{%- macro is_any_incremental() -%} - {%- if dbtvault.is_vault_insert_by_period() or dbtvault.is_vault_insert_by_rank() or dbtvault.is_pit_incremental() or dbtvault.is_bridge_incremental() or is_incremental() -%} - {%- do return(true) -%} - {%- else -%} - {%- do return(false) -%} - {%- endif -%} {%- endmacro -%} \ No newline at end of file diff --git a/macros/materialisations/vault_insert_by_period_materialization.sql b/macros/materialisations/vault_insert_by_period_materialization.sql index 8a8d98988..29576da90 100644 --- a/macros/materialisations/vault_insert_by_period_materialization.sql +++ b/macros/materialisations/vault_insert_by_period_materialization.sql @@ -1,12 +1,16 @@ {% materialization vault_insert_by_period, default -%} - {%- set full_refresh_mode = flags.FULL_REFRESH -%} + {%- set full_refresh_mode = (should_full_refresh()) -%} - {%- set target_relation = this -%} + {% if target.type == "sqlserver" %} + {%- set target_relation = this.incorporate(type='table') -%} + {% else %} + {%- set target_relation = this -%} + {% endif %} {%- set existing_relation = load_relation(this) -%} - {%- set tmp_relation = make_temp_relation(this) -%} + {%- set tmp_relation = make_temp_relation(target_relation) -%} - {%- set timestamp_field = config.require('timestamp_field') -%} + {%- set timestamp_field = dbtvault.escape_column_names(config.require('timestamp_field')) -%} {%- set date_source_models = config.get('date_source_models', default=none) -%} {%- set start_stop_dates = dbtvault.get_start_stop_dates(timestamp_field, date_source_models) | as_native -%} @@ -30,13 +34,11 @@ {% set build_sql = create_table_as(False, target_relation, filtered_sql) %} {% do to_drop.append(tmp_relation) %} - {% elif existing_relation.is_view or full_refresh_mode %} - {#-- Make sure the backup doesn't exist so we don't encounter issues with the rename below #} - {% set backup_identifier = existing_relation.identifier ~ "__dbt_backup" %} - {% set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) %} + {% elif existing_relation.is_view %} - {% do adapter.drop_relation(backup_relation) %} - {% do adapter.rename_relation(target_relation, backup_relation) %} + {{ log("Dropping relation " ~ target_relation ~ " because it is a view and this model is a table (vault_insert_by_period).") }} + {% do adapter.drop_relation(existing_relation) %} + {% set build_sql = create_table_as(False, target_relation, filtered_sql) %} {% set filtered_sql = dbtvault.replace_placeholder_with_period_filter(sql, timestamp_field, start_stop_dates.start_date, @@ -44,8 +46,12 @@ 0, period) %} {% set build_sql = create_table_as(False, target_relation, filtered_sql) %} - {% do to_drop.append(tmp_relation) %} - {% do to_drop.append(backup_relation) %} + {% elif full_refresh_mode %} + {% set filtered_sql = dbtvault.replace_placeholder_with_period_filter(sql, timestamp_field, + start_stop_dates.start_date, + start_stop_dates.stop_date, + 0, period) %} + {% set build_sql = create_table_as(False, target_relation, filtered_sql) %} {% else %} {% set period_boundaries = dbtvault.get_period_boundaries(schema, @@ -66,7 +72,8 @@ {{ dbt_utils.log_info("Running for {} {} of {} ({}) [{}]".format(period, iteration_number, period_boundaries.num_periods, period_of_load, model.unique_id)) }} - {% set tmp_relation = make_temp_relation(this) %} + {% set tmp_relation = make_temp_relation(target_relation) %} + {% set tmp_table_sql = dbtvault.get_period_filter_sql(target_cols_csv, sql, timestamp_field, period, period_boundaries.start_timestamp, period_boundaries.stop_timestamp, i) %} @@ -146,6 +153,8 @@ {% endif %} {% endfor %} + {% set target_relation = target_relation.incorporate(type='table') %} + {{ run_hooks(post_hooks, inside_transaction=False) }} {{ return({'relations': [target_relation]}) }} diff --git a/macros/materialisations/vault_insert_by_rank_materialization.sql b/macros/materialisations/vault_insert_by_rank_materialization.sql index ae20aeeb0..5c4730ca0 100644 --- a/macros/materialisations/vault_insert_by_rank_materialization.sql +++ b/macros/materialisations/vault_insert_by_rank_materialization.sql @@ -1,12 +1,16 @@ {% materialization vault_insert_by_rank, default -%} - {%- set full_refresh_mode = flags.FULL_REFRESH -%} + {%- set full_refresh_mode = (should_full_refresh()) -%} - {%- set target_relation = this -%} + {% if target.type == "sqlserver" %} + {%- set target_relation = this.incorporate(type='table') -%} + {% else %} + {%- set target_relation = this -%} + {% endif %} {%- set existing_relation = load_relation(this) -%} - {%- set tmp_relation = make_temp_relation(this) -%} + {%- set tmp_relation = make_temp_relation(target_relation) -%} - {%- set rank_column = config.require('rank_column') -%} + {%- set rank_column = dbtvault.escape_column_names(config.require('rank_column')) -%} {%- set rank_source_models = config.require('rank_source_models') -%} {%- set min_max_ranks = dbtvault.get_min_max_ranks(rank_column, rank_source_models) | as_native -%} @@ -27,19 +31,24 @@ {% do to_drop.append(tmp_relation) %} - {% elif existing_relation.is_view or full_refresh_mode %} - {#-- Make sure the backup doesn't exist so we don't encounter issues with the rename below #} - {% set backup_identifier = existing_relation.identifier ~ "__dbt_backup" %} - {% set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) %} + {% elif existing_relation.is_view %} - {% do adapter.drop_relation(backup_relation) %} - {% do adapter.rename_relation(target_relation, backup_relation) %} + {{ log("Dropping relation " ~ target_relation ~ " because it is a view and this model is a table (vault_insert_by_rank).") }} + {% do adapter.drop_relation(existing_relation) %} + {% set build_sql = create_table_as(False, target_relation, filtered_sql) %} - {% set filtered_sql = dbtvault.replace_placeholder_with_rank_filter(sql, rank_column, 1) %} + {% set filtered_sql = dbtvault.replace_placeholder_with_period_filter(sql, timestamp_field, + start_stop_dates.start_date, + start_stop_dates.stop_date, + 0, period) %} {% set build_sql = create_table_as(False, target_relation, filtered_sql) %} - {% do to_drop.append(tmp_relation) %} - {% do to_drop.append(backup_relation) %} + {% elif full_refresh_mode %} + {% set filtered_sql = dbtvault.replace_placeholder_with_period_filter(sql, timestamp_field, + start_stop_dates.start_date, + start_stop_dates.stop_date, + 0, period) %} + {% set build_sql = create_table_as(False, target_relation, filtered_sql) %} {% else %} {% set target_columns = adapter.get_columns_in_relation(target_relation) %} @@ -54,7 +63,7 @@ {{ dbt_utils.log_info("Running for {} {} of {} on column '{}' [{}]".format('rank', iteration_number, min_max_ranks.max_rank, rank_column, model.unique_id)) }} - {% set tmp_relation = make_temp_relation(this) %} + {% set tmp_relation = make_temp_relation(target_relation) %} {# This call statement drops and then creates a temporary table #} {# but MSSQL will fail to drop any temporary table created by a previous loop iteration #} @@ -130,6 +139,8 @@ {% endif %} {% endfor %} + {% set target_relation = target_relation.incorporate(type='table') %} + {{ run_hooks(post_hooks, inside_transaction=False) }} {{ return({'relations': [target_relation]}) }} diff --git a/macros/staging/derive_columns.sql b/macros/staging/derive_columns.sql index 236fcc5e4..efbce1392 100644 --- a/macros/staging/derive_columns.sql +++ b/macros/staging/derive_columns.sql @@ -17,6 +17,7 @@ {#- Add aliases of derived columns to excludes and full SQL to includes -#} {%- for col in columns -%} + {%- if dbtvault.is_list(columns[col]) -%} {%- set column_list = [] -%} @@ -24,6 +25,8 @@ {%- set column_str = dbtvault.as_constant(concat_component) -%} {%- do column_list.append(column_str) -%} {%- endfor -%} + {%- set concat = dbtvault.concat_ws(column_list, "||") -%} + {%- set concat_string = concat ~ " AS " ~ dbtvault.escape_column_names(col) -%} {% set concat_string = "CONCAT_WS(" ~ "'||', " ~ column_list | join(", ") ~ ") AS " ~ col %} @@ -31,7 +34,7 @@ {%- set exclude_columns = exclude_columns + columns[col] -%} {% else %} {%- set column_str = dbtvault.as_constant(columns[col]) -%} - {%- do der_columns.append(column_str ~ " AS " ~ col) -%} + {%- do der_columns.append(column_str ~ " AS " ~ dbtvault.escape_column_names(col)) -%} {%- do exclude_columns.append(col) -%} {% endif %} @@ -42,13 +45,13 @@ {%- for col in source_cols -%} {%- if col not in exclude_columns -%} - {%- do src_columns.append(col) -%} + {%- do src_columns.append(dbtvault.escape_column_names(col)) -%} {%- endif -%} {%- endfor -%} {%- endif -%} - {#- Makes sure the columns are appended in a logical order. Derived columns then source columns -#} + {#- Makes sure the columns are appended in a logical order. Source columns then derived columns -#} {%- set include_columns = src_columns + der_columns -%} {#- Print out all columns in includes -#} diff --git a/macros/staging/rank_columns.sql b/macros/staging/rank_columns.sql index 338964182..642a1035c 100644 --- a/macros/staging/rank_columns.sql +++ b/macros/staging/rank_columns.sql @@ -34,9 +34,9 @@ {%- if order_by_col is mapping %} {%- set column_name, direction = order_by_col.items()|first -%} - {%- set order_by_str = "{} {}".format(column_name, direction) | trim -%} + {%- set order_by_str = "{} {}".format(dbtvault.escape_column_names(column_name), direction) | trim -%} {%- else -%} - {%- set order_by_str = order_by_col -%} + {%- set order_by_str = dbtvault.escape_column_names(order_by_col) -%} {%- endif -%} {%- do order_by_str_lst.append(order_by_str) -%} @@ -53,16 +53,16 @@ {%- set direction = '' -%} {%- endif -%} - {%- set order_by_str = "{} {}".format(column_name, direction) | trim -%} + {%- set order_by_str = "{} {}".format(dbtvault.escape_column_names(column_name), direction) | trim -%} {%- endif -%} {%- if dbtvault.is_list(partition_by) -%} - {%- set partition_by_str = partition_by | join(", ") -%} + {%- set partition_by_str = dbtvault.escape_column_names(partition_by) | join(", ") -%} {%- else -%} - {%- set partition_by_str = partition_by -%} + {%- set partition_by_str = dbtvault.escape_column_names(partition_by) -%} {%- endif -%} - {{- "{} OVER (PARTITION BY {} ORDER BY {}) AS {}".format(rank_type, partition_by_str, order_by_str, col) | indent(4) -}} + {{- "{} OVER (PARTITION BY {} ORDER BY {}) AS {}".format(rank_type, partition_by_str, order_by_str, dbtvault.escape_column_names(col)) | indent(4) -}} {%- endif -%} diff --git a/macros/staging/stage.sql b/macros/staging/stage.sql index 06c6b85d1..1206e798e 100644 --- a/macros/staging/stage.sql +++ b/macros/staging/stage.sql @@ -63,10 +63,10 @@ {%- if dbtvault.is_nothing(derived_columns) and dbtvault.is_nothing(hashed_columns) and dbtvault.is_nothing(ranked_columns) -%} - {%- set final_columns_to_select = final_columns_to_select + all_source_columns -%} + {%- set final_columns_to_select = final_columns_to_select + dbtvault.escape_column_names(all_source_columns) -%} {%- else -%} {#- Only include non-overriden columns if not just source columns -#} - {%- set final_columns_to_select = final_columns_to_select + source_columns_to_select -%} + {%- set final_columns_to_select = final_columns_to_select + dbtvault.escape_column_names(source_columns_to_select) -%} {%- endif -%} {%- endif %} @@ -74,7 +74,7 @@ WITH source_data AS ( SELECT - {{- "\n\n " ~ dbtvault.print_list(all_source_columns) if all_source_columns else " *" }} + {{- "\n\n " ~ dbtvault.print_list(dbtvault.escape_column_names(all_source_columns)) if all_source_columns else " *" }} FROM {{ source_relation }} {%- set last_cte = "source_data" %} @@ -90,7 +90,7 @@ derived_columns AS ( FROM {{ last_cte }} {%- set last_cte = "derived_columns" -%} - {%- set final_columns_to_select = final_columns_to_select + derived_column_names %} + {%- set final_columns_to_select = final_columns_to_select + dbtvault.escape_column_names(derived_column_names) %} ) {%- endif -%} @@ -100,14 +100,14 @@ hashed_columns AS ( SELECT - {{ dbtvault.print_list(derived_columns_to_select) }}, + {{ dbtvault.print_list(dbtvault.escape_column_names(derived_columns_to_select)) }}, {% set processed_hash_columns = dbtvault.process_hash_column_excludes(hashed_columns, all_source_columns) -%} {{- dbtvault.hash_columns(columns=processed_hash_columns) | indent(4) }} FROM {{ last_cte }} {%- set last_cte = "hashed_columns" -%} - {%- set final_columns_to_select = final_columns_to_select + hashed_column_names %} + {%- set final_columns_to_select = final_columns_to_select + dbtvault.escape_column_names(hashed_column_names) %} ) {%- endif -%} @@ -121,7 +121,7 @@ ranked_columns AS ( FROM {{ last_cte }} {%- set last_cte = "ranked_columns" -%} - {%- set final_columns_to_select = final_columns_to_select + ranked_column_names %} + {%- set final_columns_to_select = final_columns_to_select + dbtvault.escape_column_names(ranked_column_names) %} ) {%- endif -%} diff --git a/macros/supporting/current_timestamp.sql b/macros/supporting/current_timestamp.sql index 46642cfbb..4e89806d9 100644 --- a/macros/supporting/current_timestamp.sql +++ b/macros/supporting/current_timestamp.sql @@ -6,6 +6,9 @@ {{ dbt_utils.current_timestamp() }} {% endmacro %} +{% macro sqlserver__current_timestamp() %} + sysdatetime() +{% endmacro %} {% macro current_timestamp_in_utc() -%} {{ return(adapter.dispatch('current_timestamp_in_utc', 'dbtvault')()) }} @@ -13,4 +16,8 @@ {% macro default__current_timestamp_in_utc() %} {{dbt_utils.current_timestamp_in_utc()}} +{% endmacro %} + +{% macro sqlserver__current_timestamp_in_utc() %} + sysutcdatetime() {% endmacro %} \ No newline at end of file diff --git a/macros/supporting/datatypes.sql b/macros/supporting/datatypes.sql index b9aad1c85..ec4c458b6 100644 --- a/macros/supporting/datatypes.sql +++ b/macros/supporting/datatypes.sql @@ -2,6 +2,10 @@ {{ return(adapter.dispatch('type_timestamp', 'dbtvault')()) }} {%- endmacro -%} -{% macro default__type_timestamp() %} +{%- macro default__type_timestamp() -%} {{ dbt_utils.type_timestamp() }} -{% endmacro %} \ No newline at end of file +{%- endmacro -%} + +{%- macro sqlserver__type_timestamp() -%} + datetime2 +{%- endmacro -%} diff --git a/macros/supporting/hash.sql b/macros/supporting/hash.sql index ffc570489..3c9caeb8a 100644 --- a/macros/supporting/hash.sql +++ b/macros/supporting/hash.sql @@ -36,7 +36,12 @@ {#- If single column to hash -#} {%- if columns is string -%} {%- set column_str = dbtvault.as_constant(columns) -%} - {{- "CAST(({}({})) AS BINARY({})) AS {}".format(hash_alg, standardise | replace('[EXPRESSION]', column_str), hash_size, alias) | indent(4) -}} + {%- if dbtvault.is_expression(column_str) -%} + {%- set escaped_column_str = column_str -%} + {%- else -%} + {%- set escaped_column_str = dbtvault.escape_column_names(column_str) -%} + {%- endif -%} + {{- "CAST(({}({})) AS BINARY({})) AS {}".format(hash_alg, standardise | replace('[EXPRESSION]', escaped_column_str), hash_size, dbtvault.escape_column_names(alias)) | indent(4) -}} {#- Else a list of columns to hash -#} {%- else -%} @@ -53,15 +58,168 @@ {%- do all_null.append(null_placeholder_string) -%} {%- set column_str = dbtvault.as_constant(column) -%} - {{- "\nIFNULL({}, '{}')".format(standardise | replace('[EXPRESSION]', column_str), null_placeholder_string) | indent(4) -}} + {%- if dbtvault.is_expression(column_str) -%} + {%- set escaped_column_str = column_str -%} + {%- else -%} + {%- set escaped_column_str = dbtvault.escape_column_names(column_str) -%} + {%- endif -%} + {{- "\nIFNULL({}, '{}')".format(standardise | replace('[EXPRESSION]', escaped_column_str), null_placeholder_string) | indent(4) -}} + {{- "," if not loop.last -}} + + {%- if loop.last -%} + + {% if is_hashdiff %} + {{- "\n)) AS BINARY({})) AS {}".format(hash_size, dbtvault.escape_column_names(alias)) -}} + {%- else -%} + {{- "\n), '{}')) AS BINARY({})) AS {}".format(all_null | join(""), hash_size, dbtvault.escape_column_names(alias)) -}} + {%- endif -%} + {%- else -%} + + {%- do all_null.append(concat_string) -%} + + {%- endif -%} + + {%- endfor -%} + +{%- endif -%} + +{%- endmacro -%} + +{%- macro bigquery__hash(columns, alias, is_hashdiff) -%} + +{%- set hash = var('hash', 'MD5') -%} +{%- set concat_string = var('concat_string', '||') -%} +{%- set null_placeholder_string = var('null_placeholder_string', '^^') -%} + +{#- Select hashing algorithm -#} + +{%- if hash == 'MD5' -%} + {%- set hash_alg = 'MD5' -%} +{%- elif hash == 'SHA' -%} + {%- set hash_alg = 'SHA256' -%} +{%- endif -%} + +{%- set standardise = "NULLIF(UPPER(TRIM(CAST([EXPRESSION] AS STRING))), '')" %} + +{#- Alpha sort columns before hashing if a hashdiff -#} +{%- if is_hashdiff and dbtvault.is_list(columns) -%} + {%- set columns = columns|sort -%} +{%- endif -%} + +{#- If single column to hash -#} +{%- if columns is string -%} + {%- set column_str = dbtvault.as_constant(columns) -%} + {%- if dbtvault.is_expression(column_str) -%} + {%- set escaped_column_str = column_str -%} + {%- else -%} + {%- set escaped_column_str = dbtvault.escape_column_names(column_str) -%} + {%- endif -%} + {{- "CAST(UPPER(TO_HEX({}({}))) AS STRING) AS {}".format(hash_alg, standardise | replace('[EXPRESSION]', escaped_column_str), dbtvault.escape_column_names(alias)) | indent(4) -}} + +{#- Else a list of columns to hash -#} +{%- else -%} + {%- set all_null = [] -%} + {%- if is_hashdiff -%} + {{- "UPPER(TO_HEX({}(CONCAT(".format(hash_alg) | indent(4) -}} + + {%- else -%} + {{- "UPPER(TO_HEX({}(NULLIF(CONCAT(".format(hash_alg) | indent(4) -}} + {%- endif -%} + + {%- for column in columns -%} + + {%- do all_null.append(null_placeholder_string) -%} + + {%- set column_str = dbtvault.as_constant(column) -%} + {%- if dbtvault.is_expression(column_str) -%} + {%- set escaped_column_str = column_str -%} + {%- else -%} + {%- set escaped_column_str = dbtvault.escape_column_names(column_str) -%} + {%- endif -%} + {{- "\nIFNULL({}, '{}')".format(standardise | replace('[EXPRESSION]', escaped_column_str), null_placeholder_string) | indent(4) -}} + {{- ",'{}',".format(concat_string) if not loop.last -}} + {%- if loop.last -%} + + {% if is_hashdiff %} + {{- "\n)))) AS {}".format(dbtvault.escape_column_names(alias)) -}} + {%- else -%} + {{- "\n), '{}')))) AS {}".format(all_null | join(""), dbtvault.escape_column_names(alias)) -}} + {%- endif -%} + {%- else -%} + + {%- do all_null.append(concat_string) -%} + {%- endif -%} + + {%- endfor -%} + +{%- endif -%} + +{%- endmacro -%} + +{%- macro sqlserver__hash(columns, alias, is_hashdiff) -%} + +{%- set hash = var('hash', 'MD5') -%} +{%- set concat_string = var('concat_string', '||') -%} +{%- set null_placeholder_string = var('null_placeholder_string', '^^') -%} + +{#- Select hashing algorithm -#} +{%- if hash == 'MD5' -%} + {%- set hash_alg = 'MD5' -%} + {%- set hash_size = 16 -%} +{%- elif hash == 'SHA' -%} + {%- set hash_alg = 'SHA2_256' -%} + {%- set hash_size = 32 -%} +{%- else -%} + {%- set hash_alg = 'MD5' -%} + {%- set hash_size = 16 -%} +{%- endif -%} + +{%- set standardise = "NULLIF(UPPER(TRIM(CAST([EXPRESSION] AS VARCHAR(max)))), '')" %} + +{#- Alpha sort columns before hashing if a hashdiff -#} +{%- if is_hashdiff and dbtvault.is_list(columns) -%} + {%- set columns = columns|sort -%} +{%- endif -%} + +{#- If single column to hash -#} +{%- if columns is string -%} + {%- set column_str = dbtvault.as_constant(columns) -%} + {%- if dbtvault.is_expression(column_str) -%} + {%- set escaped_column_str = column_str -%} + {%- else -%} + {%- set escaped_column_str = dbtvault.escape_column_names(column_str) -%} + {%- endif -%} + {{- "CAST(HASHBYTES('{}', {}) AS BINARY({})) AS {}".format(hash_alg, standardise | replace('[EXPRESSION]', escaped_column_str), hash_size, dbtvault.escape_column_names(alias)) | indent(4) -}} + +{#- Else a list of columns to hash -#} +{%- else -%} + {%- set all_null = [] -%} + + {%- if is_hashdiff -%} + {{- "CAST(HASHBYTES('{}', (CONCAT_WS('{}',".format(hash_alg, concat_string) | indent(4) -}} + {%- else -%} + {{- "CAST(HASHBYTES('{}', (NULLIF(CONCAT_WS('{}',".format(hash_alg, concat_string) | indent(4) -}} + {%- endif -%} + + {%- for column in columns -%} + + {%- do all_null.append(null_placeholder_string) -%} + + {%- set column_str = dbtvault.as_constant(column) -%} + {%- if dbtvault.is_expression(column_str) -%} + {%- set escaped_column_str = column_str -%} + {%- else -%} + {%- set escaped_column_str = dbtvault.escape_column_names(column_str) -%} + {%- endif -%} + {{- "\nISNULL({}, '{}')".format(standardise | replace('[EXPRESSION]', escaped_column_str), null_placeholder_string) | indent(4) -}} {{- "," if not loop.last -}} {%- if loop.last -%} {% if is_hashdiff %} - {{- "\n)) AS BINARY({})) AS {}".format(hash_size, alias) -}} + {{- "\n))) AS BINARY({})) AS {}".format(hash_size, dbtvault.escape_column_names(alias)) -}} {%- else -%} - {{- "\n), '{}')) AS BINARY({})) AS {}".format(all_null | join(""), hash_size, alias) -}} + {{- "\n), '{}'))) AS BINARY({})) AS {}".format(all_null | join(""), hash_size, dbtvault.escape_column_names(alias)) -}} {%- endif -%} {%- else -%} diff --git a/macros/supporting/max_datetime.sql b/macros/supporting/max_datetime.sql index fcb13ac75..2b1791c9a 100644 --- a/macros/supporting/max_datetime.sql +++ b/macros/supporting/max_datetime.sql @@ -8,4 +8,16 @@ {% do return('9999-12-31 23:59:59.999999') %} +{% endmacro -%} + +{%- macro sqlserver__max_datetime() %} + + {% do return('9999-12-31 23:59:59.9999999') %} + +{% endmacro -%} + +{%- macro bigquery__max_datetime() %} + + {% do return('9999-12-31 23:59:59.999') %} + {% endmacro -%} \ No newline at end of file diff --git a/macros/tables/bigquery/hub.sql b/macros/tables/bigquery/hub.sql new file mode 100644 index 000000000..9eb823a70 --- /dev/null +++ b/macros/tables/bigquery/hub.sql @@ -0,0 +1,102 @@ +{%- macro bigquery__hub(src_pk, src_nk, src_ldts, src_source, source_model) -%} + +{{- dbtvault.check_required_parameters(src_pk=src_pk, src_nk=src_nk, + src_ldts=src_ldts, src_source=src_source, + source_model=source_model) -}} + +{%- set src_pk = dbtvault.escape_column_names(src_pk) -%} +{%- set src_nk = dbtvault.escape_column_names(src_nk) -%} +{%- set src_ldts = dbtvault.escape_column_names(src_ldts) -%} +{%- set src_source = dbtvault.escape_column_names(src_source) -%} + +{%- set source_cols = dbtvault.expand_column_list(columns=[src_pk, src_nk, src_ldts, src_source]) -%} + +{%- if model.config.materialized == 'vault_insert_by_rank' %} + {%- set source_cols_with_rank = source_cols + dbtvault.escape_column_names([config.get('rank_column')]) -%} +{%- endif -%} + +{{ dbtvault.prepend_generated_by() }} + +{{ 'WITH ' -}} + +{%- if not (source_model is iterable and source_model is not string) -%} + {%- set source_model = [source_model] -%} +{%- endif -%} + +{%- set ns = namespace(last_cte= "") -%} + +{%- for src in source_model -%} + +{%- set source_number = loop.index | string -%} + + row_rank_{{ source_number }} AS ( + {%- if model.config.materialized == 'vault_insert_by_rank' %} + SELECT {{ dbtvault.prefix(source_cols_with_rank, 'rr') }}, + {%- else %} + SELECT {{ dbtvault.prefix(source_cols, 'rr') }}, + {%- endif %} + ROW_NUMBER() OVER( + PARTITION BY {{ dbtvault.prefix([src_pk], 'rr') }} + ORDER BY {{ dbtvault.prefix([src_ldts], 'rr') }} + ) AS row_number + FROM {{ ref(src) }} AS rr + WHERE {{ dbtvault.multikey(src_pk, prefix='rr', condition='IS NOT NULL') }} + QUALIFY row_number = 1 + {%- set ns.last_cte = "row_rank_{}".format(source_number) %} +), + +{% endfor -%} +{% if source_model | length > 1 %} +stage_union AS ( + {%- for src in source_model %} + SELECT * FROM row_rank_{{ loop.index | string }} + {%- if not loop.last %} + UNION ALL + {%- endif %} + {%- endfor %} + {%- set ns.last_cte = "stage_union" %} +), +{%- endif -%} +{%- if model.config.materialized == 'vault_insert_by_period' %} +stage_mat_filter AS ( + SELECT * + FROM {{ ns.last_cte }} + WHERE __PERIOD_FILTER__ + {%- set ns.last_cte = "stage_mat_filter" %} +), +{%- elif model.config.materialized == 'vault_insert_by_rank' %} +stage_mat_filter AS ( + SELECT * + FROM {{ ns.last_cte }} + WHERE __RANK_FILTER__ + {%- set ns.last_cte = "stage_mat_filter" %} +), +{%- endif -%} +{%- if source_model | length > 1 %} + + row_rank_union AS ( + SELECT ru.*, + ROW_NUMBER() OVER( + PARTITION BY {{ dbtvault.prefix([src_pk], 'ru') }} + ORDER BY {{ dbtvault.prefix([src_ldts], 'ru') }}, {{ dbtvault.prefix([src_source], 'ru') }} ASC + ) AS row_rank_number + FROM {{ ns.last_cte }} AS ru + WHERE {{ dbtvault.multikey(src_pk, prefix='ru', condition='IS NOT NULL') }} + QUALIFY row_rank_number = 1 + {%- set ns.last_cte = "row_rank_union" %} +), + +{% endif %} +records_to_insert AS ( + SELECT {{ dbtvault.prefix(source_cols, 'a', alias_target='target') }} + FROM {{ ns.last_cte }} AS a + {%- if dbtvault.is_any_incremental() %} + LEFT JOIN {{ this }} AS d + ON {{ dbtvault.multikey(src_pk, prefix=['a','d'], condition='=') }} + WHERE {{ dbtvault.multikey(src_pk, prefix='d', condition='IS NULL') }} + {%- endif %} +) + +SELECT * FROM records_to_insert + +{%- endmacro -%} \ No newline at end of file diff --git a/macros/tables/bigquery/link.sql b/macros/tables/bigquery/link.sql new file mode 100644 index 000000000..486637c35 --- /dev/null +++ b/macros/tables/bigquery/link.sql @@ -0,0 +1,102 @@ +{%- macro bigquery__link(src_pk, src_fk, src_ldts, src_source, source_model) -%} + +{{- dbtvault.check_required_parameters(src_pk=src_pk, src_fk=src_fk, + src_ldts=src_ldts, src_source=src_source, + source_model=source_model) -}} + +{%- set source_cols = dbtvault.expand_column_list(columns=[src_pk, src_fk, src_ldts, src_source]) -%} +{%- set fk_cols = dbtvault.expand_column_list([src_fk]) -%} + +{%- if model.config.materialized == 'vault_insert_by_rank' %} + {%- set source_cols_with_rank = source_cols + [config.get('rank_column')] -%} +{%- endif -%} + +{{ dbtvault.prepend_generated_by() }} + +{{ 'WITH ' -}} + +{%- if not (source_model is iterable and source_model is not string) -%} + {%- set source_model = [source_model] -%} +{%- endif -%} + +{%- set ns = namespace(last_cte= "") -%} + +{%- for src in source_model -%} + +{%- set source_number = loop.index | string -%} + +row_rank_{{ source_number }} AS ( + {%- if model.config.materialized == 'vault_insert_by_rank' %} + SELECT {{ dbtvault.prefix(source_cols_with_rank, 'rr') }}, + {%- else %} + SELECT {{ dbtvault.prefix(source_cols, 'rr') }}, + {%- endif %} + ROW_NUMBER() OVER( + PARTITION BY {{ dbtvault.prefix([src_pk], 'rr') }} + ORDER BY {{ dbtvault.prefix([src_ldts], 'rr') }} + ) AS row_number + FROM {{ ref (src) }} AS rr + {%- if source_model | length == 1 %} + WHERE {{ dbtvault.multikey(src_pk, prefix='rr', condition ='IS NOT NULL') }} + AND {{ dbtvault.multikey(fk_cols, prefix='rr', condition ='IS NOT NULL') }} + QUALIFY row_number = 1 + {%- endif %} + {%- set ns.last_cte = "row_rank_{}".format(source_number) %} + ), + +{% endfor -%} + +{% if source_model | length > 1 %} +stage_union AS ( + {%- for src in source_model %} + SELECT * FROM row_rank_{{ loop.index | string }} + {%- if not loop.last %} + UNION ALL + {%- endif %} + {%- endfor %} + {%- set ns.last_cte = "stage_union" %} +), +{%- endif -%} +{%- if model.config.materialized == 'vault_insert_by_period' %} +stage_mat_filter AS ( + SELECT * + FROM {{ ns.last_cte }} + WHERE __PERIOD_FILTER__ + {%- set ns.last_cte = "stage_mat_filter" %} +), +{%- elif model.config.materialized == 'vault_insert_by_rank' %} +stage_mat_filter AS ( + SELECT * + FROM {{ ns.last_cte }} + WHERE __RANK_FILTER__ + {%- set ns.last_cte = "stage_mat_filter" %} +), +{% endif %} +{%- if source_model | length > 1 %} + +row_rank_union AS ( + SELECT ru.*, + ROW_NUMBER() OVER( + PARTITION BY {{ dbtvault.prefix([src_pk], 'ru') }} + ORDER BY {{ dbtvault.prefix([src_ldts], 'ru') }}, {{ dbtvault.prefix([src_source], 'ru') }} ASC + ) AS row_rank_number + FROM {{ ns.last_cte }} AS ru + WHERE {{ dbtvault.multikey(src_pk, prefix='ru', condition='IS NOT NULL') }} + AND {{ dbtvault.multikey(fk_cols, prefix='ru', condition='IS NOT NULL') }} + QUALIFY row_rank_number = 1 + {%- set ns.last_cte = "row_rank_union" %} +), +{% endif %} +records_to_insert AS ( + SELECT {{ dbtvault.prefix(source_cols, 'a', alias_target='target') }} + FROM {{ ns.last_cte }} AS a + {%- if dbtvault.is_any_incremental() %} + LEFT JOIN {{ this }} AS d + ON {{ dbtvault.multikey(src_pk, prefix=['a','d'], condition='=') }} + WHERE {{ dbtvault.multikey(src_pk, prefix='d', condition='IS NULL') }} + {%- endif %} +) + +SELECT * FROM records_to_insert + +{%- endmacro -%} \ No newline at end of file diff --git a/macros/tables/bigquery/sat.sql b/macros/tables/bigquery/sat.sql new file mode 100644 index 000000000..dc72448a1 --- /dev/null +++ b/macros/tables/bigquery/sat.sql @@ -0,0 +1,74 @@ +{%- macro bigquery__sat(src_pk, src_hashdiff, src_payload, src_eff, src_ldts, src_source, source_model) -%} + +{{- dbtvault.check_required_parameters(src_pk=src_pk, src_hashdiff=src_hashdiff, src_payload=src_payload, + src_ldts=src_ldts, src_source=src_source, + source_model=source_model) -}} + +{%- set src_pk = dbtvault.escape_column_names(src_pk) -%} +{%- set src_hashdiff = dbtvault.escape_column_names(src_hashdiff) -%} +{%- set src_payload = dbtvault.escape_column_names(src_payload) -%} +{%- set src_ldts = dbtvault.escape_column_names(src_ldts) -%} +{%- set src_source = dbtvault.escape_column_names(src_source) -%} + +{%- set source_cols = dbtvault.expand_column_list(columns=[src_pk, src_hashdiff, src_payload, src_eff, src_ldts, src_source]) -%} +{%- set rank_cols = dbtvault.expand_column_list(columns=[src_pk, src_hashdiff, src_ldts]) -%} +{%- set pk_cols = dbtvault.expand_column_list(columns=[src_pk]) -%} + +{%- if model.config.materialized == 'vault_insert_by_rank' %} + {%- set source_cols_with_rank = source_cols + dbtvault.escape_column_names([config.get('rank_column')]) -%} +{%- endif -%} + +{{ dbtvault.prepend_generated_by() }} + +WITH source_data AS ( + {%- if model.config.materialized == 'vault_insert_by_rank' %} + SELECT {{ dbtvault.prefix(source_cols_with_rank, 'a', alias_target='source') }} + {%- else %} + SELECT {{ dbtvault.prefix(source_cols, 'a', alias_target='source') }} + {%- endif %} + FROM {{ ref(source_model) }} AS a + WHERE {{ dbtvault.multikey(src_pk, prefix='a', condition='IS NOT NULL') }} + {%- if model.config.materialized == 'vault_insert_by_period' %} + AND __PERIOD_FILTER__ + {% elif model.config.materialized == 'vault_insert_by_rank' %} + AND __RANK_FILTER__ + {% endif %} +), + +{% if dbtvault.is_any_incremental() %} + +latest_records AS ( + + SELECT {{ dbtvault.prefix(rank_cols, 'a', alias_target='target') }} + FROM ( + SELECT {{ dbtvault.prefix(rank_cols, 'current_records', alias_target='target') }}, + RANK() OVER ( + PARTITION BY {{ dbtvault.prefix([src_pk], 'current_records') }} + ORDER BY {{ dbtvault.prefix([src_ldts], 'current_records') }} DESC + ) AS rank + FROM {{ this }} AS current_records + JOIN ( + SELECT DISTINCT {{ dbtvault.prefix([src_pk], 'source_data') }} + FROM source_data + ) AS source_records + ON {{ dbtvault.multikey(src_pk, prefix=['current_records','source_records'], condition='=') }} + ) AS a + WHERE a.rank = 1 +), + +{%- endif %} + +records_to_insert AS ( + SELECT DISTINCT {{ dbtvault.alias_all(source_cols, 'stage') }} + FROM source_data AS stage + {%- if dbtvault.is_any_incremental() %} + LEFT JOIN latest_records + ON {{ dbtvault.multikey(src_pk, prefix=['latest_records','stage'], condition='=') }} + WHERE {{ dbtvault.prefix([src_hashdiff], 'latest_records', alias_target='target') }} != {{ dbtvault.prefix([src_hashdiff], 'stage') }} + OR {{ dbtvault.prefix([src_hashdiff], 'latest_records', alias_target='target') }} IS NULL + {%- endif %} +) + +SELECT * FROM records_to_insert + +{%- endmacro -%} \ No newline at end of file diff --git a/macros/tables/bridge.sql b/macros/tables/snowflake/bridge.sql similarity index 63% rename from macros/tables/bridge.sql rename to macros/tables/snowflake/bridge.sql index e5a694885..0cc71add7 100644 --- a/macros/tables/bridge.sql +++ b/macros/tables/snowflake/bridge.sql @@ -14,6 +14,9 @@ stage_tables_ldts=stage_tables_ldts, src_ldts=src_ldts) -}} +{%- set src_pk = dbtvault.escape_column_names(src_pk) -%} +{%- set src_ldts = dbtvault.escape_column_names(src_ldts) -%} + {{ dbtvault.prepend_generated_by() }} {%- if (as_of_dates_table is none) and execute -%} @@ -133,14 +136,14 @@ overlap AS ( {{ dbtvault.prefix([src_pk], 'a') }}, b.AS_OF_DATE {%- for bridge_step in bridge_walk.keys() -%} - {%- set link_table = bridge_walk[bridge_step]['link_table'] -%} - {%- set link_pk = bridge_walk[bridge_step]['link_pk'] -%} - {%- set bridge_link_pk = bridge_walk[bridge_step]['bridge_link_pk'] -%} - {%- set eff_sat_table = bridge_walk[bridge_step]['eff_sat_table'] -%} - {%- set bridge_end_date = bridge_walk[bridge_step]['bridge_end_date'] -%} - {%- set bridge_load_date = bridge_walk[bridge_step]['bridge_load_date'] -%} - {%- set eff_sat_end_date = bridge_walk[bridge_step]['eff_sat_end_date'] -%} - {%- set eff_sat_load_date = bridge_walk[bridge_step]['eff_sat_load_date'] -%} + {%- set link_table = dbtvault.escape_column_names(bridge_walk[bridge_step]['link_table']) -%} + {%- set link_pk = dbtvault.escape_column_names(bridge_walk[bridge_step]['link_pk']) -%} + {%- set bridge_link_pk = dbtvault.escape_column_names(bridge_walk[bridge_step]['bridge_link_pk']) -%} + {%- set eff_sat_table = dbtvault.escape_column_names(bridge_walk[bridge_step]['eff_sat_table']) -%} + {%- set bridge_end_date = dbtvault.escape_column_names(bridge_walk[bridge_step]['bridge_end_date']) -%} + {%- set bridge_load_date = dbtvault.escape_column_names(bridge_walk[bridge_step]['bridge_load_date']) -%} + {%- set eff_sat_end_date = dbtvault.escape_column_names(bridge_walk[bridge_step]['eff_sat_end_date']) -%} + {%- set eff_sat_load_date = dbtvault.escape_column_names(bridge_walk[bridge_step]['eff_sat_load_date']) -%} {%- filter indent(width=8) %} {{ ',' ~ link_table ~ '.' ~ link_pk ~ ' AS ' ~ bridge_link_pk }} {{ ',' ~ eff_sat_table ~ '.' ~ eff_sat_end_date ~ ' AS ' ~ bridge_end_date }} @@ -154,21 +157,21 @@ overlap AS ( {%- for bridge_step in bridge_walk.keys() -%} {%- set current_link = bridge_walk[bridge_step]['link_table'] -%} {%- set current_eff_sat = bridge_walk[bridge_step]['eff_sat_table'] -%} - {%- set link_pk = bridge_walk[bridge_step]['link_pk'] -%} - {%- set link_fk1 = bridge_walk[bridge_step]['link_fk1'] -%} - {%- set link_fk2 = bridge_walk[bridge_step]['link_fk2'] -%} - {%- set eff_sat_pk = bridge_walk[bridge_step]['eff_sat_pk'] -%} - {%- set eff_sat_load_date = bridge_walk[bridge_step]['eff_sat_load_date'] -%} + {%- set link_pk = dbtvault.escape_column_names(bridge_walk[bridge_step]['link_pk']) -%} + {%- set link_fk1 = dbtvault.escape_column_names(bridge_walk[bridge_step]['link_fk1']) -%} + {%- set link_fk2 = dbtvault.escape_column_names(bridge_walk[bridge_step]['link_fk2']) -%} + {%- set eff_sat_pk = dbtvault.escape_column_names(bridge_walk[bridge_step]['eff_sat_pk']) -%} + {%- set eff_sat_load_date = dbtvault.escape_column_names(bridge_walk[bridge_step]['eff_sat_load_date']) -%} {%- if loop.first %} - LEFT JOIN {{ ref(current_link) }} AS {{ current_link }} - ON a.{{ src_pk }} = {{ current_link }}.{{ link_fk1 }} + LEFT JOIN {{ ref(current_link) }} AS {{ dbtvault.escape_column_names(current_link) }} + ON a.{{ src_pk }} = {{ dbtvault.escape_column_names(current_link) }}.{{ link_fk1 }} {%- else %} - LEFT JOIN {{ ref(current_link) }} AS {{ current_link }} - ON {{ loop_vars.last_link }}.{{ loop_vars.last_link_fk2 }} = {{ current_link }}.{{ link_fk1 }} + LEFT JOIN {{ ref(current_link) }} AS {{ dbtvault.escape_column_names(current_link) }} + ON {{ loop_vars.last_link }}.{{ loop_vars.last_link_fk2 }} = {{ dbtvault.escape_column_names(current_link) }}.{{ link_fk1 }} {%- endif %} - INNER JOIN {{ ref(current_eff_sat) }} AS {{ current_eff_sat }} - ON {{ current_eff_sat }}.{{ eff_sat_pk }} = {{ current_link }}.{{ link_pk }} - AND {{ current_eff_sat }}.{{ eff_sat_load_date }} <= b.AS_OF_DATE + INNER JOIN {{ ref(current_eff_sat) }} AS {{ dbtvault.escape_column_names(current_eff_sat) }} + ON {{ dbtvault.escape_column_names(current_eff_sat) }}.{{ eff_sat_pk }} = {{ dbtvault.escape_column_names(current_link) }}.{{ link_pk }} + AND {{ dbtvault.escape_column_names(current_eff_sat) }}.{{ eff_sat_load_date }} <= b.AS_OF_DATE {%- set loop_vars.last_link = current_link -%} {%- set loop_vars.last_link_fk2 = link_fk2 -%} {% endfor %} @@ -180,14 +183,14 @@ new_rows AS ( {{ dbtvault.prefix([src_pk], 'a') }}, b.AS_OF_DATE {%- for bridge_step in bridge_walk.keys() -%} - {%- set link_table = bridge_walk[bridge_step]['link_table'] -%} - {%- set link_pk = bridge_walk[bridge_step]['link_pk'] -%} - {%- set bridge_link_pk = bridge_walk[bridge_step]['bridge_link_pk'] -%} - {%- set eff_sat_table = bridge_walk[bridge_step]['eff_sat_table'] -%} - {%- set bridge_end_date = bridge_walk[bridge_step]['bridge_end_date'] -%} - {%- set bridge_load_date = bridge_walk[bridge_step]['bridge_load_date'] -%} - {%- set eff_sat_end_date = bridge_walk[bridge_step]['eff_sat_end_date'] -%} - {%- set eff_sat_load_date = bridge_walk[bridge_step]['eff_sat_load_date'] -%} + {%- set link_table = dbtvault.escape_column_names(bridge_walk[bridge_step]['link_table']) -%} + {%- set link_pk = dbtvault.escape_column_names(bridge_walk[bridge_step]['link_pk']) -%} + {%- set bridge_link_pk = dbtvault.escape_column_names(bridge_walk[bridge_step]['bridge_link_pk']) -%} + {%- set eff_sat_table = dbtvault.escape_column_names(bridge_walk[bridge_step]['eff_sat_table']) -%} + {%- set bridge_end_date = dbtvault.escape_column_names(bridge_walk[bridge_step]['bridge_end_date']) -%} + {%- set bridge_load_date = dbtvault.escape_column_names(bridge_walk[bridge_step]['bridge_load_date']) -%} + {%- set eff_sat_end_date = dbtvault.escape_column_names(bridge_walk[bridge_step]['eff_sat_end_date']) -%} + {%- set eff_sat_load_date = dbtvault.escape_column_names(bridge_walk[bridge_step]['eff_sat_load_date']) -%} {%- filter indent(width=8) -%} {{ ',' ~ link_table ~'.'~ link_pk ~' AS '~ bridge_link_pk }} {{ ',' ~ eff_sat_table ~ '.' ~ eff_sat_end_date ~ ' AS ' ~ bridge_end_date }} @@ -201,22 +204,22 @@ new_rows AS ( {%- for bridge_step in bridge_walk.keys() -%} {%- set current_link = bridge_walk[bridge_step]['link_table'] -%} {%- set current_eff_sat = bridge_walk[bridge_step]['eff_sat_table'] -%} - {%- set link_pk = bridge_walk[bridge_step]['link_pk'] -%} - {%- set link_fk1 = bridge_walk[bridge_step]['link_fk1'] -%} - {%- set link_fk2 = bridge_walk[bridge_step]['link_fk2'] -%} - {%- set eff_sat_pk = bridge_walk[bridge_step]['eff_sat_pk'] -%} - {%- set eff_sat_load_date = bridge_walk[bridge_step]['eff_sat_load_date'] -%} + {%- set link_pk = dbtvault.escape_column_names(bridge_walk[bridge_step]['link_pk']) -%} + {%- set link_fk1 = dbtvault.escape_column_names(bridge_walk[bridge_step]['link_fk1']) -%} + {%- set link_fk2 = dbtvault.escape_column_names(bridge_walk[bridge_step]['link_fk2']) -%} + {%- set eff_sat_pk = dbtvault.escape_column_names(bridge_walk[bridge_step]['eff_sat_pk']) -%} + {%- set eff_sat_load_date = dbtvault.escape_column_names(bridge_walk[bridge_step]['eff_sat_load_date']) -%} {%- if loop.first %} - LEFT JOIN {{ ref(current_link) }} AS {{ current_link }} - ON a.{{ src_pk }} = {{ current_link }}.{{ link_fk1 }} + LEFT JOIN {{ ref(current_link) }} AS {{ dbtvault.escape_column_names(current_link) }} + ON a.{{ src_pk }} = {{ dbtvault.escape_column_names(current_link) }}.{{ link_fk1 }} {%- else %} - LEFT JOIN {{ ref(current_link) }} AS {{ current_link }} - ON {{ loop_vars.last_link }}.{{ loop_vars.last_link_fk2 }} = {{ current_link }}.{{ link_fk1 }} + LEFT JOIN {{ ref(current_link) }} AS {{ dbtvault.escape_column_names(current_link) }} + ON {{ loop_vars.last_link }}.{{ loop_vars.last_link_fk2 }} = {{ dbtvault.escape_column_names(current_link) }}.{{ link_fk1 }} {%- endif %} - INNER JOIN {{ ref(current_eff_sat) }} AS {{ current_eff_sat }} - ON {{ current_eff_sat }}.{{ eff_sat_pk }} = {{ current_link }}.{{ link_pk }} - AND {{ current_eff_sat }}.{{ eff_sat_load_date }} <= b.AS_OF_DATE - {%- set loop_vars.last_link = current_link -%} + INNER JOIN {{ ref(current_eff_sat) }} AS {{ dbtvault.escape_column_names(current_eff_sat) }} + ON {{ dbtvault.escape_column_names(current_eff_sat) }}.{{ eff_sat_pk }} = {{ dbtvault.escape_column_names(current_link) }}.{{ link_pk }} + AND {{ dbtvault.escape_column_names(current_eff_sat) }}.{{ eff_sat_load_date }} <= b.AS_OF_DATE + {%- set loop_vars.last_link = dbtvault.escape_column_names(current_link) -%} {%- set loop_vars.last_link_fk2 = link_fk2 -%} {% endfor %} ), @@ -236,7 +239,7 @@ candidate_rows AS ( ROW_NUMBER() OVER ( PARTITION BY AS_OF_DATE, {%- for bridge_step in bridge_walk.keys() -%} - {% set bridge_link_pk = bridge_walk[bridge_step]['bridge_link_pk'] -%} + {% set bridge_link_pk = dbtvault.escape_column_names(bridge_walk[bridge_step]['bridge_link_pk']) -%} {%- if loop.first %} {{ bridge_link_pk }} {%- else %} @@ -245,7 +248,7 @@ candidate_rows AS ( {%- endfor %} ORDER BY {%- for bridge_step in bridge_walk.keys() -%} - {% set bridge_load_date = bridge_walk[bridge_step]['bridge_load_date'] %} + {% set bridge_load_date = dbtvault.escape_column_names(bridge_walk[bridge_step]['bridge_load_date']) %} {%- if loop.first %} {{ bridge_load_date ~' DESC' }} {%- else %} @@ -262,12 +265,12 @@ bridge AS ( {{ dbtvault.prefix([src_pk], 'c') }}, c.AS_OF_DATE {%- for bridge_step in bridge_walk.keys() -%} - {% set bridge_link_pk = bridge_walk[bridge_step]['bridge_link_pk'] -%} + {% set bridge_link_pk = dbtvault.escape_column_names(bridge_walk[bridge_step]['bridge_link_pk']) -%} {{ ',c.' ~ bridge_link_pk }} {%- endfor %} FROM candidate_rows AS c {%- for bridge_step in bridge_walk.keys() -%} - {%- set bridge_end_date = bridge_walk[bridge_step]['bridge_end_date'] -%} + {%- set bridge_end_date = dbtvault.escape_column_names(bridge_walk[bridge_step]['bridge_end_date']) -%} {%- if loop.first %} WHERE TO_DATE({{ 'c.' ~ bridge_end_date }}) = TO_DATE('{{ max_datetime }}') {%- else %} diff --git a/macros/tables/eff_sat.sql b/macros/tables/snowflake/eff_sat.sql similarity index 92% rename from macros/tables/eff_sat.sql rename to macros/tables/snowflake/eff_sat.sql index 771fc1440..44daf48ee 100644 --- a/macros/tables/eff_sat.sql +++ b/macros/tables/snowflake/eff_sat.sql @@ -13,6 +13,15 @@ src_eff=src_eff, src_ldts=src_ldts, src_source=src_source, source_model=source_model) -}} +{%- set src_pk = dbtvault.escape_column_names(src_pk) -%} +{%- set src_dfk = dbtvault.escape_column_names(src_dfk) -%} +{%- set src_sfk = dbtvault.escape_column_names(src_sfk) -%} +{%- set src_start_date = dbtvault.escape_column_names(src_start_date) -%} +{%- set src_end_date = dbtvault.escape_column_names(src_end_date) -%} +{%- set src_eff = dbtvault.escape_column_names(src_eff) -%} +{%- set src_ldts = dbtvault.escape_column_names(src_ldts) -%} +{%- set src_source = dbtvault.escape_column_names(src_source) -%} + {%- set source_cols = dbtvault.expand_column_list(columns=[src_pk, src_dfk, src_sfk, src_start_date, src_end_date, src_eff, src_ldts, src_source]) -%} {%- set fk_cols = dbtvault.expand_column_list(columns=[src_dfk, src_sfk]) -%} {%- set dfk_cols = dbtvault.expand_column_list(columns=[src_dfk]) -%} diff --git a/macros/tables/hub.sql b/macros/tables/snowflake/hub.sql similarity index 90% rename from macros/tables/hub.sql rename to macros/tables/snowflake/hub.sql index 32a551ad3..8121a36ef 100644 --- a/macros/tables/hub.sql +++ b/macros/tables/snowflake/hub.sql @@ -12,10 +12,15 @@ src_ldts=src_ldts, src_source=src_source, source_model=source_model) -}} +{%- set src_pk = dbtvault.escape_column_names(src_pk) -%} +{%- set src_nk = dbtvault.escape_column_names(src_nk) -%} +{%- set src_ldts = dbtvault.escape_column_names(src_ldts) -%} +{%- set src_source = dbtvault.escape_column_names(src_source) -%} + {%- set source_cols = dbtvault.expand_column_list(columns=[src_pk, src_nk, src_ldts, src_source]) -%} {%- if model.config.materialized == 'vault_insert_by_rank' %} - {%- set source_cols_with_rank = source_cols + [config.get('rank_column')] -%} + {%- set source_cols_with_rank = source_cols + dbtvault.escape_column_names([config.get('rank_column')]) -%} {%- endif -%} {{ dbtvault.prepend_generated_by() }} diff --git a/macros/tables/link.sql b/macros/tables/snowflake/link.sql similarity index 91% rename from macros/tables/link.sql rename to macros/tables/snowflake/link.sql index 036a74d0e..0e4960ea2 100644 --- a/macros/tables/link.sql +++ b/macros/tables/snowflake/link.sql @@ -12,11 +12,16 @@ src_ldts=src_ldts, src_source=src_source, source_model=source_model) -}} +{%- set src_pk = dbtvault.escape_column_names(src_pk) -%} +{%- set src_fk = dbtvault.escape_column_names(src_fk) -%} +{%- set src_ldts = dbtvault.escape_column_names(src_ldts) -%} +{%- set src_source = dbtvault.escape_column_names(src_source) -%} + {%- set source_cols = dbtvault.expand_column_list(columns=[src_pk, src_fk, src_ldts, src_source]) -%} {%- set fk_cols = dbtvault.expand_column_list([src_fk]) -%} {%- if model.config.materialized == 'vault_insert_by_rank' %} - {%- set source_cols_with_rank = source_cols + [config.get('rank_column')] -%} + {%- set source_cols_with_rank = source_cols + dbtvault.escape_column_names([config.get('rank_column')]) -%} {%- endif -%} {{ dbtvault.prepend_generated_by() }} diff --git a/macros/tables/ma_sat.sql b/macros/tables/snowflake/ma_sat.sql similarity index 85% rename from macros/tables/ma_sat.sql rename to macros/tables/snowflake/ma_sat.sql index 66f8f2552..b387a927b 100644 --- a/macros/tables/ma_sat.sql +++ b/macros/tables/snowflake/ma_sat.sql @@ -13,6 +13,13 @@ src_payload=src_payload, src_ldts=src_ldts, src_source=src_source, source_model=source_model) -}} +{%- set src_pk = dbtvault.escape_column_names(src_pk) -%} +{%- set src_cdk = dbtvault.escape_column_names(src_cdk) -%} +{%- set src_hashdiff = dbtvault.escape_column_names(src_hashdiff) -%} +{%- set src_payload = dbtvault.escape_column_names(src_payload) -%} +{%- set src_ldts = dbtvault.escape_column_names(src_ldts) -%} +{%- set src_source = dbtvault.escape_column_names(src_source) -%} + {%- set source_cols = dbtvault.expand_column_list(columns=[src_pk, src_hashdiff, src_cdk, src_payload, src_eff, src_ldts, src_source]) -%} {%- set rank_cols = dbtvault.expand_column_list(columns=[src_pk, src_hashdiff, src_ldts]) -%} {%- set cdk_cols = dbtvault.expand_column_list(columns=[src_cdk]) -%} @@ -52,13 +59,13 @@ WITH source_data AS ( {# Select latest records from satellite, restricted to PKs in source data -#} latest_records AS ( - SELECT {{ dbtvault.prefix(cols_for_latest, 'mas') }} + SELECT {{ dbtvault.prefix(cols_for_latest, 'mas', alias_target='target') }} ,mas.latest_rank ,DENSE_RANK() OVER (PARTITION BY {{ dbtvault.prefix([src_pk], 'mas') }} - ORDER BY {{ dbtvault.prefix([src_hashdiff], 'mas') }}, {{ dbtvault.prefix(cdk_cols, 'mas') }} ASC) AS check_rank + ORDER BY {{ dbtvault.prefix([src_hashdiff], 'mas', alias_target='target') }}, {{ dbtvault.prefix(cdk_cols, 'mas') }} ASC) AS check_rank FROM ( - SELECT {{ dbtvault.prefix(cols_for_latest, 'inner_mas') }} + SELECT {{ dbtvault.prefix(cols_for_latest, 'inner_mas', alias_target='target') }} ,RANK() OVER (PARTITION BY {{ dbtvault.prefix([src_pk], 'inner_mas') }} ORDER BY {{ dbtvault.prefix([src_ldts], 'inner_mas') }} DESC) AS latest_rank FROM {{ this }} AS inner_mas @@ -96,7 +103,7 @@ records_to_insert AS ( SELECT 1 FROM ( - SELECT {{ dbtvault.prefix(cols_for_latest, 'lr') }} + SELECT {{ dbtvault.prefix(cols_for_latest, 'lr', alias_target='target') }} ,lg.latest_count FROM latest_records AS lr INNER JOIN latest_group_details AS lg @@ -104,7 +111,7 @@ records_to_insert AS ( AND {{ dbtvault.prefix([src_ldts], 'lr') }} = {{ dbtvault.prefix([src_ldts], 'lg') }} ) AS active_records WHERE {{ dbtvault.multikey([src_pk], prefix=['stage', 'active_records'], condition='=') }} - AND {{ dbtvault.prefix([src_hashdiff], 'stage') }} = {{ dbtvault.prefix([src_hashdiff], 'active_records') }} + AND {{ dbtvault.prefix([src_hashdiff], 'stage') }} = {{ dbtvault.prefix([src_hashdiff], 'active_records', alias_target='target') }} AND {{ dbtvault.multikey(cdk_cols, prefix=['stage', 'active_records'], condition='=') }} AND stage.source_count = active_records.latest_count ) diff --git a/macros/tables/pit.sql b/macros/tables/snowflake/pit.sql similarity index 70% rename from macros/tables/pit.sql rename to macros/tables/snowflake/pit.sql index 283b652a5..9338e1008 100644 --- a/macros/tables/pit.sql +++ b/macros/tables/snowflake/pit.sql @@ -16,8 +16,13 @@ stage_tables=stage_tables, src_ldts=src_ldts) -}} +{%- set src_pk = dbtvault.escape_column_names(src_pk) -%} +{%- set src_ldts = dbtvault.escape_column_names(src_ldts) -%} + {{ dbtvault.prepend_generated_by() }} +{% set adapter_type = dbtvault.get_adapter_type() %} + {%- if (as_of_dates_table is none) and execute -%} {%- set error_message -%} "PIT error: Missing as_of_dates table configuration. A as_of_dates_table must be provided." @@ -141,10 +146,12 @@ backfill AS ( {%- set sat_pk_name = (satellites[sat_name]['pk'].keys() | list )[0] | upper -%} {%- set sat_ldts_name = (satellites[sat_name]['ldts'].keys() | list )[0] | upper -%} {%- set sat_name = sat_name | upper %} - - CAST('{{ ghost_pk }}' AS BINARY(16)) AS {{ sat_name }}_{{ sat_pk_name }}, - - CAST('{{ ghost_date }}' AS {{ dbtvault.type_timestamp() }}) AS {{ sat_name }}_{{ sat_ldts_name }} + {%- if adapter_type == "sqlserver" -%} + CONVERT(BINARY(16), '{{ ghost_pk }}', 2) AS {{ dbtvault.escape_column_names( sat_name ~ '_' ~ sat_pk_name ) }}, + {%- else -%} + CAST('{{ ghost_pk }}' AS BINARY(16)) AS {{ dbtvault.escape_column_names( sat_name ~ '_' ~ sat_pk_name ) }}, + {%- endif -%} + CAST('{{ ghost_date }}' AS {{ dbtvault.type_timestamp() }}) AS {{ dbtvault.escape_column_names( sat_name ~ '_' ~ sat_ldts_name ) }} {{- ',' if not loop.last -}} {%- endfor %} FROM backfill_rows_as_of_dates AS a @@ -152,11 +159,11 @@ backfill AS ( {% for sat_name in satellites -%} {%- set sat_pk_name = (satellites[sat_name]['pk'].keys() | list )[0] -%} {%- set sat_ldts_name = (satellites[sat_name]['ldts'].keys() | list )[0] -%} - {%- set sat_pk = satellites[sat_name]['pk'][sat_pk_name] -%} - {%- set sat_ldts = satellites[sat_name]['ldts'][sat_ldts_name] -%} - LEFT JOIN {{ ref(sat_name) }} AS {{ sat_name | lower }}_src - {{ "ON" | indent(4) }} a.{{ src_pk }} = {{ sat_name | lower }}_src.{{ sat_pk }} - {{ "AND" | indent(4) }} {{ sat_name | lower }}_src.{{ sat_ldts }} <= a.AS_OF_DATE + {%- set sat_pk = dbtvault.escape_column_names(satellites[sat_name]['pk'][sat_pk_name]) -%} + {%- set sat_ldts = dbtvault.escape_column_names(satellites[sat_name]['ldts'][sat_ldts_name]) -%} + LEFT JOIN {{ ref(sat_name) }} AS {{ dbtvault.escape_column_names( sat_name | lower ~ '_src' ) }} + {{ "ON" | indent(4) }} a.{{ src_pk }} = {{ dbtvault.escape_column_names( sat_name | lower ~ '_src' ) }}.{{ sat_pk }} + {{ "AND" | indent(4) }} {{ dbtvault.escape_column_names( sat_name | lower ~ '_src' ) }}.{{ sat_ldts }} <= a.AS_OF_DATE {% endfor -%} GROUP BY @@ -180,11 +187,14 @@ new_rows AS ( {%- for sat_name in satellites -%} {%- set sat_pk_name = (satellites[sat_name]['pk'].keys() | list )[0] -%} {%- set sat_ldts_name = (satellites[sat_name]['ldts'].keys() | list )[0] -%} - {%- set sat_pk = satellites[sat_name]['pk'][sat_pk_name] -%} - {%- set sat_ldts = satellites[sat_name]['ldts'][sat_ldts_name] %} - - COALESCE(MAX({{ sat_name | lower }}_src.{{ sat_pk }}), CAST('{{ ghost_pk }}' AS BINARY(16))) AS {{ sat_name | upper }}_{{ sat_pk_name | upper }}, - COALESCE(MAX({{ sat_name | lower }}_src.{{ sat_ldts }}), CAST('{{ ghost_date }}' AS {{ dbtvault.type_timestamp() }})) AS {{ sat_name | upper }}_{{ sat_ldts_name | upper }} + {%- set sat_pk = dbtvault.escape_column_names(satellites[sat_name]['pk'][sat_pk_name]) -%} + {%- set sat_ldts = dbtvault.escape_column_names(satellites[sat_name]['ldts'][sat_ldts_name]) %} + {%- if adapter_type == "sqlserver" -%} + COALESCE(MAX({{ dbtvault.escape_column_names( sat_name | lower ~ '_src' ) }}.{{ sat_pk }}), CONVERT(BINARY(16), '{{ ghost_pk }}', 2)) AS {{ dbtvault.escape_column_names( sat_name | upper ~ '_' ~ sat_pk_name | upper ) }}, + {%- else -%} + COALESCE(MAX({{ dbtvault.escape_column_names( sat_name | lower ~ '_src' ) }}.{{ sat_pk }}), CAST('{{ ghost_pk }}' AS BINARY(16))) AS {{ dbtvault.escape_column_names( sat_name | upper ~ '_' ~ sat_pk_name | upper ) }}, + {%- endif -%} + COALESCE(MAX({{ dbtvault.escape_column_names( sat_name | lower ~ '_src' ) }}.{{ sat_ldts }}), CAST('{{ ghost_date }}' AS {{ dbtvault.type_timestamp() }})) AS {{ dbtvault.escape_column_names( sat_name | upper ~ '_' ~ sat_ldts_name | upper ) }} {{- "," if not loop.last }} {%- endfor %} FROM new_rows_as_of_dates AS a @@ -192,11 +202,11 @@ new_rows AS ( {% for sat_name in satellites -%} {%- set sat_pk_name = (satellites[sat_name]['pk'].keys() | list )[0] -%} {%- set sat_ldts_name = (satellites[sat_name]['ldts'].keys() | list )[0] -%} - {%- set sat_pk = satellites[sat_name]['pk'][sat_pk_name] -%} - {%- set sat_ldts = satellites[sat_name]['ldts'][sat_ldts_name] -%} - LEFT JOIN {{ ref(sat_name) }} AS {{ sat_name | lower }}_src - {{ "ON" | indent(4) }} a.{{ src_pk }} = {{ sat_name | lower }}_src.{{ sat_pk }} - {{ "AND" | indent(4) }} {{ sat_name | lower }}_src.{{ sat_ldts }} <= a.AS_OF_DATE + {%- set sat_pk = dbtvault.escape_column_names(satellites[sat_name]['pk'][sat_pk_name]) -%} + {%- set sat_ldts = dbtvault.escape_column_names(satellites[sat_name]['ldts'][sat_ldts_name]) -%} + LEFT JOIN {{ ref(sat_name) }} AS {{ dbtvault.escape_column_names( sat_name | lower ~ '_src' ) }} + {{ "ON" | indent(4) }} a.{{ src_pk }} = {{ dbtvault.escape_column_names( sat_name | lower ~ '_src' ) }}.{{ sat_pk }} + {{ "AND" | indent(4) }} {{ dbtvault.escape_column_names( sat_name | lower ~ '_src' ) }}.{{ sat_ldts }} <= a.AS_OF_DATE {% endfor -%} GROUP BY diff --git a/macros/tables/sat.sql b/macros/tables/snowflake/sat.sql similarity index 82% rename from macros/tables/sat.sql rename to macros/tables/snowflake/sat.sql index 3e6022599..029844a4d 100644 --- a/macros/tables/sat.sql +++ b/macros/tables/snowflake/sat.sql @@ -12,12 +12,18 @@ src_ldts=src_ldts, src_source=src_source, source_model=source_model) -}} +{%- set src_pk = dbtvault.escape_column_names(src_pk) -%} +{%- set src_hashdiff = dbtvault.escape_column_names(src_hashdiff) -%} +{%- set src_payload = dbtvault.escape_column_names(src_payload) -%} +{%- set src_ldts = dbtvault.escape_column_names(src_ldts) -%} +{%- set src_source = dbtvault.escape_column_names(src_source) -%} + {%- set source_cols = dbtvault.expand_column_list(columns=[src_pk, src_hashdiff, src_payload, src_eff, src_ldts, src_source]) -%} {%- set rank_cols = dbtvault.expand_column_list(columns=[src_pk, src_hashdiff, src_ldts]) -%} {%- set pk_cols = dbtvault.expand_column_list(columns=[src_pk]) -%} {%- if model.config.materialized == 'vault_insert_by_rank' %} - {%- set source_cols_with_rank = source_cols + [config.get('rank_column')] -%} + {%- set source_cols_with_rank = source_cols + dbtvault.escape_column_names([config.get('rank_column')]) -%} {%- endif -%} {{ dbtvault.prepend_generated_by() }} @@ -40,7 +46,6 @@ WITH source_data AS ( {%- if dbtvault.is_any_incremental() %} latest_records AS ( - SELECT {{ dbtvault.prefix(rank_cols, 'a', alias_target='target') }} FROM ( SELECT {{ dbtvault.prefix(rank_cols, 'current_records', alias_target='target') }}, @@ -53,7 +58,7 @@ latest_records AS ( SELECT DISTINCT {{ dbtvault.prefix([src_pk], 'source_data') }} FROM source_data ) AS source_records - ON {{ dbtvault.multikey(src_pk, prefix=['current_records', 'source_records'], condition='=') }} + ON {{ dbtvault.multikey(src_pk, prefix=['current_records','source_records'], condition='=') }} ) AS a WHERE a.rank = 1 ), @@ -66,11 +71,11 @@ records_to_insert AS ( {%- if dbtvault.is_any_incremental() %} LEFT JOIN latest_records ON {{ dbtvault.multikey(src_pk, prefix=['latest_records','stage'], condition='=') }} - WHERE {{ dbtvault.prefix([src_hashdiff], 'latest_records', alias_target='target') }} != {{ dbtvault.prefix([src_hashdiff], 'stage') }} + WHERE {{ dbtvault.prefix([src_hashdiff], 'latest_records', alias_target='target') }} != {{ dbtvault.prefix([src_hashdiff], 'stage') }} OR {{ dbtvault.prefix([src_hashdiff], 'latest_records', alias_target='target') }} IS NULL {%- endif %} ) SELECT * FROM records_to_insert -{%- endmacro -%} \ No newline at end of file +{%- endmacro -%} diff --git a/macros/tables/t_link.sql b/macros/tables/snowflake/t_link.sql similarity index 84% rename from macros/tables/t_link.sql rename to macros/tables/snowflake/t_link.sql index 7640b5ae2..dbe4f0253 100644 --- a/macros/tables/t_link.sql +++ b/macros/tables/snowflake/t_link.sql @@ -12,6 +12,13 @@ src_ldts=src_ldts, src_source=src_source, source_model=source_model) -}} +{%- set src_pk = dbtvault.escape_column_names(src_pk) -%} +{%- set src_fk = dbtvault.escape_column_names(src_fk) -%} +{%- set src_payload = dbtvault.escape_column_names(src_payload) -%} +{%- set src_eff = dbtvault.escape_column_names(src_eff) -%} +{%- set src_ldts = dbtvault.escape_column_names(src_ldts) -%} +{%- set src_source = dbtvault.escape_column_names(src_source) -%} + {%- set source_cols = dbtvault.expand_column_list(columns=[src_pk, src_fk, src_payload, src_eff, src_ldts, src_source]) -%} {%- set fk_cols = dbtvault.expand_column_list([src_fk]) -%} @@ -45,4 +52,4 @@ records_to_insert AS ( SELECT * FROM records_to_insert -{%- endmacro -%} \ No newline at end of file +{%- endmacro -%} diff --git a/macros/tables/xts.sql b/macros/tables/snowflake/xts.sql similarity index 83% rename from macros/tables/xts.sql rename to macros/tables/snowflake/xts.sql index 6d95874e9..ea9a75826 100644 --- a/macros/tables/xts.sql +++ b/macros/tables/snowflake/xts.sql @@ -8,6 +8,10 @@ {%- macro default__xts(src_pk, src_satellite, src_ldts, src_source, source_model) -%} +{%- set src_pk = dbtvault.escape_column_names(src_pk) -%} +{%- set src_ldts = dbtvault.escape_column_names(src_ldts) -%} +{%- set src_source = dbtvault.escape_column_names(src_source) -%} + {{ dbtvault.prepend_generated_by() }} {%- if not (source_model is iterable and source_model is not string) -%} @@ -21,7 +25,7 @@ {%- set hashdiff = (satellite[1]['hashdiff'].values() | list) [0] %} satellite_{{ satellite_name }}_from_{{ src }} AS ( - SELECT {{ dbtvault.prefix([src_pk], 's') }}, s.{{ hashdiff }} AS HASHDIFF, s.{{ satellite_name }} AS SATELLITE_NAME, s.{{ src_ldts }}, s.{{ src_source }} + SELECT {{ dbtvault.prefix([src_pk], 's') }}, s.{{ dbtvault.escape_column_names(hashdiff) }} AS HASHDIFF, s.{{ dbtvault.escape_column_names(satellite_name) }} AS SATELLITE_NAME, s.{{ src_ldts }}, s.{{ src_source }} FROM {{ ref(src) }} AS s WHERE {{ dbtvault.multikey(src_pk, prefix='s', condition='IS NOT NULL') }} ), @@ -58,4 +62,4 @@ records_to_insert AS ( SELECT * FROM records_to_insert -{%- endmacro -%} \ No newline at end of file +{%- endmacro -%} diff --git a/macros/tables/sqlserver/hub.sql b/macros/tables/sqlserver/hub.sql new file mode 100644 index 000000000..79c98f43c --- /dev/null +++ b/macros/tables/sqlserver/hub.sql @@ -0,0 +1,112 @@ +{%- macro sqlserver__hub(src_pk, src_nk, src_ldts, src_source, source_model) -%} + +{{- dbtvault.check_required_parameters(src_pk=src_pk, src_nk=src_nk, + src_ldts=src_ldts, src_source=src_source, + source_model=source_model) -}} + +{%- set src_pk = dbtvault.escape_column_names(src_pk) -%} +{%- set src_nk = dbtvault.escape_column_names(src_nk) -%} +{%- set src_ldts = dbtvault.escape_column_names(src_ldts) -%} +{%- set src_source = dbtvault.escape_column_names(src_source) -%} + +{%- set source_cols = dbtvault.expand_column_list(columns=[src_pk, src_nk, src_ldts, src_source]) -%} + +{%- if model.config.materialized == 'vault_insert_by_rank' %} + {%- set source_cols_with_rank = source_cols + dbtvault.escape_column_names([config.get('rank_column')]) -%} +{%- endif -%} + +{{ dbtvault.prepend_generated_by() }} + +{{ 'WITH ' -}} + +{%- if not (source_model is iterable and source_model is not string) -%} + {%- set source_model = [source_model] -%} +{%- endif -%} + +{%- set ns = namespace(last_cte= "") -%} + +{%- for src in source_model -%} + +{%- set source_number = loop.index | string -%} + +row_rank_{{ source_number }} AS ( + {%- if model.config.materialized == 'vault_insert_by_rank' %} + SELECT {{ source_cols_with_rank | join(', ') }} + {%- else %} + SELECT {{ source_cols | join(', ') }} + {%- endif %} + FROM + ( + {%- if model.config.materialized == 'vault_insert_by_rank' %} + SELECT {{ dbtvault.prefix(source_cols_with_rank, 'rr') }}, + {%- else %} + SELECT {{ dbtvault.prefix(source_cols, 'rr') }}, + {%- endif %} + ROW_NUMBER() OVER( + PARTITION BY {{ dbtvault.prefix([src_pk], 'rr') }} + ORDER BY {{ dbtvault.prefix([src_ldts], 'rr') }} + ) AS row_number + FROM {{ ref(src) }} AS rr + WHERE {{ dbtvault.multikey(src_pk, prefix='rr', condition='IS NOT NULL') }} + ) h + WHERE h.row_number = 1 + {%- set ns.last_cte = "row_rank_{}".format(source_number) %} +),{{ "\n" if not loop.last }} +{% endfor -%} +{% if source_model | length > 1 %} +stage_union AS ( + {%- for src in source_model %} + SELECT * FROM row_rank_{{ loop.index | string }} + {%- if not loop.last %} + UNION ALL + {%- endif %} + {%- endfor %} + {%- set ns.last_cte = "stage_union" %} +), +{%- endif -%} +{%- if model.config.materialized == 'vault_insert_by_period' %} +stage_mat_filter AS ( + SELECT * + FROM {{ ns.last_cte }} + WHERE __PERIOD_FILTER__ + {%- set ns.last_cte = "stage_mat_filter" %} +), +{%- elif model.config.materialized == 'vault_insert_by_rank' %} +stage_mat_filter AS ( + SELECT * + FROM {{ ns.last_cte }} + WHERE __RANK_FILTER__ + {%- set ns.last_cte = "stage_mat_filter" %} +), +{%- endif -%} +{%- if source_model | length > 1 %} + +row_rank_union AS ( + SELECT * + FROM + ( + SELECT ru.*, + ROW_NUMBER() OVER( + PARTITION BY {{ dbtvault.prefix([src_pk], 'ru') }} + ORDER BY {{ dbtvault.prefix([src_ldts], 'ru') }}, {{ dbtvault.prefix([src_source], 'ru') }} ASC + ) AS row_rank_number + FROM {{ ns.last_cte }} AS ru + WHERE {{ dbtvault.multikey(src_pk, prefix='ru', condition='IS NOT NULL') }} + ) h + WHERE h.row_rank_number = 1 + {%- set ns.last_cte = "row_rank_union" %} +), +{% endif %} +records_to_insert AS ( + SELECT {{ dbtvault.prefix(source_cols, 'a', alias_target='target') }} + FROM {{ ns.last_cte }} AS a + {%- if dbtvault.is_any_incremental() %} + LEFT JOIN {{ this }} AS d + ON {{ dbtvault.multikey(src_pk, prefix=['a','d'], condition='=') }} + WHERE {{ dbtvault.multikey(src_pk, prefix='d', condition='IS NULL') }} + {%- endif %} +) + +SELECT * FROM records_to_insert + +{%- endmacro -%} \ No newline at end of file diff --git a/macros/tables/sqlserver/link.sql b/macros/tables/sqlserver/link.sql new file mode 100644 index 000000000..34caba721 --- /dev/null +++ b/macros/tables/sqlserver/link.sql @@ -0,0 +1,113 @@ +{%- macro sqlserver__link(src_pk, src_fk, src_ldts, src_source, source_model) -%} + +{{- dbtvault.check_required_parameters(src_pk=src_pk, src_fk=src_fk, + src_ldts=src_ldts, src_source=src_source, + source_model=source_model) -}} + +{%- set src_pk = dbtvault.escape_column_names(src_pk) -%} +{%- set src_fk = dbtvault.escape_column_names(src_fk) -%} +{%- set src_ldts = dbtvault.escape_column_names(src_ldts) -%} +{%- set src_source = dbtvault.escape_column_names(src_source) -%} + +{%- set source_cols = dbtvault.expand_column_list(columns=[src_pk, src_fk, src_ldts, src_source]) -%} +{%- set fk_cols = dbtvault.expand_column_list([src_fk]) -%} + +{%- if model.config.materialized == 'vault_insert_by_rank' %} + {%- set source_cols_with_rank = source_cols + dbtvault.escape_column_names([config.get('rank_column')]) -%} +{%- endif -%} + +{{ dbtvault.prepend_generated_by() }} + +{{ 'WITH ' -}} + +{%- if not (source_model is iterable and source_model is not string) -%} + {%- set source_model = [source_model] -%} +{%- endif -%} + +{%- set ns = namespace(last_cte= "") -%} + +{%- for src in source_model -%} + +{%- set source_number = loop.index | string -%} + +row_rank_{{ source_number }} AS ( + SELECT * + FROM + ( + {%- if model.config.materialized == 'vault_insert_by_rank' %} + SELECT {{ dbtvault.prefix(source_cols_with_rank, 'rr') }}, + {%- else %} + SELECT {{ dbtvault.prefix(source_cols, 'rr') }}, + {%- endif %} + ROW_NUMBER() OVER( + PARTITION BY {{ dbtvault.prefix([src_pk], 'rr') }} + ORDER BY {{ dbtvault.prefix([src_ldts], 'rr') }} + ) AS row_number + FROM {{ ref(src) }} AS rr + {%- if source_model | length == 1 %} + WHERE {{ dbtvault.multikey(src_pk, prefix='rr', condition='IS NOT NULL') }} + AND {{ dbtvault.multikey(fk_cols, prefix='rr', condition='IS NOT NULL') }} + {%- endif %} + ) l + WHERE l.row_number = 1 + {%- set ns.last_cte = "row_rank_{}".format(source_number) %} +),{{ "\n" if not loop.last }} +{% endfor -%} +{% if source_model | length > 1 %} +stage_union AS ( + {%- for src in source_model %} + SELECT * FROM row_rank_{{ loop.index | string }} + {%- if not loop.last %} + UNION ALL + {%- endif %} + {%- endfor %} + {%- set ns.last_cte = "stage_union" %} +), +{%- endif -%} +{%- if model.config.materialized == 'vault_insert_by_period' %} +stage_mat_filter AS ( + SELECT * + FROM {{ ns.last_cte }} + WHERE __PERIOD_FILTER__ + {%- set ns.last_cte = "stage_mat_filter" %} +), +{%- elif model.config.materialized == 'vault_insert_by_rank' %} +stage_mat_filter AS ( + SELECT * + FROM {{ ns.last_cte }} + WHERE __RANK_FILTER__ + {%- set ns.last_cte = "stage_mat_filter" %} +), +{% endif %} +{%- if source_model | length > 1 %} + +row_rank_union AS ( + SELECT * + FROM + ( + SELECT ru.*, + ROW_NUMBER() OVER( + PARTITION BY {{ dbtvault.prefix([src_pk], 'ru') }} + ORDER BY {{ dbtvault.prefix([src_ldts], 'ru') }}, {{ dbtvault.prefix([src_source], 'ru') }} ASC + ) AS row_rank_number + FROM {{ ns.last_cte }} AS ru + WHERE {{ dbtvault.multikey(src_pk, prefix='ru', condition='IS NOT NULL') }} + AND {{ dbtvault.multikey(fk_cols, prefix='ru', condition='IS NOT NULL') }} + ) r + WHERE r.row_rank_number = 1 + {%- set ns.last_cte = "row_rank_union" %} +), +{% endif %} +records_to_insert AS ( + SELECT {{ dbtvault.prefix(source_cols, 'a', alias_target='target') }} + FROM {{ ns.last_cte }} AS a + {%- if dbtvault.is_any_incremental() %} + LEFT JOIN {{ this }} AS d + ON {{ dbtvault.multikey(src_pk, prefix=['a','d'], condition='=') }} + WHERE {{ dbtvault.multikey(src_pk, prefix='d', condition='IS NULL') }} + {%- endif %} +) + +SELECT * FROM records_to_insert + +{%- endmacro -%} \ No newline at end of file diff --git a/macros/tables/sqlserver/sat.sql b/macros/tables/sqlserver/sat.sql new file mode 100644 index 000000000..d18a3681d --- /dev/null +++ b/macros/tables/sqlserver/sat.sql @@ -0,0 +1,74 @@ +{%- macro sqlserver__sat(src_pk, src_hashdiff, src_payload, src_eff, src_ldts, src_source, source_model) -%} + +{{- dbtvault.check_required_parameters(src_pk=src_pk, src_hashdiff=src_hashdiff, src_payload=src_payload, + src_ldts=src_ldts, src_source=src_source, + source_model=source_model) -}} + +{%- set src_pk = dbtvault.escape_column_names(src_pk) -%} +{%- set src_hashdiff = dbtvault.escape_column_names(src_hashdiff) -%} +{%- set src_payload = dbtvault.escape_column_names(src_payload) -%} +{%- set src_ldts = dbtvault.escape_column_names(src_ldts) -%} +{%- set src_source = dbtvault.escape_column_names(src_source) -%} + +{%- set source_cols = dbtvault.expand_column_list(columns=[src_pk, src_hashdiff, src_payload, src_eff, src_ldts, src_source]) -%} +{%- set rank_cols = dbtvault.expand_column_list(columns=[src_pk, src_hashdiff, src_ldts]) -%} +{%- set pk_cols = dbtvault.expand_column_list(columns=[src_pk]) -%} + +{%- if model.config.materialized == 'vault_insert_by_rank' %} + {%- set source_cols_with_rank = source_cols + dbtvault.escape_column_names([config.get('rank_column')]) -%} +{%- endif -%} + +{{ dbtvault.prepend_generated_by() }} + +WITH source_data AS ( + {%- if model.config.materialized == 'vault_insert_by_rank' %} + SELECT {{ dbtvault.prefix(source_cols_with_rank, 'a', alias_target='source') }} + {%- else %} + SELECT {{ dbtvault.prefix(source_cols, 'a', alias_target='source') }} + {%- endif %} + FROM {{ ref(source_model) }} AS a + WHERE {{ dbtvault.multikey(src_pk, prefix='a', condition='IS NOT NULL') }} + {%- if model.config.materialized == 'vault_insert_by_period' %} + AND __PERIOD_FILTER__ + {% elif model.config.materialized == 'vault_insert_by_rank' %} + AND __RANK_FILTER__ + {% endif %} +), + +{% if dbtvault.is_any_incremental() %} + +latest_records AS ( + SELECT {{ dbtvault.prefix(rank_cols, 'a', alias_target='target') }} + FROM + ( + SELECT {{ dbtvault.prefix(rank_cols, 'current_records', alias_target='target') }}, + RANK() OVER ( + PARTITION BY {{ dbtvault.prefix([src_pk], 'current_records') }} + ORDER BY {{ dbtvault.prefix([src_ldts], 'current_records') }} DESC + ) AS rank + FROM {{ this }} AS current_records + JOIN ( + SELECT DISTINCT {{ dbtvault.prefix([src_pk], 'source_data') }} + FROM source_data + ) AS source_records + ON {{ dbtvault.multikey(src_pk, prefix=['current_records','source_records'], condition='=') }} + ) AS a + WHERE a.rank = 1 +), + +{%- endif %} + +records_to_insert AS ( + SELECT DISTINCT {{ dbtvault.alias_all(source_cols, 'stage') }} + FROM source_data AS stage + {%- if dbtvault.is_any_incremental() %} + LEFT JOIN latest_records + ON {{ dbtvault.multikey(src_pk, prefix=['latest_records','stage'], condition='=') }} + WHERE {{ dbtvault.prefix([src_hashdiff], 'latest_records', alias_target='target') }} != {{ dbtvault.prefix([src_hashdiff], 'stage') }} + OR {{ dbtvault.prefix([src_hashdiff], 'latest_records', alias_target='target') }} IS NULL + {%- endif %} +) + +SELECT * FROM records_to_insert + +{%- endmacro -%} \ No newline at end of file