From 425bc814f82e8b0407462c1e68c06066871086ae Mon Sep 17 00:00:00 2001 From: Alex Higgs Date: Tue, 26 Jan 2021 11:19:54 +0000 Subject: [PATCH] Release 0.7.2 --- dbt_project.yml | 16 +-- .../helpers/get_period_filter_sql.md | 2 +- .../replace_placeholder_with_filter.md | 4 +- docs/staging.md | 2 +- macros/internal/alias.sql | 6 +- macros/internal/alias_all.sql | 12 +- macros/internal/as_constant.sql | 8 +- macros/internal/expand_column_list.sql | 27 ++-- macros/internal/get_package_namespaces.sql | 4 + macros/internal/is_checks.sql | 33 +++++ macros/internal/multikey.sql | 4 +- macros/internal/process_macros.sql | 96 ------------- macros/internal/stage_processing_macros.sql | 89 ++++++++++++ macros/materialisations/helpers_schema.yml | 6 +- .../helpers_snowflake_schema.yml | 6 +- .../{helpers.sql => period_mat_helpers.sql} | 80 +++++------ macros/materialisations/rank_mat_helpers.sql | 78 +++++++++++ macros/materialisations/shared_helpers.sql | 10 ++ ...vault_insert_by_period_materialization.sql | 8 +- .../vault_insert_by_rank_materialization.sql | 125 +++++++++++++++++ macros/staging/derive_columns.sql | 26 ++-- macros/staging/hash_columns.sql | 16 +-- macros/staging/rank_columns.sql | 23 ++++ macros/staging/source_columns.sql | 17 ++- macros/staging/stage.sql | 128 +++++++++++------- macros/supporting/hash.sql | 48 +++++-- macros/supporting/prefix.sql | 4 +- macros/tables/eff_sat.sql | 41 ++++-- macros/tables/hub.sql | 72 ++++++---- macros/tables/link.sql | 76 +++++++---- macros/tables/sat.sql | 56 +++++--- macros/tables/t_link.sql | 9 +- 32 files changed, 762 insertions(+), 370 deletions(-) create mode 100644 macros/internal/get_package_namespaces.sql create mode 100644 macros/internal/is_checks.sql delete mode 100644 macros/internal/process_macros.sql create mode 100644 macros/internal/stage_processing_macros.sql rename macros/materialisations/{helpers.sql => period_mat_helpers.sql} (67%) create mode 100644 macros/materialisations/rank_mat_helpers.sql create 
mode 100644 macros/materialisations/shared_helpers.sql create mode 100644 macros/materialisations/vault_insert_by_rank_materialization.sql create mode 100644 macros/staging/rank_columns.sql diff --git a/dbt_project.yml b/dbt_project.yml index 380f2947f..d4946d831 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -1,10 +1,9 @@ name: 'dbtvault' -version: '0.7.0' +version: '0.7.2' require-dbt-version: [">=0.18.0", "<0.19.0"] +config-version: 2 -profile: 'dbtvault' - -source-paths: ["models", "models_test"] +source-paths: ["models"] analysis-paths: ["analysis"] test-paths: ["tests"] data-paths: ["data"] @@ -13,9 +12,8 @@ docs-paths: ["docs"] target-path: "target" clean-targets: - - "target" - - "dbt_modules" + - "target" + - "dbt_modules" -models: - vars: - hash: MD5 \ No newline at end of file +vars: + hash: MD5 \ No newline at end of file diff --git a/docs/materialisations/helpers/get_period_filter_sql.md b/docs/materialisations/helpers/get_period_filter_sql.md index eee1aa7a8..e6ca30657 100644 --- a/docs/materialisations/helpers/get_period_filter_sql.md +++ b/docs/materialisations/helpers/get_period_filter_sql.md @@ -1,6 +1,6 @@ {% docs macro__get_period_filter_sql %} -A wrapper around the `replace_placeholder_with_filter` macro which creates a query designed to +A wrapper around the `replace_placeholder_with_period_filter` macro which creates a query designed to build a temporary table, to select the necessary records for the given load cycle. 
{% enddocs %} diff --git a/docs/materialisations/helpers/replace_placeholder_with_filter.md b/docs/materialisations/helpers/replace_placeholder_with_filter.md index 8c17d4945..f2261e8bc 100644 --- a/docs/materialisations/helpers/replace_placeholder_with_filter.md +++ b/docs/materialisations/helpers/replace_placeholder_with_filter.md @@ -1,4 +1,4 @@ -{% docs macro__replace_placeholder_with_filter %} +{% docs macro__replace_placeholder_with_period_filter %} Replace the `__PERIOD_FILTER__` string present in the given SQL, with a `WHERE` clause which filters data by a specific `period` of time, `offset` from the `start_date`. @@ -6,7 +6,7 @@ specific `period` of time, `offset` from the `start_date`. {% enddocs %} -{% docs arg__replace_placeholder_with_filter__core_sql %} +{% docs arg__replace_placeholder_with_period_filter__core_sql %} SQL string containing the `__PERIOD_FILTER__` string. diff --git a/docs/staging.md b/docs/staging.md index f7dd7b0b2..7db16fcf9 100644 --- a/docs/staging.md +++ b/docs/staging.md @@ -3,7 +3,7 @@ A macro to aid in generating a staging layer for the raw vault. 
Allows users to: - Create new columns from already existing columns (Derived columns) -- Create new hashed columns from already existing columns (Hashed columns) +- Create new hashed columns from already existing columns and provided derived columns (Hashed columns) [Read more online](https://dbtvault.readthedocs.io/en/latest/macros/#stage) diff --git a/macros/internal/alias.sql b/macros/internal/alias.sql index 07e57a1c2..9a786cb13 100644 --- a/macros/internal/alias.sql +++ b/macros/internal/alias.sql @@ -1,14 +1,14 @@ {%- macro alias(alias_config=none, prefix=none) -%} - {{- adapter.dispatch('alias', packages = var('adapter_packages', ['dbtvault']))(alias_config=alias_config, prefix=prefix) -}} + {{- adapter.dispatch('alias', packages = dbtvault.get_dbtvault_namespaces())(alias_config=alias_config, prefix=prefix) -}} {%- endmacro %} {%- macro default__alias(alias_config=none, prefix=none) -%} -{%- if alias_config -%} +{%- if alias_config is defined and alias_config is not none and alias_config -%} - {%- if alias_config is iterable and alias_config is not string -%} + {%- if alias_config is mapping -%} {%- if alias_config['source_column'] and alias_config['alias'] -%} diff --git a/macros/internal/alias_all.sql b/macros/internal/alias_all.sql index 14e4b3fa0..987b02ea4 100644 --- a/macros/internal/alias_all.sql +++ b/macros/internal/alias_all.sql @@ -1,12 +1,12 @@ {%- macro alias_all(columns=none, prefix=none) -%} - {{- adapter.dispatch('alias_all', packages = var('adapter_packages', ['dbtvault']))(columns=columns, prefix=prefix) -}} + {{- adapter.dispatch('alias_all', packages = dbtvault.get_dbtvault_namespaces())(columns=columns, prefix=prefix) -}} {%- endmacro %} {%- macro default__alias_all(columns, prefix) -%} -{%- if columns is iterable and columns is not string -%} +{%- if dbtvault.is_list(columns) -%} {%- for column in columns -%} {{ dbtvault.alias(alias_config=column, prefix=prefix) }} @@ -17,6 +17,12 @@ {{ dbtvault.alias(alias_config=columns, 
prefix=prefix) }} -{%- endif -%} +{%- else -%} + + {%- if execute -%} + {{ exceptions.raise_compiler_error("Invalid columns object provided. Must be a list or a string.") }} + {%- endif %} + +{%- endif %} {%- endmacro -%} \ No newline at end of file diff --git a/macros/internal/as_constant.sql b/macros/internal/as_constant.sql index 96b8f1612..69e0e7d62 100644 --- a/macros/internal/as_constant.sql +++ b/macros/internal/as_constant.sql @@ -1,12 +1,12 @@ {%- macro as_constant(column_str=none) -%} - {{- adapter.dispatch('as_constant', packages = var('adapter_packages', ['dbtvault']))(column_str=column_str) -}} + {{- adapter.dispatch('as_constant', packages = dbtvault.get_dbtvault_namespaces())(column_str=column_str) -}} {%- endmacro %} {%- macro default__as_constant(column_str) -%} - {% if column_str is not none %} + {% if column_str is not none and column_str is string and column_str %} {%- if column_str | first == "!" -%} @@ -17,6 +17,10 @@ {{- return(column_str) -}} {%- endif -%} + {%- else -%} + {%- if execute -%} + {{ exceptions.raise_compiler_error("Invalid columns_str object provided. 
Must be a string and not null.") }} + {%- endif %} {%- endif -%} {%- endmacro -%} \ No newline at end of file diff --git a/macros/internal/expand_column_list.sql b/macros/internal/expand_column_list.sql index 7b1a2d835..f1afa7478 100644 --- a/macros/internal/expand_column_list.sql +++ b/macros/internal/expand_column_list.sql @@ -8,7 +8,7 @@ {%- set col_list = [] -%} -{%- if columns is iterable -%} +{%- if dbtvault.is_list(columns) -%} {%- for col in columns -%} @@ -17,25 +17,32 @@ {%- do col_list.append(col) -%} {#- If list of lists -#} - {%- elif col is iterable and col is not string -%} + {%- elif dbtvault.is_list(col) -%} - {%- if col is mapping -%} + {%- for cols in col -%} - {%- do col_list.append(col) -%} + {%- do col_list.append(cols) -%} - {%- else -%} + {%- endfor -%} + {%- elif col is mapping -%} - {%- for cols in col -%} - - {%- do col_list.append(cols) -%} + {%- do col_list.append(col) -%} - {%- endfor -%} + {%- else -%} - {%- endif -%} + {%- if execute -%} + {{ exceptions.raise_compiler_error("Invalid columns object provided. Must be a list of lists, dictionaries or strings.") }} + {%- endif %} {%- endif -%} {%- endfor -%} +{%- else -%} + + {%- if execute -%} + {{ exceptions.raise_compiler_error("Invalid columns object provided. 
Must be a list.") }} + {%- endif %} + {%- endif -%} {% do return(col_list) %} diff --git a/macros/internal/get_package_namespaces.sql b/macros/internal/get_package_namespaces.sql new file mode 100644 index 000000000..cd9cc3dc8 --- /dev/null +++ b/macros/internal/get_package_namespaces.sql @@ -0,0 +1,4 @@ +{%- macro get_dbtvault_namespaces() -%} + {%- set override_namespaces = var('adapter_packages', []) -%} + {%- do return(override_namespaces + ['dbtvault']) -%} +{%- endmacro -%} \ No newline at end of file diff --git a/macros/internal/is_checks.sql b/macros/internal/is_checks.sql new file mode 100644 index 000000000..a3f3c1228 --- /dev/null +++ b/macros/internal/is_checks.sql @@ -0,0 +1,33 @@ +{%- macro is_list(obj, empty_is_false=false) -%} {#- Returns true for an iterable that is neither a string nor a mapping; when empty_is_false is set, an empty iterable returns false instead. -#} + + {%- if obj is iterable and obj is not string and obj is not mapping -%} + {%- if not obj and empty_is_false -%} + {%- do return(false) -%} + {%- endif -%} + + {%- do return(true) -%} + {%- else -%} + {%- do return(false) -%} + {%- endif -%} + +{%- endmacro -%} + +{%- macro is_nothing(obj) -%} + + {%- if obj is none or obj is undefined or not obj -%} + {%- do return(true) -%} + {%- else -%} + {%- do return(false) -%} + {%- endif -%} + +{%- endmacro -%} + +{%- macro is_something(obj) -%} + + {%- if obj is not none and obj is defined and obj -%} + {%- do return(true) -%} + {%- else -%} + {%- do return(false) -%} + {%- endif -%} + +{%- endmacro -%} \ No newline at end of file diff --git a/macros/internal/multikey.sql b/macros/internal/multikey.sql index d57d4e187..f9c671ce2 100644 --- a/macros/internal/multikey.sql +++ b/macros/internal/multikey.sql @@ -1,6 +1,6 @@ {%- macro multikey(columns, prefix=none, condition=none, operator='AND') -%} - {{- adapter.dispatch('multikey', packages = var('adapter_packages', ['dbtvault']))(columns=columns, prefix=prefix, condition=condition, operator=operator) -}} + {{- adapter.dispatch('multikey', packages = 
dbtvault.get_dbtvault_namespaces())(columns=columns, prefix=prefix, condition=condition, operator=operator) -}} {%- endmacro %} @@ -20,7 +20,7 @@ {%- if not loop.last %} {{ operator }} {% endif %} {% endfor -%} {%- else -%} - {%- if columns is iterable and columns is not string -%} + {%- if dbtvault.is_list(columns) -%} {%- for col in columns -%} {{ prefix[0] ~ '.' if prefix }}{{ col }} {{ condition if condition else '' }} {%- if not loop.last -%} {{ "\n " ~ operator }} {% endif -%} diff --git a/macros/internal/process_macros.sql b/macros/internal/process_macros.sql deleted file mode 100644 index f375c80b0..000000000 --- a/macros/internal/process_macros.sql +++ /dev/null @@ -1,96 +0,0 @@ -{%- macro process_excludes(source_relation=none, derived_columns=none, columns=none) -%} - -{%- set exclude_columns_list = [] -%} -{%- set include_columns = [] -%} -{%- if exclude_columns is none -%} - {%- set exclude_columns = false -%} -{% endif %} - -{#- getting all the source columns -#} - -{%- set source_columns = dbtvault.source_columns(source_relation=source_relation) -%} - -{%- if columns is mapping -%} - - {%- for col in columns -%} - - {# Checks if the exclude flag is present and then creates a exclude list to pass to NEED BETTER NAME FOR MACRO #} - {%- if columns[col] is mapping and columns[col].exclude_columns -%} - - {%- for flagged_cols in columns[col]['columns'] -%} - - {%- do exclude_columns_list.append(flagged_cols) -%} - - {%- endfor -%} - - {%- set include_columns = dbtvault.process_include_columns(primary_set_list=derived_columns, secondary_set_list=source_columns, exclude_columns_list=exclude_columns_list) -%} - - {#- Updates the the apropriate hashdiff to contain the columns we do want to hash -#} - {%- do columns[col].update({'columns': include_columns}) -%} - {%- do columns[col].pop('exclude_columns') -%} - {%- set include_columns = [] -%} - {%- set exclude_columns = [] -%} - - {%- endif -%} - {%- endfor -%} -{%- endif -%} - -{%- do return(columns) -%} - - 
-{%- endmacro -%} - - -{%- macro process_include_columns(primary_set_list=none, secondary_set_list=none, exclude_columns_list=none) -%} - -{%- set include_columns = [] -%} - -{%- if exclude_columns is none -%} - {%- set exclude_columns_list = [] -%} -{%- endif -%} - -{# Appending primary list items not in exclude columns #} -{%- if primary_set_list is not none -%} - - {%- for primary_col in primary_set_list -%} - - {%- if primary_col not in exclude_columns_list -%} - - {%- if primary_set_list is mapping -%} - {%- set primary_str = dbtvault.as_constant(primary_col) -%} - {%- do include_columns.append(primary_str) -%} - {%- do exclude_columns_list.append(primary_str) -%} - {%- else -%} - {%- do include_columns.append(primary_col) -%} - {%- do exclude_columns_list.append(primary_col) -%} - {%- endif -%} - - {%- endif -%} - - {%- endfor -%} - -{%- endif -%} - -{# Apending the secondary list items not in the priamry list or the exclude list #} -{%- if secondary_set_list is not none -%} - - {%- for secondary_col in secondary_set_list -%} - - {%- if secondary_col not in exclude_columns_list -%} - - {%- if secondary_set_list is mapping -%} - {%- set secondary_str = dbtvault.as_constant(secondary_col) -%} - {%- do include_columns.append(secondary_str) -%} - {%- else -%} - {%- do include_columns.append(secondary_col) -%} - {%- endif -%} - - {%- endif -%} - - {% endfor -%} - -{%- endif -%} - -{%- do return(include_columns) -%} - -{%- endmacro -%} \ No newline at end of file diff --git a/macros/internal/stage_processing_macros.sql b/macros/internal/stage_processing_macros.sql new file mode 100644 index 000000000..15f782a08 --- /dev/null +++ b/macros/internal/stage_processing_macros.sql @@ -0,0 +1,89 @@ +{%- macro process_columns_to_select(columns_list=none, exclude_columns_list=none) -%} + + {% set columns_to_select = [] %} + + {% if not dbtvault.is_list(columns_list) or not dbtvault.is_list(exclude_columns_list) %} + + {{- exceptions.raise_compiler_error("One or both 
arguments are not of list type.") -}} + + {%- endif -%} + + {%- if dbtvault.is_something(columns_list) -%} {#- Only the source list has to be non-empty: an empty exclude list keeps every column. -#} + + {%- for col in columns_list -%} + + {%- if col not in exclude_columns_list -%} + {%- do columns_to_select.append(col) -%} + {%- endif -%} + + {%- endfor -%} + + {%- endif -%} + + {%- do return(columns_to_select) -%} + +{%- endmacro -%} + + +{%- macro extract_column_names(columns_dict=none) -%} + + {%- set extracted_column_names = [] -%} + + {%- if columns_dict is mapping -%} + {%- for key, value in columns_dict.items() -%} + {%- do extracted_column_names.append(key) -%} + {%- endfor -%} + + {%- do return(extracted_column_names) -%} + {%- else -%} + {%- do return([]) -%} + {%- endif -%} + +{%- endmacro -%} + + +{%- macro process_hash_column_excludes(hash_columns=none, source_columns=none) -%} + + {%- set processed_hash_columns = {} -%} + + {%- for col, col_mapping in hash_columns.items() -%} + + {%- if col_mapping is mapping -%} + {%- if col_mapping.exclude_columns -%} + + {%- if col_mapping.columns -%} + + {%- set columns_to_hash = dbtvault.process_columns_to_select(source_columns, col_mapping.columns) -%} + + {%- do hash_columns[col].pop('exclude_columns') -%} + {%- do hash_columns[col].update({'columns': columns_to_hash}) -%} + + {%- do processed_hash_columns.update({col: hash_columns[col]}) -%} + {%- else -%} + + {%- do hash_columns[col].pop('exclude_columns') -%} + {%- do hash_columns[col].update({'columns': source_columns}) -%} + + {%- do processed_hash_columns.update({col: hash_columns[col]}) -%} + {%- endif -%} + {%- else -%} + {%- do processed_hash_columns.update({col: col_mapping}) -%} + {%- endif -%} + {%- else -%} + {%- do processed_hash_columns.update({col: col_mapping}) -%} + {%- endif -%} + + {%- endfor -%} + + {%- do return(processed_hash_columns) -%} + +{%- endmacro -%} + + +{%- macro print_list(list_to_print=none, indent=4) -%} + + {%- for col_name in list_to_print -%} + {{- col_name | 
indent(indent) -}}{{ ",\n " if not loop.last }} + {%- endfor -%} + +{%- endmacro -%} diff --git a/macros/materialisations/helpers_schema.yml b/macros/materialisations/helpers_schema.yml index 051681f60..3dee27854 100644 --- a/macros/materialisations/helpers_schema.yml +++ b/macros/materialisations/helpers_schema.yml @@ -1,12 +1,12 @@ version: 2 macros: - - name: replace_placeholder_with_filter - description: '{{ doc("macro__replace_placeholder_with_filter") }}' + - name: replace_placeholder_with_period_filter + description: '{{ doc("macro__replace_placeholder_with_period_filter") }}' arguments: - name: core_sql type: string - description: '{{ doc("arg__replace_placeholder_with_filter__core_sql") }}' + description: '{{ doc("arg__replace_placeholder_with_period_filter__core_sql") }}' - name: timestamp_field type: string description: '{{ doc("arg__period_materialisation__timestamp_field") }}' diff --git a/macros/materialisations/helpers_snowflake_schema.yml b/macros/materialisations/helpers_snowflake_schema.yml index 29ea4746f..099849275 100644 --- a/macros/materialisations/helpers_snowflake_schema.yml +++ b/macros/materialisations/helpers_snowflake_schema.yml @@ -1,15 +1,15 @@ version: 2 macros: - - name: default__replace_placeholder_with_filter + - name: default__replace_placeholder_with_period_filter description: | - {{ doc("macro__replace_placeholder_with_filter") }} + {{ doc("macro__replace_placeholder_with_period_filter") }} {{ doc("platform__snowflake") }} arguments: - name: core_sql type: string - description: '{{ doc("arg__replace_placeholder_with_filter__core_sql") }}' + description: '{{ doc("arg__replace_placeholder_with_period_filter__core_sql") }}' - name: timestamp_field type: string description: '{{ doc("arg__period_materialisation__timestamp_field") }}' diff --git a/macros/materialisations/helpers.sql b/macros/materialisations/period_mat_helpers.sql similarity index 67% rename from macros/materialisations/helpers.sql rename to 
macros/materialisations/period_mat_helpers.sql index c8be30a78..e3532e7a3 100644 --- a/macros/materialisations/helpers.sql +++ b/macros/materialisations/period_mat_helpers.sql @@ -1,22 +1,22 @@ -{#-- Helper macros for custom materializations #} +{#-- Helper macros for period materializations #} {#-- MULTI-DISPATCH MACROS #} -{#-- REPLACE_PLACEHOLDER_WITH_FILTER #} +{#-- REPLACE_PLACEHOLDER_WITH_PERIOD_FILTER #} -{%- macro replace_placeholder_with_filter(core_sql, timestamp_field, start_timestamp, stop_timestamp, offset, period) -%} +{%- macro replace_placeholder_with_period_filter(core_sql, timestamp_field, start_timestamp, stop_timestamp, offset, period) -%} - {% set macro = adapter.dispatch('replace_placeholder_with_filter', - packages = var('adapter_packages', ['dbtvault']))(core_sql=core_sql, - timestamp_field=timestamp_field, - start_timestamp=start_timestamp, - stop_timestamp=stop_timestamp, - offset=offset, - period=period) %} + {% set macro = adapter.dispatch('replace_placeholder_with_period_filter', + packages = dbtvault.get_dbtvault_namespaces())(core_sql=core_sql, + timestamp_field=timestamp_field, + start_timestamp=start_timestamp, + stop_timestamp=stop_timestamp, + offset=offset, + period=period) %} {% do return(macro) %} {%- endmacro %} -{% macro default__replace_placeholder_with_filter(core_sql, timestamp_field, start_timestamp, stop_timestamp, offset, period) %} +{% macro default__replace_placeholder_with_period_filter(core_sql, timestamp_field, start_timestamp, stop_timestamp, offset, period) %} {%- set period_filter -%} (TO_DATE({{ timestamp_field }}) >= DATE_TRUNC('{{ period }}', TO_DATE('{{ start_timestamp }}') + INTERVAL '{{ offset }} {{ period }}') AND @@ -35,13 +35,13 @@ {%- macro get_period_filter_sql(target_cols_csv, base_sql, timestamp_field, period, start_timestamp, stop_timestamp, offset) -%} {% set macro = adapter.dispatch('get_period_filter_sql', - packages = var('adapter_packages', ['dbtvault']))(target_cols_csv=target_cols_csv, - 
base_sql=base_sql, - timestamp_field=timestamp_field, - period=period, - start_timestamp=start_timestamp, - stop_timestamp=stop_timestamp, - offset=offset) %} + packages = dbtvault.get_dbtvault_namespaces())(target_cols_csv=target_cols_csv, + base_sql=base_sql, + timestamp_field=timestamp_field, + period=period, + start_timestamp=start_timestamp, + stop_timestamp=stop_timestamp, + offset=offset) %} {% do return(macro) %} {%- endmacro %} @@ -49,11 +49,11 @@ {%- set filtered_sql = {'sql': base_sql} -%} - {%- do filtered_sql.update({'sql': dbtvault.replace_placeholder_with_filter(filtered_sql.sql, - timestamp_field, - start_timestamp, - stop_timestamp, - offset, period)}) -%} + {%- do filtered_sql.update({'sql': dbtvault.replace_placeholder_with_period_filter(filtered_sql.sql, + timestamp_field, + start_timestamp, + stop_timestamp, + offset, period)}) -%} select {{ target_cols_csv }} from ({{ filtered_sql.sql }}) {%- endmacro %} @@ -63,12 +63,12 @@ {%- macro get_period_boundaries(target_schema, target_table, timestamp_field, start_date, stop_date, period) -%} {% set macro = adapter.dispatch('get_period_boundaries', - packages = var('adapter_packages', ['dbtvault']))(target_schema=target_schema, - target_table=target_table, - timestamp_field=timestamp_field, - start_date=start_date, - stop_date=stop_date, - period=period) %} + packages = dbtvault.get_dbtvault_namespaces())(target_schema=target_schema, + target_table=target_table, + timestamp_field=timestamp_field, + start_date=start_date, + stop_date=stop_date, + period=period) %} {% do return(macro) %} {%- endmacro %} @@ -79,7 +79,7 @@ with data as ( select coalesce(max({{ timestamp_field }}), '{{ start_date }}')::timestamp as start_timestamp, - coalesce({{ dbt_utils.dateadd('millisecond', 86399999, "nullif('" ~ stop_date ~ "','')::timestamp") }}, + coalesce({{ dbt_utils.dateadd('millisecond', 86399999, "nullif('" ~ stop_date | lower ~ "','none')::timestamp") }}, {{ dbt_utils.current_timestamp() }} ) as stop_timestamp 
from {{ target_schema }}.{{ target_table }} ) @@ -107,9 +107,9 @@ {%- macro get_period_of_load(period, offset, start_timestamp) -%} {% set macro = adapter.dispatch('get_period_of_load', - packages = var('adapter_packages', ['dbtvault']))(period=period, - offset=offset, - start_timestamp=start_timestamp) %} + packages = dbtvault.get_dbtvault_namespaces())(period=period, + offset=offset, + start_timestamp=start_timestamp) %} {% do return(macro) %} {%- endmacro %} @@ -145,18 +145,6 @@ {% endmacro %} -{% macro check_placeholder(model_sql, placeholder='__PERIOD_FILTER__') %} - - {%- if model_sql.find(placeholder) == -1 -%} - {%- set error_message -%} - Model '{{ model.unique_id }}' does not include the required string '__PERIOD_FILTER__' in its sql - {%- endset -%} - {{ exceptions.raise_compiler_error(error_message) }} - {%- endif -%} - -{% endmacro %} - - {% macro get_start_stop_dates(timestamp_field, date_source_models) %} {% if config.get('start_date', default=none) is not none %} @@ -192,7 +180,7 @@ {% else %} {%- if execute -%} - {{ exceptions.raise_compiler_error("Invalid 'vault_insert_by_period' configuration. Must provide 'start_date' and 'stop_date' and/or 'date_source_models' options.") }} + {{ exceptions.raise_compiler_error("Invalid 'vault_insert_by_period' configuration. 
Must provide 'start_date' and 'stop_date', just 'stop_date', and/or 'date_source_models' options.") }} {%- endif -%} {% endif %} diff --git a/macros/materialisations/rank_mat_helpers.sql b/macros/materialisations/rank_mat_helpers.sql new file mode 100644 index 000000000..78fd9b3ab --- /dev/null +++ b/macros/materialisations/rank_mat_helpers.sql @@ -0,0 +1,78 @@ +{#-- Helper macros for rank materializations #} + +{#-- MULTI-DISPATCH MACROS #} + +{#-- REPLACE_PLACEHOLDER_WITH_RANK_FILTER #} + +{%- macro replace_placeholder_with_rank_filter(core_sql, rank_column, rank_iteration) -%} + + {% set macro = adapter.dispatch('replace_placeholder_with_rank_filter', + packages = dbtvault.get_dbtvault_namespaces())(core_sql=core_sql, + rank_column=rank_column, + rank_iteration=rank_iteration) %} + {% do return(macro) %} +{%- endmacro %} + +{% macro default__replace_placeholder_with_rank_filter(core_sql, rank_column, rank_iteration) %} + + {%- set rank_filter -%} + {{ rank_column }}::INTEGER = {{ rank_iteration }}::INTEGER + {%- endset -%} + + {%- set filtered_sql = core_sql | replace("__RANK_FILTER__", rank_filter) -%} + + {% do return(filtered_sql) %} +{% endmacro %} + + +{#-- OTHER MACROS #} + +{% macro get_min_max_ranks(rank_column, rank_source_models) %} + + {% if rank_source_models is not none %} + + {% if rank_source_models is string %} + {% set rank_source_models = [rank_source_models] %} + {% endif %} + + {% set query_sql %} + WITH stage AS ( + {% for source_model in rank_source_models %} + SELECT {{ rank_column }} FROM {{ ref(source_model) }} + {% if not loop.last %} UNION ALL {% endif %} + {% endfor %}) + + SELECT MIN({{ rank_column }}) AS MIN, MAX({{ rank_column }}) AS MAX + FROM stage + {% endset %} + + {% set min_max_dict = dbt_utils.get_query_results_as_dict(query_sql) %} + + {% set min_rank = min_max_dict['MIN'][0] | string %} + {% set max_rank = min_max_dict['MAX'][0] | string %} + {% set min_max_ranks = {"min_rank": min_rank, "max_rank": max_rank} %} + + {% do 
return(min_max_ranks) %} + + {% else %} + {%- if execute -%} + {{ exceptions.raise_compiler_error("Invalid 'vault_insert_by_rank' configuration. Must provide 'rank_column', and 'rank_source_models' options.") }} + {%- endif -%} + {% endif %} + +{% endmacro %} + + +{% macro is_vault_insert_by_rank() %} + {#-- do not run introspective queries in parsing #} + {% if not execute %} + {{ return(False) }} + {% else %} + {% set relation = adapter.get_relation(this.database, this.schema, this.table) %} + + {{ return(relation is not none + and relation.type == 'table' + and model.config.materialized == 'vault_insert_by_rank' + and not flags.FULL_REFRESH) }} + {% endif %} +{% endmacro %} diff --git a/macros/materialisations/shared_helpers.sql b/macros/materialisations/shared_helpers.sql new file mode 100644 index 000000000..b28b9cdba --- /dev/null +++ b/macros/materialisations/shared_helpers.sql @@ -0,0 +1,10 @@ +{% macro check_placeholder(model_sql, placeholder='__PERIOD_FILTER__') %} + + {%- if model_sql.find(placeholder) == -1 -%} + {%- set error_message -%} + Model '{{ model.unique_id }}' does not include the required string '{{ placeholder }}' in its sql + {%- endset -%} + {{ exceptions.raise_compiler_error(error_message) }} + {%- endif -%} + +{% endmacro %} \ No newline at end of file diff --git a/macros/materialisations/vault_insert_by_period_materialization.sql b/macros/materialisations/vault_insert_by_period_materialization.sql index fa0498885..5ace50eb8 100644 --- a/macros/materialisations/vault_insert_by_period_materialization.sql +++ b/macros/materialisations/vault_insert_by_period_materialization.sql @@ -23,7 +23,7 @@ {% if existing_relation is none %} - {% set filtered_sql = dbtvault.replace_placeholder_with_filter(sql, timestamp_field, + {% set filtered_sql = dbtvault.replace_placeholder_with_period_filter(sql, timestamp_field, start_stop_dates.start_date, start_stop_dates.stop_date, 0, period) %} @@ -39,7 +39,7 @@ {% do adapter.drop_relation(backup_relation) 
%} {% do adapter.rename_relation(target_relation, backup_relation) %} - {% set filtered_sql = dbtvault.replace_placeholder_with_filter(sql, timestamp_field, + {% set filtered_sql = dbtvault.replace_placeholder_with_period_filter(sql, timestamp_field, start_stop_dates.start_date, start_stop_dates.stop_date, 0, period) %} @@ -104,7 +104,7 @@ {% endfor %} {% call noop_statement(name='main', status="INSERT {}".format(loop_vars['sum_rows_inserted']) ) -%} - -- no-op + {{ tmp_table_sql }} {%- endcall %} {% endif %} @@ -117,7 +117,7 @@ {%- set rows_inserted = (load_result("main")['status'].split(" "))[1] | int -%} {% call noop_statement(name='main', status="BASE LOAD {}".format(rows_inserted)) -%} - -- no-op + {{ build_sql }} {%- endcall %} -- `COMMIT` happens here diff --git a/macros/materialisations/vault_insert_by_rank_materialization.sql b/macros/materialisations/vault_insert_by_rank_materialization.sql new file mode 100644 index 000000000..23da8d8cb --- /dev/null +++ b/macros/materialisations/vault_insert_by_rank_materialization.sql @@ -0,0 +1,125 @@ +{% materialization vault_insert_by_rank, default -%} + + {%- set full_refresh_mode = flags.FULL_REFRESH -%} + + {%- set target_relation = this -%} + {%- set existing_relation = load_relation(this) -%} + {%- set tmp_relation = make_temp_relation(this) -%} + + {%- set rank_column = config.require('rank_column') -%} + {%- set rank_source_models = config.require('rank_source_models') -%} + + {%- set min_max_ranks = dbtvault.get_min_max_ranks(rank_column, rank_source_models) | as_native -%} + + {%- set to_drop = [] -%} + + {%- do dbtvault.check_placeholder(sql, "__RANK_FILTER__") -%} + + {{ run_hooks(pre_hooks, inside_transaction=False) }} + + -- `BEGIN` happens here: + {{ run_hooks(pre_hooks, inside_transaction=True) }} + + {% if existing_relation is none %} + + {% set filtered_sql = dbtvault.replace_placeholder_with_rank_filter(sql, rank_column, 1) %} + + {% set build_sql = create_table_as(False, target_relation, 
filtered_sql) %} + + {% do to_drop.append(tmp_relation) %} + + {% elif existing_relation.is_view or full_refresh_mode %} + {#-- Make sure the backup doesn't exist so we don't encounter issues with the rename below #} + {% set backup_identifier = existing_relation.identifier ~ "__dbt_backup" %} + {% set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) %} + + {% do adapter.drop_relation(backup_relation) %} + {% do adapter.rename_relation(target_relation, backup_relation) %} + + {% set filtered_sql = dbtvault.replace_placeholder_with_rank_filter(sql, rank_column, 1) %} + {% set build_sql = create_table_as(False, target_relation, filtered_sql) %} + + {% do to_drop.append(tmp_relation) %} + {% do to_drop.append(backup_relation) %} + {% else %} + + {% set target_columns = adapter.get_columns_in_relation(target_relation) %} + {%- set target_cols_csv = target_columns | map(attribute='quoted') | join(', ') -%} + {%- set loop_vars = {'sum_rows_inserted': 0} -%} + + {% for i in range(min_max_ranks.max_rank | int ) -%} + + {%- set iteration_number = i + 1 -%} + + {%- set filtered_sql = dbtvault.replace_placeholder_with_rank_filter(sql, rank_column, iteration_number) -%} + + {{ dbt_utils.log_info("Running for {} {} of {} on column '{}' [{}]".format('rank', iteration_number, min_max_ranks.max_rank, rank_column, model.unique_id)) }} + + {% set tmp_relation = make_temp_relation(this) %} + + {% call statement() -%} + {{ dbt.create_table_as(True, tmp_relation, filtered_sql) }} + {%- endcall %} + + {{ adapter.expand_target_column_types(from_relation=tmp_relation, + to_relation=target_relation) }} + + {%- set insert_query_name = 'main-' ~ i -%} + {% call statement(insert_query_name, fetch_result=True) -%} + insert into {{ target_relation }} ({{ target_cols_csv }}) + ( + select {{ target_cols_csv }} + from {{ tmp_relation.include(schema=True) }} + ); + {%- endcall %} + + {%- set rows_inserted = (load_result(insert_query_name)['status'].split(" 
"))[1] | int -%} + + {%- set sum_rows_inserted = loop_vars['sum_rows_inserted'] + rows_inserted -%} + {%- do loop_vars.update({'sum_rows_inserted': sum_rows_inserted}) %} + + {{ dbt_utils.log_info("Ran for {} {} of {}; {} records inserted [{}]".format('rank', iteration_number, + min_max_ranks.max_rank, + rows_inserted, + model.unique_id)) }} + + + {% do to_drop.append(tmp_relation) %} + {% do adapter.commit() %} + + {% endfor %} + + {% call noop_statement(name='main', status="INSERT {}".format(loop_vars['sum_rows_inserted']) ) -%} + {{ filtered_sql }} + {%- endcall %} + + {% endif %} + + {% if build_sql is defined %} + {% call statement("main", fetch_result=True) %} + {{ build_sql }} + {% endcall %} + + {%- set rows_inserted = (load_result("main")['status'].split(" "))[1] | int -%} + + {% call noop_statement(name='main', status="BASE LOAD {}".format(rows_inserted)) -%} + {{ build_sql }} + {%- endcall %} + + -- `COMMIT` happens here + {% do adapter.commit() %} + {% endif %} + + {{ run_hooks(post_hooks, inside_transaction=True) }} + + {% for rel in to_drop %} + {% if rel.type is not none %} + {% do adapter.drop_relation(rel) %} + {% endif %} + {% endfor %} + + {{ run_hooks(post_hooks, inside_transaction=False) }} + + {{ return({'relations': [target_relation]}) }} + +{%- endmaterialization %} \ No newline at end of file diff --git a/macros/staging/derive_columns.sql b/macros/staging/derive_columns.sql index 8facdfb80..1a31e971a 100644 --- a/macros/staging/derive_columns.sql +++ b/macros/staging/derive_columns.sql @@ -1,6 +1,6 @@ {%- macro derive_columns(source_relation=none, columns=none) -%} - {{- adapter.dispatch('derive_columns', packages = var('adapter_packages', ['dbtvault']))(source_relation=source_relation, columns=columns) -}} + {{- adapter.dispatch('derive_columns', packages = dbtvault.get_dbtvault_namespaces())(source_relation=source_relation, columns=columns) -}} {%- endmacro %} @@ -17,11 +17,23 @@ {#- Add aliases of derived columns to excludes and full SQL 
to includes -#} {%- for col in columns -%} + {%- if dbtvault.is_list(columns[col]) -%} + {%- set column_list = [] -%} - {% set column_str = dbtvault.as_constant(columns[col]) %} + {%- for concat_component in columns[col] -%} + {%- set column_str = dbtvault.as_constant(concat_component) -%} + {%- do column_list.append(column_str) -%} + {%- endfor -%} - {%- do der_columns.append(column_str ~ " AS " ~ col) -%} - {%- do exclude_columns.append(col) -%} + {%- set concat_string = "CONCAT_WS(" ~ "'||', " ~ column_list | join(", ") ~ ") AS " ~ col -%} + + {%- do der_columns.append(concat_string) -%} + {%- set exclude_columns = exclude_columns + columns[col] -%} + {% else %} + {%- set column_str = dbtvault.as_constant(columns[col]) -%} + {%- do der_columns.append(column_str ~ " AS " ~ col) -%} + {%- do exclude_columns.append(col) -%} + {% endif %} {%- endfor -%} @@ -36,14 +48,12 @@ {%- endif -%} - {#- Makes sure the columns are appended in a logical order. Derived columns then source columns -#} + {#- Makes sure the columns are appended in a logical order. 
Derived columns then source columns -#} {%- set include_columns = src_columns + der_columns -%} {#- Print out all columns in includes -#} {%- for col in include_columns -%} - {{ col }} - {%- if not loop.last -%}, -{% endif -%} + {{- col | indent(4) -}}{{ ",\n" if not loop.last }} {%- endfor -%} {%- else -%} diff --git a/macros/staging/hash_columns.sql b/macros/staging/hash_columns.sql index 40db01ff4..bfd4f3104 100644 --- a/macros/staging/hash_columns.sql +++ b/macros/staging/hash_columns.sql @@ -1,12 +1,12 @@ {%- macro hash_columns(columns=none) -%} - {{- adapter.dispatch('hash_columns', packages = var('adapter_packages', ['dbtvault']))(columns=columns) -}} + {{- adapter.dispatch('hash_columns', packages = dbtvault.get_dbtvault_namespaces())(columns=columns) -}} {%- endmacro %} {%- macro default__hash_columns(columns=none) -%} -{%- if columns is mapping -%} +{%- if columns is mapping and columns is not none -%} {%- for col in columns -%} @@ -18,8 +18,8 @@ {%- elif columns[col] is not mapping -%} - {{- dbtvault.hash(columns=columns[col], - alias=col, + {{- dbtvault.hash(columns=columns[col], + alias=col, is_hashdiff=false) -}} {%- elif columns[col] is mapping and not columns[col].is_hashdiff -%} @@ -28,14 +28,12 @@ {%- do exceptions.warn("[" ~ this ~ "] Warning: You provided a list of columns under a 'columns' key, but did not provide the 'is_hashdiff' flag. 
Use list syntax for PKs.") -%} {% endif %} - {{- dbtvault.hash(columns=columns[col]['columns'], - alias=col) -}} + {{- dbtvault.hash(columns=columns[col]['columns'], alias=col) -}} {%- endif -%} - {%- if not loop.last -%}, -{% endif %} + {{- ",\n" if not loop.last -}} {%- endfor -%} -{%- endif -%} +{%- endif %} {%- endmacro -%} diff --git a/macros/staging/rank_columns.sql b/macros/staging/rank_columns.sql new file mode 100644 index 000000000..4cbbe959b --- /dev/null +++ b/macros/staging/rank_columns.sql @@ -0,0 +1,23 @@ +{%- macro rank_columns(columns=none) -%} + + {{- adapter.dispatch('rank_columns', packages = dbtvault.get_dbtvault_namespaces())(columns=columns) -}} + +{%- endmacro %} + +{%- macro default__rank_columns(columns=none) -%} + +{%- if columns is mapping and columns is not none -%} + + {%- for col in columns -%} + + {%- if columns[col] is mapping and columns[col].partition_by and columns[col].order_by -%} + + {{- "RANK() OVER (PARTITION BY {} ORDER BY {}) AS {}".format(columns[col].partition_by, columns[col].order_by, col) | indent(4) -}} + + {%- endif -%} + + {{- ",\n" if not loop.last -}} + {%- endfor -%} + +{%- endif %} +{%- endmacro -%} diff --git a/macros/staging/source_columns.sql b/macros/staging/source_columns.sql index 4385e7cde..32dc43d46 100644 --- a/macros/staging/source_columns.sql +++ b/macros/staging/source_columns.sql @@ -1,16 +1,15 @@ {%- macro source_columns(source_relation=none) -%} -{%- set include_columns = [] -%} + {%- if source_relation -%} + {%- set source_model_cols = adapter.get_columns_in_relation(source_relation) -%} -{%- if source_relation is defined and source_relation is not none -%} - {%- set source_model_cols = adapter.get_columns_in_relation(source_relation) -%} -{%- endif %} + {%- set column_list = [] -%} -{#- Add all columns from source_model relation -#} -{%- for source_col in source_model_cols -%} - {%- do include_columns.append(source_col.column) -%} -{%- endfor -%} + {%- for source_col in source_model_cols -%} + 
{%- do column_list.append(source_col.column) -%} + {%- endfor -%} -{%- do return(include_columns) -%} + {%- do return(column_list) -%} + {%- endif %} {%- endmacro -%} \ No newline at end of file diff --git a/macros/staging/stage.sql b/macros/staging/stage.sql index d1edd3431..1e961664d 100644 --- a/macros/staging/stage.sql +++ b/macros/staging/stage.sql @@ -1,13 +1,17 @@ -{%- macro stage(include_source_columns=none, source_model=none, hashed_columns=none, derived_columns=none) -%} +{%- macro stage(include_source_columns=none, source_model=none, hashed_columns=none, derived_columns=none, ranked_columns=none) -%} - {% if include_source_columns is none %} + {%- if include_source_columns is none -%} {%- set include_source_columns = true -%} - {% endif %} + {%- endif -%} - {{- adapter.dispatch('stage', packages = var('adapter_packages', ['dbtvault']))(include_source_columns=include_source_columns, source_model=source_model, hashed_columns=hashed_columns, derived_columns=derived_columns) -}} + {{- adapter.dispatch('stage', packages = dbtvault.get_dbtvault_namespaces())(include_source_columns=include_source_columns, + source_model=source_model, + hashed_columns=hashed_columns, + derived_columns=derived_columns, + ranked_columns=ranked_columns) -}} {%- endmacro -%} -{%- macro default__stage(include_source_columns, source_model, hashed_columns, derived_columns) -%} +{%- macro default__stage(include_source_columns, source_model, hashed_columns, derived_columns, ranked_columns) -%} {{ dbtvault.prepend_generated_by() }} @@ -23,7 +27,7 @@ source_model: source_name: source_table_name" {%- endset -%} - + {{- exceptions.raise_compiler_error(error_message) -}} {%- endif -%} @@ -34,73 +38,103 @@ {%- set source_table_name = source_model[source_name] -%} {%- set source_relation = source(source_name, source_table_name) -%} - + {%- set all_source_columns = dbtvault.source_columns(source_relation=source_relation) -%} {%- elif source_model is not mapping and source_model is not none -%} 
{%- set source_relation = ref(source_model) -%} + {%- set all_source_columns = dbtvault.source_columns(source_relation=source_relation) -%} +{%- else -%} + + {%- set all_source_columns = [] -%} {%- endif -%} -{#- CTE to add source columns from the source model -#} -WITH stage AS ( - SELECT +{%- set derived_column_names = dbtvault.extract_column_names(derived_columns) -%} +{%- set hashed_column_names = dbtvault.extract_column_names(hashed_columns) -%} +{%- set ranked_column_names = dbtvault.extract_column_names(ranked_columns) -%} +{%- set exclude_column_names = derived_column_names + hashed_column_names %} +{%- set source_and_derived_column_names = all_source_columns + derived_column_names %} + +{%- set source_columns_to_select = dbtvault.process_columns_to_select(all_source_columns, exclude_column_names) -%} +{%- set derived_columns_to_select = dbtvault.process_columns_to_select(source_and_derived_column_names, hashed_column_names) | unique | list -%} +{%- set final_columns_to_select = [] -%} + +{#- Include source columns in final column selection if true -#} +{%- if include_source_columns -%} + {%- if dbtvault.is_nothing(derived_columns) + and dbtvault.is_nothing(hashed_columns) + and dbtvault.is_nothing(ranked_columns) -%} + {%- set final_columns_to_select = final_columns_to_select + all_source_columns -%} + {%- else -%} + {#- Only include non-overriden columns if not just source columns -#} + {%- set final_columns_to_select = final_columns_to_select + source_columns_to_select -%} + {%- endif -%} +{%- endif %} -{% if source_relation is defined -%} - {%- set included_source_columns = dbtvault.source_columns(source_relation=source_relation) -%} +WITH source_data AS ( - {%- for col in included_source_columns -%} - {{ ' ' ~ col }} - {{- ',\n' if not loop.last -}} - {%- endfor -%} + SELECT -{%- endif %} + {{- "\n\n " ~ dbtvault.print_list(all_source_columns) if all_source_columns else " *" }} FROM {{ source_relation }} -), - -{# Derive additional columns, if 
provided, and carry over source columns from previous CTE for use in the hash stage -#} -derived_columns AS ( - SElECT + {%- set last_cte = "source_data" %} +) - {%- if derived_columns is defined and derived_columns is not none -%} - {%- if include_source_columns or hashed_columns is defined and hashed_columns is not none %} +{%- if dbtvault.is_something(derived_columns) -%}, - {{ dbtvault.derive_columns(source_relation=source_relation, columns=derived_columns) | indent(width=4, first=false) }} - {%- else %} +derived_columns AS ( - {{ dbtvault.derive_columns(columns=derived_columns) | indent(4) }} + SELECT - {%- endif -%} + {{ dbtvault.derive_columns(source_relation=source_relation, columns=derived_columns) | indent(4) }} - {#- If source relation is defined but derived_columns is not -#} - {%- else -%} - {{ " *" }} - {%- endif %} + FROM {{ last_cte }} + {%- set last_cte = "derived_columns" -%} + {%- set final_columns_to_select = final_columns_to_select + derived_column_names %} +) +{%- endif -%} - FROM stage -), +{% if dbtvault.is_something(hashed_columns) -%}, -{# Hash columns, if provided, and process exclusion flags if provided -#} hashed_columns AS ( + SELECT - {%- if hashed_columns is defined and hashed_columns is not none %} - {{- " *," if include_source_columns -}} + {{ dbtvault.print_list(derived_columns_to_select) }}, + + {% set processed_hash_columns = dbtvault.process_hash_column_excludes(hashed_columns, all_source_columns) -%} + {{- dbtvault.hash_columns(columns=processed_hash_columns) | indent(4) }} + + FROM {{ last_cte }} + {%- set last_cte = "hashed_columns" -%} + {%- set final_columns_to_select = final_columns_to_select + hashed_column_names %} +) +{%- endif -%} + +{% if dbtvault.is_something(ranked_columns) -%}, + +ranked_columns AS ( - {%- if derived_columns is defined and derived_columns is not none and include_source_columns is false %} + SELECT *, - {{ dbtvault.derive_columns(columns=derived_columns) | indent(4) }}, - {%- endif %} + {{ 
dbtvault.rank_columns(columns=ranked_columns) | indent(4) if dbtvault.is_something(ranked_columns) }} - {%- set hashed_columns = dbtvault.process_excludes(source_relation=source_relation, derived_columns=derived_columns, columns=hashed_columns) %} + FROM {{ last_cte }} + {%- set last_cte = "ranked_columns" -%} + {%- set final_columns_to_select = final_columns_to_select + ranked_column_names %} +) +{%- endif -%} - {{ dbtvault.hash_columns(columns=hashed_columns) | indent(4) }} +, + +columns_to_select AS ( + + SELECT - {%- else -%} - {{ " *" }} - {%- endif %} + {{ dbtvault.print_list(final_columns_to_select) }} - FROM derived_columns + FROM {{ last_cte }} ) -SELECT * FROM hashed_columns +SELECT * FROM columns_to_select {%- endmacro -%} \ No newline at end of file diff --git a/macros/supporting/hash.sql b/macros/supporting/hash.sql index fe9d8f8fd..ffc485e2f 100644 --- a/macros/supporting/hash.sql +++ b/macros/supporting/hash.sql @@ -4,12 +4,15 @@ {%- set is_hashdiff = false -%} {% endif %} - {{- adapter.dispatch('hash', packages = var('adapter_packages', ['dbtvault']))(columns=columns, alias=alias, is_hashdiff=is_hashdiff) -}} + {{- adapter.dispatch('hash', packages = dbtvault.get_dbtvault_namespaces())(columns=columns, alias=alias, is_hashdiff=is_hashdiff) -}} {%- endmacro %} {%- macro default__hash(columns, alias, is_hashdiff) -%} +{%- set concat_string = "||" -%} +{%- set null_placeholder_string = "^^" -%} + {%- set hash = var('hash', 'MD5') -%} {#- Select hashing algorithm -#} @@ -27,33 +30,48 @@ {%- set standardise = "NULLIF(UPPER(TRIM(CAST([EXPRESSION] AS VARCHAR))), '')" %} {#- Alpha sort columns before hashing if a hashdiff -#} -{%- if is_hashdiff and columns is iterable and columns is not string -%} +{%- if is_hashdiff and dbtvault.is_list(columns) -%} {%- set columns = columns|sort -%} {%- endif -%} {#- If single column to hash -#} {%- if columns is string -%} {%- set column_str = dbtvault.as_constant(columns) -%} - CAST(({{ hash_alg }}({{ standardise | 
replace('[EXPRESSION]', column_str) }})) AS BINARY({{ hash_size }})) AS {{ alias }} + {{- "CAST(({}({})) AS BINARY({})) AS {}".format(hash_alg, standardise | replace('[EXPRESSION]', column_str), hash_size, alias) | indent(4) -}} {#- Else a list of columns to hash -#} {%- else -%} + {%- set all_null = [] -%} -CAST({{ hash_alg }}(CONCAT( + {%- if is_hashdiff -%} + {{- "CAST({}(CONCAT_WS('{}',".format(hash_alg, concat_string) | indent(4) -}} + {%- else -%} + {{- "CAST({}(NULLIF(CONCAT_WS('{}',".format(hash_alg, concat_string) | indent(4) -}} + {%- endif -%} -{%- for column in columns %} + {%- for column in columns -%} -{%- set column_str = dbtvault.as_constant(column) -%} + {%- do all_null.append(null_placeholder_string) -%} -{%- if not loop.last %} - IFNULL({{ standardise | replace('[EXPRESSION]', column_str) }}, '^^'), '||', -{%- else %} - IFNULL({{ standardise | replace('[EXPRESSION]', column_str) }}, '^^') )) -AS BINARY({{ hash_size }})) AS {{ alias }} -{%- endif -%} + {%- set column_str = dbtvault.as_constant(column) -%} + {{- "\nIFNULL({}, '{}')".format(standardise | replace('[EXPRESSION]', column_str), null_placeholder_string) | indent(4) -}} + {{- "," if not loop.last -}} -{%- endfor -%} -{%- endif -%} + {%- if loop.last -%} + + {% if is_hashdiff %} + {{- "\n)) AS BINARY({})) AS {}".format(hash_size, alias) -}} + {%- else -%} + {{- "\n), '{}')) AS BINARY({})) AS {}".format(all_null | join(""), hash_size, alias) -}} + {%- endif -%} + {%- else -%} -{%- endmacro -%} + {%- do all_null.append(concat_string) -%} + + {%- endif -%} + + {%- endfor -%} + +{%- endif -%} +{%- endmacro -%} \ No newline at end of file diff --git a/macros/supporting/prefix.sql b/macros/supporting/prefix.sql index 51f0c9c8e..d72bae973 100644 --- a/macros/supporting/prefix.sql +++ b/macros/supporting/prefix.sql @@ -1,6 +1,8 @@ {%- macro prefix(columns, prefix_str, alias_target) -%} - {{- adapter.dispatch('prefix', packages = var('adapter_packages', ['dbtvault']))(columns=columns, 
prefix_str=prefix_str, alias_target=alias_target) -}} + {{- adapter.dispatch('prefix', packages = dbtvault.get_dbtvault_namespaces())(columns=columns, + prefix_str=prefix_str, + alias_target=alias_target) -}} {%- endmacro -%} diff --git a/macros/tables/eff_sat.sql b/macros/tables/eff_sat.sql index 70495b920..cd45a8d53 100644 --- a/macros/tables/eff_sat.sql +++ b/macros/tables/eff_sat.sql @@ -1,9 +1,9 @@ {%- macro eff_sat(src_pk, src_dfk, src_sfk, src_start_date, src_end_date, src_eff, src_ldts, src_source, source_model) -%} - {{- adapter.dispatch('eff_sat', packages = var('adapter_packages', ['dbtvault']))(src_pk=src_pk, src_dfk=src_dfk, src_sfk=src_sfk, - src_start_date=src_start_date, src_end_date=src_end_date, - src_eff=src_eff, src_ldts=src_ldts, src_source=src_source, - source_model=source_model) -}} + {{- adapter.dispatch('eff_sat', packages = dbtvault.get_dbtvault_namespaces())(src_pk=src_pk, src_dfk=src_dfk, src_sfk=src_sfk, + src_start_date=src_start_date, src_end_date=src_end_date, + src_eff=src_eff, src_ldts=src_ldts, src_source=src_source, + source_model=source_model) -}} {%- endmacro -%} {%- macro default__eff_sat(src_pk, src_dfk, src_sfk, src_start_date, src_end_date, src_eff, src_ldts, src_source, source_model) -%} @@ -20,14 +20,26 @@ WITH source_data AS ( {%- if model.config.materialized == 'vault_insert_by_period' %} WHERE __PERIOD_FILTER__ {% endif %} + {%- set source_cte = "source_data" %} ), + +{%- if model.config.materialized == 'vault_insert_by_rank' %} +rank_col AS ( + SELECT * FROM source_data + WHERE __RANK_FILTER__ + {%- set source_cte = "rank_col" %} +), +{% endif -%} + {%- if load_relation(this) is none %} + records_to_insert AS ( SELECT {{ dbtvault.alias_all(source_cols, 'e') }} - FROM source_data AS e + FROM {{ source_cte }} AS e ) {%- else %} -latest_eff AS + +latest_open_eff AS ( SELECT {{ dbtvault.alias_all(source_cols, 'b') }}, ROW_NUMBER() OVER ( @@ -35,19 +47,16 @@ latest_eff AS ORDER BY b.{{ src_ldts }} DESC ) AS row_number FROM 
{{ this }} AS b + WHERE TO_DATE(b.{{ src_end_date }}) = TO_DATE('9999-12-31') + QUALIFY row_number = 1 ), -latest_open_eff AS -( - SELECT {{ dbtvault.alias_all(source_cols, 'a') }} - FROM latest_eff AS a - WHERE TO_DATE(a.{{ src_end_date }}) = TO_DATE('9999-12-31') - AND a.row_number = 1 -), + stage_slice AS ( SELECT {{ dbtvault.alias_all(source_cols, 'stage') }} - FROM source_data AS stage + FROM {{ "rank_col" if model.config.materialized == 'vault_insert_by_rank' else "source_data" }} AS stage ), + new_open_records AS ( SELECT DISTINCT {{ dbtvault.alias_all(source_cols, 'stage') }} @@ -59,6 +68,7 @@ new_open_records AS ( AND {{ dbtvault.multikey(src_sfk, prefix='stage', condition='IS NOT NULL') }} ), {%- if is_auto_end_dating %} + links_to_end_date AS ( SELECT a.* FROM latest_open_eff AS a @@ -67,6 +77,7 @@ links_to_end_date AS ( WHERE {{ dbtvault.multikey(src_sfk, prefix='b', condition='IS NULL', operator='OR') }} OR {{ dbtvault.multikey(src_sfk, prefix=['a', 'b'], condition='<>', operator='OR') }} ), + new_end_dated_records AS ( SELECT DISTINCT h.{{ src_pk }}, @@ -76,6 +87,7 @@ new_end_dated_records AS ( INNER JOIN links_to_end_date AS g ON g.{{ src_pk }} = h.{{ src_pk }} ), + amended_end_dated_records AS ( SELECT DISTINCT a.{{ src_pk }}, @@ -90,6 +102,7 @@ amended_end_dated_records AS ( AND {{ dbtvault.multikey(src_dfk, prefix='stage', condition='IS NOT NULL') }} ), {%- endif %} + records_to_insert AS ( SELECT * FROM new_open_records {%- if is_auto_end_dating %} diff --git a/macros/tables/hub.sql b/macros/tables/hub.sql index 8305f612b..241ec3b34 100644 --- a/macros/tables/hub.sql +++ b/macros/tables/hub.sql @@ -1,8 +1,8 @@ {%- macro hub(src_pk, src_nk, src_ldts, src_source, source_model) -%} - {{- adapter.dispatch('hub', packages = var('adapter_packages', ['dbtvault']))(src_pk=src_pk, src_nk=src_nk, - src_ldts=src_ldts, src_source=src_source, - source_model=source_model) -}} + {{- adapter.dispatch('hub', packages = 
dbtvault.get_dbtvault_namespaces())(src_pk=src_pk, src_nk=src_nk, + src_ldts=src_ldts, src_source=src_source, + source_model=source_model) -}} {%- endmacro -%} @@ -10,6 +10,10 @@ {%- set source_cols = dbtvault.expand_column_list(columns=[src_pk, src_nk, src_ldts, src_source]) -%} +{%- if model.config.materialized == 'vault_insert_by_rank' %} + {%- set source_cols_with_rank = source_cols + [config.get('rank_column')] -%} +{%- endif -%} + {{ dbtvault.prepend_generated_by() }} {{ 'WITH ' -}} @@ -18,63 +22,73 @@ {%- set source_model = [source_model] -%} {%- endif -%} +{%- set ns = namespace(last_cte= "") -%} + {%- for src in source_model -%} {%- set source_number = loop.index | string -%} -rank_{{ source_number }} AS ( +row_rank_{{ source_number }} AS ( + {%- if model.config.materialized == 'vault_insert_by_rank' %} + SELECT {{ source_cols_with_rank | join(', ') }}, + {%- else %} SELECT {{ source_cols | join(', ') }}, + {%- endif %} ROW_NUMBER() OVER( PARTITION BY {{ src_pk }} ORDER BY {{ src_ldts }} ASC ) AS row_number FROM {{ ref(src) }} -), -stage_{{ source_number }} AS ( - SELECT DISTINCT {{ source_cols | join(', ') }} - FROM rank_{{ source_number }} - WHERE row_number = 1 -), + QUALIFY row_number = 1 + {%- set ns.last_cte = "row_rank_{}".format(source_number) %} +),{{ "\n" if not loop.last }} {% endfor -%} - +{% if source_model | length > 1 %} stage_union AS ( {%- for src in source_model %} - SELECT * FROM stage_{{ loop.index | string }} + SELECT * FROM row_rank_{{ loop.index | string }} {%- if not loop.last %} UNION ALL {%- endif %} {%- endfor %} + {%- set ns.last_cte = "stage_union" %} ), +{%- endif -%} {%- if model.config.materialized == 'vault_insert_by_period' %} -stage_period_filter AS ( +stage_mat_filter AS ( SELECT * - FROM stage_union + FROM {{ ns.last_cte }} WHERE __PERIOD_FILTER__ + {%- set ns.last_cte = "stage_mat_filter" %} ), -{%- endif %} -rank_union AS ( +{%- elif model.config.materialized == 'vault_insert_by_rank' %} +stage_mat_filter AS ( + 
SELECT * + FROM {{ ns.last_cte }} + WHERE __RANK_FILTER__ + {%- set ns.last_cte = "stage_mat_filter" %} +), +{%- endif -%} +{%- if source_model | length > 1 %} + +row_rank_union AS ( SELECT *, ROW_NUMBER() OVER( PARTITION BY {{ src_pk }} ORDER BY {{ src_ldts }}, {{ src_source }} ASC - ) AS row_number - {%- if model.config.materialized == 'vault_insert_by_period' %} - FROM stage_period_filter - {%- else %} - FROM stage_union - {%- endif %} + ) AS row_rank_number + FROM {{ ns.last_cte }} WHERE {{ src_pk }} IS NOT NULL + QUALIFY row_rank_number = 1 + {%- set ns.last_cte = "row_rank_union" %} ), -stage AS ( - SELECT DISTINCT {{ source_cols | join(', ') }} - FROM rank_union - WHERE row_number = 1 -), +{% endif %} records_to_insert AS ( - SELECT stage.* FROM stage + SELECT {{ dbtvault.prefix(source_cols, 'a', alias_target='target') }} + FROM {{ ns.last_cte }} AS a {%- if dbtvault.is_vault_insert_by_period() or is_incremental() %} LEFT JOIN {{ this }} AS d - ON stage.{{ src_pk }} = d.{{ src_pk }} + ON a.{{ src_pk }} = d.{{ src_pk }} WHERE {{ dbtvault.prefix([src_pk], 'd') }} IS NULL {%- endif %} ) diff --git a/macros/tables/link.sql b/macros/tables/link.sql index 3407a4e7c..a7368f1c6 100644 --- a/macros/tables/link.sql +++ b/macros/tables/link.sql @@ -1,8 +1,8 @@ {%- macro link(src_pk, src_fk, src_ldts, src_source, source_model) -%} - {{- adapter.dispatch('link', packages = var('adapter_packages', ['dbtvault']))(src_pk=src_pk, src_fk=src_fk, - src_ldts=src_ldts, src_source=src_source, - source_model=source_model) -}} + {{- adapter.dispatch('link', packages = dbtvault.get_dbtvault_namespaces())(src_pk=src_pk, src_fk=src_fk, + src_ldts=src_ldts, src_source=src_source, + source_model=source_model) -}} {%- endmacro -%} @@ -11,6 +11,10 @@ {%- set source_cols = dbtvault.expand_column_list(columns=[src_pk, src_fk, src_ldts, src_source]) -%} {%- set fk_cols = dbtvault.expand_column_list([src_fk]) -%} +{%- if model.config.materialized == 'vault_insert_by_rank' %} + {%- set 
source_cols_with_rank = source_cols + [config.get('rank_column')] -%} +{%- endif -%} + {{ dbtvault.prepend_generated_by() }} {{ 'WITH ' -}} @@ -19,67 +23,79 @@ {%- set source_model = [source_model] -%} {%- endif -%} +{%- set ns = namespace(last_cte= "") -%} + {%- for src in source_model -%} -{%- set source_number = (loop.index | string) -%} +{%- set source_number = loop.index | string -%} -rank_{{ source_number }} AS ( +row_rank_{{ source_number }} AS ( + {%- if model.config.materialized == 'vault_insert_by_rank' %} + SELECT {{ source_cols_with_rank | join(', ') }}, + {%- else %} SELECT {{ source_cols | join(', ') }}, + {%- endif %} ROW_NUMBER() OVER( PARTITION BY {{ src_pk }} ORDER BY {{ src_ldts }} ASC ) AS row_number FROM {{ ref(src) }} -), -stage_{{ source_number }} AS ( - SELECT DISTINCT {{ source_cols | join(', ') }} - FROM rank_{{ source_number }} - WHERE row_number = 1 -), + QUALIFY row_number = 1 + {%- set ns.last_cte = "row_rank_{}".format(source_number) %} +),{{ "\n" if not loop.last }} {% endfor -%} - +{% if source_model | length > 1 %} stage_union AS ( {%- for src in source_model %} - SELECT * FROM stage_{{ loop.index | string }} + SELECT * FROM row_rank_{{ loop.index | string }} {%- if not loop.last %} UNION ALL {%- endif %} {%- endfor %} + {%- set ns.last_cte = "stage_union" %} ), +{%- endif -%} {%- if model.config.materialized == 'vault_insert_by_period' %} -stage_period_filter AS ( +stage_mat_filter AS ( SELECT * - FROM stage_union + FROM {{ ns.last_cte }} WHERE __PERIOD_FILTER__ + {%- set ns.last_cte = "stage_mat_filter" %} ), -{%- endif %} -rank_union AS ( +{%- elif model.config.materialized == 'vault_insert_by_rank' %} +stage_mat_filter AS ( + SELECT * + FROM {{ ns.last_cte }} + WHERE __RANK_FILTER__ + {%- set ns.last_cte = "stage_mat_filter" %} +), +{% endif %} +{%- if source_model | length > 1 %} + +row_rank_union AS ( SELECT *, ROW_NUMBER() OVER( PARTITION BY {{ src_pk }} ORDER BY {{ src_ldts }}, {{ src_source }} ASC - ) AS row_number - {%- 
if model.config.materialized == 'vault_insert_by_period' %} - FROM stage_period_filter - {%- else %} - FROM stage_union - {%- endif %} + ) AS row_rank_number + FROM {{ ns.last_cte }} WHERE {{ dbtvault.multikey(fk_cols, condition='IS NOT NULL') }} + QUALIFY row_rank_number = 1 + {%- set ns.last_cte = "row_rank_union" %} ), -stage AS ( - SELECT DISTINCT {{ source_cols | join(', ') }} - FROM rank_union - WHERE row_number = 1 -), +{% endif %} records_to_insert AS ( - SELECT stage.* FROM stage + SELECT {{ dbtvault.prefix(source_cols, 'a', alias_target='target') }} + FROM {{ ns.last_cte }} AS a {%- if dbtvault.is_vault_insert_by_period() or is_incremental() %} LEFT JOIN {{ this }} AS d - ON stage.{{ src_pk }} = d.{{ src_pk }} + ON a.{{ src_pk }} = d.{{ src_pk }} WHERE {{ dbtvault.prefix([src_pk], 'd') }} IS NULL {%- endif %} ) SELECT * FROM records_to_insert +{%- endmacro -%} + {%- endmacro -%} \ No newline at end of file diff --git a/macros/tables/sat.sql b/macros/tables/sat.sql index d26b4e922..589ed5514 100644 --- a/macros/tables/sat.sql +++ b/macros/tables/sat.sql @@ -1,25 +1,44 @@ {%- macro sat(src_pk, src_hashdiff, src_payload, src_eff, src_ldts, src_source, source_model) -%} - {{- adapter.dispatch('sat', packages = var('adapter_packages', ['dbtvault']))(src_pk=src_pk, src_hashdiff=src_hashdiff, - src_payload=src_payload, src_eff=src_eff, src_ldts=src_ldts, - src_source=src_source, source_model=source_model) -}} + {{- adapter.dispatch('sat', packages = dbtvault.get_dbtvault_namespaces())(src_pk=src_pk, src_hashdiff=src_hashdiff, + src_payload=src_payload, src_eff=src_eff, src_ldts=src_ldts, + src_source=src_source, source_model=source_model) -}} {%- endmacro %} {%- macro default__sat(src_pk, src_hashdiff, src_payload, src_eff, src_ldts, src_source, source_model) -%} {%- set source_cols = dbtvault.expand_column_list(columns=[src_pk, src_hashdiff, src_payload, src_eff, src_ldts, src_source]) -%} +{%- set rank_cols = dbtvault.expand_column_list(columns=[src_pk, 
src_hashdiff, src_ldts]) -%} + +{%- if model.config.materialized == 'vault_insert_by_rank' %} + {%- set source_cols_with_rank = source_cols + [config.get('rank_column')] -%} +{%- endif -%} {{ dbtvault.prepend_generated_by() }} WITH source_data AS ( - SELECT * - FROM {{ ref(source_model) }} + {%- if model.config.materialized == 'vault_insert_by_rank' %} + SELECT {{ dbtvault.prefix(source_cols_with_rank, 'a', alias_target='source') }} + {%- else %} + SELECT {{ dbtvault.prefix(source_cols, 'a', alias_target='source') }} + {%- endif %} + FROM {{ ref(source_model) }} AS a {%- if model.config.materialized == 'vault_insert_by_period' %} WHERE __PERIOD_FILTER__ {% endif %} + {%- set source_cte = "source_data" %} +), + +{%- if model.config.materialized == 'vault_insert_by_rank' %} +rank_col AS ( + SELECT * FROM source_data + WHERE __RANK_FILTER__ + {%- set source_cte = "rank_col" %} ), -{% if dbtvault.is_vault_insert_by_period() or is_incremental() -%} +{% endif -%} + +{% if dbtvault.is_vault_insert_by_period() or dbtvault.is_vault_insert_by_rank() or is_incremental() %} update_records AS ( SELECT {{ dbtvault.prefix(source_cols, 'a', alias_target='target') }} @@ -27,29 +46,26 @@ update_records AS ( JOIN source_data as b ON a.{{ src_pk }} = b.{{ src_pk }} ), -rank AS ( - SELECT {{ dbtvault.prefix(source_cols, 'c', alias_target='target') }}, + +latest_records AS ( + SELECT {{ dbtvault.prefix(rank_cols, 'c', alias_target='target') }}, CASE WHEN RANK() OVER (PARTITION BY {{ dbtvault.prefix([src_pk], 'c') }} ORDER BY {{ dbtvault.prefix([src_ldts], 'c') }} DESC) = 1 THEN 'Y' ELSE 'N' END AS latest FROM update_records as c + QUALIFY latest = 'Y' ), -stage AS ( - SELECT {{ dbtvault.prefix(source_cols, 'd', alias_target='target') }} - FROM rank AS d - WHERE d.latest = 'Y' -), -{% endif -%} +{%- endif %} records_to_insert AS ( SELECT DISTINCT {{ dbtvault.alias_all(source_cols, 'e') }} - FROM source_data AS e - {% if dbtvault.is_vault_insert_by_period() or is_incremental() -%} - LEFT 
JOIN stage - ON {{ dbtvault.prefix([src_hashdiff], 'stage', alias_target='target') }} = {{ dbtvault.prefix([src_hashdiff], 'e') }} - WHERE {{ dbtvault.prefix([src_hashdiff], 'stage', alias_target='target') }} IS NULL - {% endif %} + FROM {{ source_cte }} AS e + {%- if dbtvault.is_vault_insert_by_period() or dbtvault.is_vault_insert_by_rank() or is_incremental() %} + LEFT JOIN latest_records + ON {{ dbtvault.prefix([src_hashdiff], 'latest_records', alias_target='target') }} = {{ dbtvault.prefix([src_hashdiff], 'e') }} + WHERE {{ dbtvault.prefix([src_hashdiff], 'latest_records', alias_target='target') }} IS NULL + {%- endif %} ) SELECT * FROM records_to_insert diff --git a/macros/tables/t_link.sql b/macros/tables/t_link.sql index a00c5406e..638724923 100644 --- a/macros/tables/t_link.sql +++ b/macros/tables/t_link.sql @@ -1,8 +1,8 @@ {%- macro t_link(src_pk, src_fk, src_payload, src_eff, src_ldts, src_source, source_model) -%} - {{- adapter.dispatch('t_link', packages = var('adapter_packages', ['dbtvault']))(src_pk=src_pk, src_fk=src_fk, src_payload=src_payload, - src_eff=src_eff, src_ldts=src_ldts, src_source=src_source, - source_model=source_model) -}} + {{- adapter.dispatch('t_link', packages = dbtvault.get_dbtvault_namespaces())(src_pk=src_pk, src_fk=src_fk, src_payload=src_payload, + src_eff=src_eff, src_ldts=src_ldts, src_source=src_source, + source_model=source_model) -}} {%- endmacro %} @@ -18,6 +18,9 @@ WITH stage AS ( {%- if model.config.materialized == 'vault_insert_by_period' %} WHERE __PERIOD_FILTER__ {%- endif %} + {%- if model.config.materialized == 'vault_insert_by_rank' %} + WHERE __RANK_FILTER__ + {%- endif %} ), records_to_insert AS ( SELECT DISTINCT {{ dbtvault.prefix(source_cols, 'stg') }}