From c38bd711cae143af72a44b308501fef91071e023 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mattias=20Thal=C3=A9n?= Date: Fri, 5 Apr 2024 11:53:32 +0200 Subject: [PATCH 1/4] Add table macros --- macros/tables/fabric/bridge.sql | 16 ++++ macros/tables/fabric/eff_sat.sql | 14 +++ macros/tables/fabric/hub.sql | 104 ++++++++++++++++++++++ macros/tables/fabric/link.sql | 106 +++++++++++++++++++++++ macros/tables/fabric/ma_sat.sql | 133 +++++++++++++++++++++++++++++ macros/tables/fabric/pit.sql | 16 ++++ macros/tables/fabric/ref_table.sql | 14 +++ macros/tables/fabric/sat.sql | 13 +++ macros/tables/fabric/t_link.sql | 13 +++ macros/tables/fabric/xts.sql | 15 ++++ 10 files changed, 444 insertions(+) create mode 100644 macros/tables/fabric/bridge.sql create mode 100644 macros/tables/fabric/eff_sat.sql create mode 100644 macros/tables/fabric/hub.sql create mode 100644 macros/tables/fabric/link.sql create mode 100644 macros/tables/fabric/ma_sat.sql create mode 100644 macros/tables/fabric/pit.sql create mode 100644 macros/tables/fabric/ref_table.sql create mode 100644 macros/tables/fabric/sat.sql create mode 100644 macros/tables/fabric/t_link.sql create mode 100644 macros/tables/fabric/xts.sql diff --git a/macros/tables/fabric/bridge.sql b/macros/tables/fabric/bridge.sql new file mode 100644 index 00000000..c46a82d5 --- /dev/null +++ b/macros/tables/fabric/bridge.sql @@ -0,0 +1,16 @@ +/* + * Copyright (c) Business Thinking Ltd. 2019-2023 + * This software includes code developed by the AutomateDV (f.k.a dbtvault) Team at Business Thinking Ltd. Trading as Datavault + */ + +{%- macro fabric__bridge(src_pk, as_of_dates_table, bridge_walk, stage_tables_ldts, src_extra_columns, src_ldts, source_model) -%} + +{{- automate_dv.default__bridge(src_pk=src_pk, + as_of_dates_table=as_of_dates_table, + bridge_walk=bridge_walk, + stage_tables_ldts=stage_tables_ldts, + src_extra_columns=src_extra_columns, + src_ldts=src_ldts, + source_model=source_model) -}} + +{%- endmacro -%} diff --git a/macros/tables/fabric/eff_sat.sql b/macros/tables/fabric/eff_sat.sql new file mode 100644 index 00000000..d1672d8b --- /dev/null +++ b/macros/tables/fabric/eff_sat.sql @@ -0,0 +1,14 @@ +/* + * Copyright (c) Business Thinking Ltd. 2019-2023 + * This software includes code developed by the AutomateDV (f.k.a dbtvault) Team at Business Thinking Ltd. Trading as Datavault + */ + +{%- macro fabric__eff_sat(src_pk, src_dfk, src_sfk, src_extra_columns, src_start_date, src_end_date, src_eff, src_ldts, src_source, source_model) -%} + +{{- automate_dv.default__eff_sat(src_pk=src_pk, src_dfk=src_dfk, src_sfk=src_sfk, + src_extra_columns=src_extra_columns, + src_start_date=src_start_date, src_end_date=src_end_date, + src_eff=src_eff, src_ldts=src_ldts, src_source=src_source, + source_model=source_model) -}} + +{%- endmacro -%} \ No newline at end of file diff --git a/macros/tables/fabric/hub.sql b/macros/tables/fabric/hub.sql new file mode 100644 index 00000000..7b83417f --- /dev/null +++ b/macros/tables/fabric/hub.sql @@ -0,0 +1,104 @@ +/* + * Copyright (c) Business Thinking Ltd. 2019-2023 + * This software includes code developed by the AutomateDV (f.k.a dbtvault) Team at Business Thinking Ltd. Trading as Datavault + */ + +{%- macro fabric__hub(src_pk, src_nk, src_extra_columns, src_ldts, src_source, source_model) -%} + +{%- set source_cols = automate_dv.expand_column_list(columns=[src_pk, src_nk, src_extra_columns, src_ldts, src_source]) -%} + +{%- if model.config.materialized == 'vault_insert_by_rank' %} + {%- set source_cols_with_rank = source_cols + [config.get('rank_column')] -%} +{%- endif %} + +{{ 'WITH ' -}} + +{%- set stage_count = source_model | length -%} + +{%- set ns = namespace(last_cte= "") -%} + +{%- for src in source_model -%} + +{%- set source_number = loop.index | string -%} + +row_rank_{{ source_number }} AS ( + {%- if model.config.materialized == 'vault_insert_by_rank' %} + SELECT {{ source_cols_with_rank | join(', ') }} + {%- else %} + SELECT {{ source_cols | join(', ') }} + {%- endif %} + FROM ( + {%- if model.config.materialized == 'vault_insert_by_rank' %} + SELECT {{ automate_dv.prefix(source_cols_with_rank, 'rr') }}, + {%- else %} + SELECT {{ automate_dv.prefix(source_cols, 'rr') }}, + {%- endif %} + ROW_NUMBER() OVER( + PARTITION BY {{ automate_dv.prefix([src_pk], 'rr') }} + ORDER BY {{ automate_dv.prefix([src_ldts], 'rr') }} + ) AS row_number + FROM {{ ref(src) }} AS rr + WHERE {{ automate_dv.multikey(src_pk, prefix='rr', condition='IS NOT NULL') }} + ) h + WHERE h.row_number = 1 + {%- set ns.last_cte = "row_rank_{}".format(source_number) %} +),{{ "\n" if not loop.last }} +{% endfor -%} +{% if stage_count > 1 %} +stage_union AS ( + {%- for src in source_model %} + SELECT * FROM row_rank_{{ loop.index | string }} + {%- if not loop.last %} + UNION ALL + {%- endif %} + {%- endfor %} + {%- set ns.last_cte = "stage_union" %} +), +{%- endif -%} + +{%- if model.config.materialized == 'vault_insert_by_period' %} +stage_mat_filter AS ( + SELECT * + FROM {{ ns.last_cte }} + WHERE __PERIOD_FILTER__ + {%- set ns.last_cte = "stage_mat_filter" %} +), +{%- elif model.config.materialized == 'vault_insert_by_rank' %} +stage_mat_filter AS ( + SELECT * + FROM {{ ns.last_cte }} + WHERE __RANK_FILTER__ + {%- set ns.last_cte = "stage_mat_filter" %} +), +{%- endif -%} + +{%- if stage_count > 1 %} + +row_rank_union AS ( + SELECT * + FROM ( + SELECT ru.*, + ROW_NUMBER() OVER( + PARTITION BY {{ automate_dv.prefix([src_pk], 'ru') }} + ORDER BY {{ automate_dv.prefix([src_ldts], 'ru') }}, {{ automate_dv.prefix([src_source], 'ru') }} ASC + ) AS row_rank_number + FROM {{ ns.last_cte }} AS ru + WHERE {{ automate_dv.multikey(src_pk, prefix='ru', condition='IS NOT NULL') }} + ) h + WHERE h.row_rank_number = 1 + {%- set ns.last_cte = "row_rank_union" %} +), +{% endif %} +records_to_insert AS ( + SELECT {{ automate_dv.prefix(source_cols, 'a', alias_target='target') }} + FROM {{ ns.last_cte }} AS a + {%- if automate_dv.is_any_incremental() %} + LEFT JOIN {{ this }} AS d + ON {{ automate_dv.multikey(src_pk, prefix=['a','d'], condition='=') }} + WHERE {{ automate_dv.multikey(src_pk, prefix='d', condition='IS NULL') }} + {%- endif %} +) + +SELECT * FROM records_to_insert + +{%- endmacro -%} diff --git a/macros/tables/fabric/link.sql b/macros/tables/fabric/link.sql new file mode 100644 index 00000000..c81145ae --- /dev/null +++ b/macros/tables/fabric/link.sql @@ -0,0 +1,106 @@ +/* + * Copyright (c) Business Thinking Ltd. 2019-2023 + * This software includes code developed by the AutomateDV (f.k.a dbtvault) Team at Business Thinking Ltd. Trading as Datavault + */ + +{%- macro fabric__link(src_pk, src_fk, src_extra_columns, src_ldts, src_source, source_model) -%} + +{%- set source_cols = automate_dv.expand_column_list(columns=[src_pk, src_fk, src_extra_columns, src_ldts, src_source]) -%} +{%- set fk_cols = automate_dv.expand_column_list([src_fk]) -%} + +{%- if model.config.materialized == 'vault_insert_by_rank' %} + {%- set source_cols_with_rank = source_cols + [config.get('rank_column')] -%} +{%- endif %} + +{{ 'WITH ' -}} + +{%- set stage_count = source_model | length -%} + +{%- set ns = namespace(last_cte= "") -%} + +{%- for src in source_model -%} + +{%- set source_number = loop.index | string -%} + +row_rank_{{ source_number }} AS ( + SELECT * + FROM + ( + {%- if model.config.materialized == 'vault_insert_by_rank' %} + SELECT {{ automate_dv.prefix(source_cols_with_rank, 'rr') }}, + {%- else %} + SELECT {{ automate_dv.prefix(source_cols, 'rr') }}, + {%- endif %} + ROW_NUMBER() OVER( + PARTITION BY {{ automate_dv.prefix([src_pk], 'rr') }} + ORDER BY {{ automate_dv.prefix([src_ldts], 'rr') }} + ) AS row_number + FROM {{ ref(src) }} AS rr + {%- if stage_count == 1 %} + WHERE {{ automate_dv.multikey(src_pk, prefix='rr', condition='IS NOT NULL') }} + AND {{ automate_dv.multikey(fk_cols, prefix='rr', condition='IS NOT NULL') }} + {%- endif %} + ) l + WHERE l.row_number = 1 + {%- set ns.last_cte = "row_rank_{}".format(source_number) %} + ),{{ "\n" if not loop.last }} + {% endfor -%} + +{% if stage_count > 1 %} +stage_union AS ( + {%- for src in source_model %} + SELECT * FROM row_rank_{{ loop.index | string }} + {%- if not loop.last %} + UNION ALL + {%- endif %} + {%- endfor %} + {%- set ns.last_cte = "stage_union" %} +), +{%- endif -%} +{%- if model.config.materialized == 'vault_insert_by_period' %} +stage_mat_filter AS ( + SELECT * + FROM {{ ns.last_cte }} + WHERE __PERIOD_FILTER__ + {%- set ns.last_cte = "stage_mat_filter" %} +), +{%- elif model.config.materialized == 'vault_insert_by_rank' %} +stage_mat_filter AS ( + SELECT * + FROM {{ ns.last_cte }} + WHERE __RANK_FILTER__ + {%- set ns.last_cte = "stage_mat_filter" %} +), +{% endif %} +{%- if stage_count > 1 %} + +row_rank_union AS ( + SELECT * + FROM + ( + SELECT ru.*, + ROW_NUMBER() OVER( + PARTITION BY {{ automate_dv.prefix([src_pk], 'ru') }} + ORDER BY {{ automate_dv.prefix([src_ldts], 'ru') }}, {{ automate_dv.prefix([src_source], 'ru') }} ASC + ) AS row_rank_number + FROM {{ ns.last_cte }} AS ru + WHERE {{ automate_dv.multikey(src_pk, prefix='ru', condition='IS NOT NULL') }} + AND {{ automate_dv.multikey(fk_cols, prefix='ru', condition='IS NOT NULL') }} + ) r + WHERE r.row_rank_number = 1 + {%- set ns.last_cte = "row_rank_union" %} +), +{% endif %} +records_to_insert AS ( + SELECT {{ automate_dv.prefix(source_cols, 'a', alias_target='target') }} + FROM {{ ns.last_cte }} AS a + {%- if automate_dv.is_any_incremental() %} + LEFT JOIN {{ this }} AS d + ON {{ automate_dv.multikey(src_pk, prefix=['a','d'], condition='=') }} + WHERE {{ automate_dv.multikey(src_pk, prefix='d', condition='IS NULL') }} + {%- endif %} +) + +SELECT * FROM records_to_insert + +{%- endmacro -%} diff --git a/macros/tables/fabric/ma_sat.sql b/macros/tables/fabric/ma_sat.sql new file mode 100644 index 00000000..38e19a87 --- /dev/null +++ b/macros/tables/fabric/ma_sat.sql @@ -0,0 +1,133 @@ +/* + * Copyright (c) Business Thinking Ltd. 2019-2023 + * This software includes code developed by the AutomateDV (f.k.a dbtvault) Team at Business Thinking Ltd. Trading as Datavault + */ + +{%- macro fabric__ma_sat(src_pk, src_cdk, src_hashdiff, src_payload, src_extra_columns, src_eff, src_ldts, src_source, source_model) -%} + +{%- set source_cols = automate_dv.expand_column_list(columns=[src_pk, src_hashdiff, src_cdk, src_payload, src_extra_columns, src_eff, src_ldts, src_source]) -%} +{%- set rank_cols = automate_dv.expand_column_list(columns=[src_pk, src_hashdiff, src_ldts]) -%} +{%- set cdk_cols = automate_dv.expand_column_list(columns=[src_cdk]) -%} +{%- set cols_for_latest = automate_dv.expand_column_list(columns=[src_pk, src_hashdiff, src_cdk, src_ldts]) %} + +{%- if model.config.materialized == 'vault_insert_by_rank' -%} + {%- set source_cols_with_rank = source_cols + [config.get('rank_column')] -%} +{%- endif -%} + +{# Select unique source records -#} +WITH source_data AS ( + {%- if model.config.materialized == 'vault_insert_by_rank' %} + SELECT DISTINCT {{ automate_dv.prefix(source_cols_with_rank, 's', alias_target='source') }} + {%- else %} + SELECT DISTINCT {{ automate_dv.prefix(source_cols, 's', alias_target='source') }} + {%- endif %} + FROM {{ ref(source_model) }} AS s + WHERE {{ automate_dv.multikey(src_pk, prefix='s', condition='IS NOT NULL') }} + {%- for child_key in src_cdk %} + AND {{ automate_dv.multikey(child_key, prefix='s', condition='IS NOT NULL') }} + {%- endfor %} + {%- if model.config.materialized == 'vault_insert_by_period' %} + AND __PERIOD_FILTER__ + {%- elif model.config.materialized == 'vault_insert_by_rank' %} + AND __RANK_FILTER__ + {%- endif %} +), + +{# if any_incremental -#} +{% if automate_dv.is_any_incremental() %} + +source_data_with_count AS ( + SELECT a.*, + b.source_count + FROM source_data a + INNER JOIN + ( + SELECT {{ automate_dv.prefix([src_pk], 't') }}, + COUNT(*) AS source_count + FROM ( + SELECT DISTINCT {{ automate_dv.prefix([src_pk], 's') }}, + {{ automate_dv.prefix([src_hashdiff], 's', alias_target='source') }}, + {{ automate_dv.prefix(cdk_cols, 's') }} + FROM source_data AS s + ) AS t + GROUP BY {{ automate_dv.prefix([src_pk], 't') }} + ) AS b + ON {{ automate_dv.multikey(src_pk, prefix=['a','b'], condition='=') }} +), + +{# Select latest records from satellite, restricted to PKs in source data -#} +latest_records AS ( + SELECT {{ automate_dv.prefix(cols_for_latest, 'mas', alias_target='target') }}, + mas.latest_rank, + DENSE_RANK() OVER (PARTITION BY {{ automate_dv.prefix([src_pk], 'mas') }} + ORDER BY {{ automate_dv.prefix([src_hashdiff], 'mas', alias_target='target') }}, {{ automate_dv.prefix(cdk_cols, 'mas') }} ASC + ) AS check_rank + FROM ( + SELECT {{ automate_dv.prefix(cols_for_latest, 'inner_mas', alias_target='target') }}, + RANK() OVER (PARTITION BY {{ automate_dv.prefix([src_pk], 'inner_mas') }} + ORDER BY {{ automate_dv.prefix([src_ldts], 'inner_mas') }} DESC + ) AS latest_rank + FROM {{ this }} AS inner_mas + INNER JOIN (SELECT DISTINCT {{ automate_dv.prefix([src_pk], 's') }} FROM source_data as s ) AS spk + ON {{ automate_dv.multikey(src_pk, prefix=['inner_mas', 'spk'], condition='=') }} + {%- if target.type =='databricks' %} + QUALIFY latest_rank = 1 + {%- endif %} + ) AS mas + {% if target.type == 'sqlserver' or target.type == 'postgres' -%} + WHERE latest_rank = 1 + {% endif -%} +), + +{# Select summary details for each group of latest records -#} +latest_group_details AS ( + SELECT {{ automate_dv.prefix([src_pk], 'lr') }}, + {{ automate_dv.prefix([src_ldts], 'lr') }}, + MAX(lr.check_rank) AS latest_count + FROM latest_records AS lr + GROUP BY {{ automate_dv.prefix([src_pk], 'lr') }}, {{ automate_dv.prefix([src_ldts], 'lr') }} +), + +{# endif any_incremental -#} +{%- endif %} + +{# Select groups of source records where at least one member does not appear in a group of latest records -#} +records_to_insert AS ( +{% if not automate_dv.is_any_incremental() -%} + SELECT {{ automate_dv.alias_all(source_cols, 'source_data') }} + FROM source_data +{%- endif -%} + +{#- if any_incremental -#} +{% if automate_dv.is_any_incremental() %} + SELECT {{ automate_dv.alias_all(source_cols, 'source_data_with_count') }} + FROM source_data_with_count + WHERE EXISTS ( + SELECT 1 + FROM source_data_with_count AS stage + WHERE NOT EXISTS ( + SELECT 1 + FROM ( + SELECT {{ automate_dv.prefix(cols_for_latest, 'lr', alias_target='target') }}, + lg.latest_count + FROM latest_records AS lr + INNER JOIN latest_group_details AS lg + ON {{ automate_dv.multikey(src_pk, prefix=['lr', 'lg'], condition='=') }} + AND {{ automate_dv.prefix([src_ldts], 'lr') }} = {{ automate_dv.prefix([src_ldts], 'lg') }} + ) AS active_records + WHERE {{ automate_dv.multikey(src_pk, prefix=['stage', 'active_records'], condition='=') }} + AND {{ automate_dv.prefix([src_hashdiff], 'stage') }} = {{ automate_dv.prefix([src_hashdiff], 'active_records', alias_target='target') }} +{#- In order to maintain the parallel with the standard satellite, we don''t allow for groups of records to be updated if the ldts is the only difference -#} +{#- AND {{ automate_dv.prefix([src_ldts], 'stage') }} = {{ automate_dv.prefix([src_ldts], 'active_records') }} #} + AND {{ automate_dv.multikey(cdk_cols, prefix=['stage', 'active_records'], condition='=') }} + AND stage.source_count = active_records.latest_count + ) + AND {{ automate_dv.multikey(src_pk, prefix=['source_data_with_count', 'stage'], condition='=') }} + ) +{# endif any_incremental -#} +{%- endif %} +) + +SELECT * FROM records_to_insert + +{%- endmacro -%} diff --git a/macros/tables/fabric/pit.sql b/macros/tables/fabric/pit.sql new file mode 100644 index 00000000..88d968de --- /dev/null +++ b/macros/tables/fabric/pit.sql @@ -0,0 +1,16 @@ +/* + * Copyright (c) Business Thinking Ltd. 2019-2023 + * This software includes code developed by the AutomateDV (f.k.a dbtvault) Team at Business Thinking Ltd. Trading as Datavault + */ + +{%- macro fabric__pit(src_pk, src_extra_columns, as_of_dates_table, satellites, stage_tables_ldts, src_ldts, source_model) -%} + +{{- automate_dv.default__pit(src_pk=src_pk, + src_extra_columns=src_extra_columns, + as_of_dates_table=as_of_dates_table, + satellites=satellites, + stage_tables_ldts=stage_tables_ldts, + src_ldts=src_ldts, + source_model=source_model) -}} + +{%- endmacro -%} diff --git a/macros/tables/fabric/ref_table.sql b/macros/tables/fabric/ref_table.sql new file mode 100644 index 00000000..03c734f1 --- /dev/null +++ b/macros/tables/fabric/ref_table.sql @@ -0,0 +1,14 @@ +/* + * Copyright (c) Business Thinking Ltd. 2019-2023 + * This software includes code developed by the AutomateDV (f.k.a dbtvault) Team at Business Thinking Ltd. Trading as Datavault + */ + +{%- macro fabric__ref_table(src_pk, src_extra_columns, src_ldts, src_source, source_model) -%} + +{{- automate_dv.default__ref_table(src_pk=src_pk, + src_extra_columns=src_extra_columns, + src_ldts=src_ldts, + src_source=src_source, + source_model=source_model) -}} + +{%- endmacro -%} diff --git a/macros/tables/fabric/sat.sql b/macros/tables/fabric/sat.sql new file mode 100644 index 00000000..e9488135 --- /dev/null +++ b/macros/tables/fabric/sat.sql @@ -0,0 +1,13 @@ +/* + * Copyright (c) Business Thinking Ltd. 2019-2023 + * This software includes code developed by the AutomateDV (f.k.a dbtvault) Team at Business Thinking Ltd. Trading as Datavault + */ + +{%- macro fabric__sat(src_pk, src_hashdiff, src_payload, src_extra_columns, src_eff, src_ldts, src_source, source_model) -%} + +{{- automate_dv.postgres__sat(src_pk=src_pk, src_hashdiff=src_hashdiff, + src_payload=src_payload, src_extra_columns=src_extra_columns, + src_eff=src_eff, src_ldts=src_ldts, + src_source=src_source, source_model=source_model) -}} + +{%- endmacro -%} diff --git a/macros/tables/fabric/t_link.sql b/macros/tables/fabric/t_link.sql new file mode 100644 index 00000000..e9d10a17 --- /dev/null +++ b/macros/tables/fabric/t_link.sql @@ -0,0 +1,13 @@ +/* + * Copyright (c) Business Thinking Ltd. 2019-2023 + * This software includes code developed by the AutomateDV (f.k.a dbtvault) Team at Business Thinking Ltd. Trading as Datavault + */ + +{%- macro fabric__t_link(src_pk, src_fk, src_payload, src_extra_columns, src_eff, src_ldts, src_source, source_model) -%} + +{{- automate_dv.default__t_link(src_pk=src_pk, src_fk=src_fk, src_payload=src_payload, + src_extra_columns=src_extra_columns, + src_eff=src_eff, src_ldts=src_ldts, src_source=src_source, + source_model=source_model) -}} + +{%- endmacro -%} diff --git a/macros/tables/fabric/xts.sql b/macros/tables/fabric/xts.sql new file mode 100644 index 00000000..bbb89a45 --- /dev/null +++ b/macros/tables/fabric/xts.sql @@ -0,0 +1,15 @@ +/* + * Copyright (c) Business Thinking Ltd. 2019-2023 + * This software includes code developed by the AutomateDV (f.k.a dbtvault) Team at Business Thinking Ltd. Trading as Datavault + */ + +{%- macro fabric__xts(src_pk, src_satellite, src_extra_columns, src_ldts, src_source, source_model) -%} + +{{- automate_dv.default__xts(src_pk=src_pk, + src_satellite=src_satellite, + src_extra_columns=src_extra_columns, + src_ldts=src_ldts, + src_source=src_source, + source_model=source_model) -}} + +{%- endmacro -%} \ No newline at end of file From 2d48c421ab3599f931d942f06afc4a9d5b9254c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mattias=20Thal=C3=A9n?= Date: Fri, 5 Apr 2024 12:00:46 +0200 Subject: [PATCH 2/4] Add other macros --- .../metadata_processing/concat_ws.sql | 7 ++++ .../get_escape_characters.sql | 4 +++ .../period_mat_helpers/check_datediff.sql | 13 +++++++ .../get_period_boundaries.sql | 36 +++++++++++++++++++ .../get_period_filter_sql.sql | 15 ++++++++ .../period_mat_helpers/get_period_of_load.sql | 16 +++++++++ ...replace_placeholder_with_period_filter.sql | 22 ++++++++++++ .../replace_placeholder_with_rank_filter.sql | 14 ++++++++ macros/staging/null_columns.sql | 7 ++++ macros/supporting/casting/cast_binary.sql | 13 +++++++ macros/supporting/casting/cast_date.sql | 14 ++++++++ macros/supporting/casting/cast_datetime.sql | 9 +++++ macros/supporting/data_types/type_string.sql | 4 +++ .../supporting/data_types/type_timestamp.sql | 4 +++ .../supporting/ghost_records/binary_ghost.sql | 14 ++++++++ .../supporting/ghost_records/null_ghost.sql | 4 +++ macros/supporting/hash.sql | 7 ++++ .../hash_components/null_expression.sql | 10 ++++++ .../hash_components/select_hash_alg.sql | 18 ++++++++++ .../standard_column_wrapper.sql | 17 +++++++++ macros/supporting/max_datetime.sql | 6 ++++ 21 files changed, 254 insertions(+) diff --git a/macros/internal/metadata_processing/concat_ws.sql b/macros/internal/metadata_processing/concat_ws.sql index 503bd831..021a6294 100644 --- a/macros/internal/metadata_processing/concat_ws.sql +++ b/macros/internal/metadata_processing/concat_ws.sql @@ -31,3 +31,10 @@ CONCAT( {{ automate_dv.default__concat_ws(string_list=string_list, separator=separator) }} {%- endmacro -%} + + +{%- macro fabric__concat_ws(string_list, separator="||") -%} + + {{ automate_dv.default__concat_ws(string_list=string_list, separator=separator) }} + +{%- endmacro -%} diff --git a/macros/internal/metadata_processing/get_escape_characters.sql b/macros/internal/metadata_processing/get_escape_characters.sql index ce19199e..0f88da6d 100644 --- a/macros/internal/metadata_processing/get_escape_characters.sql +++ b/macros/internal/metadata_processing/get_escape_characters.sql @@ -44,6 +44,10 @@ {%- do return (('"', '"')) -%} {%- endmacro %} +{%- macro fabric__get_escape_characters() %} + {%- do return (('"', '"')) -%} +{%- endmacro %} + {%- macro databricks__get_escape_characters() %} {%- do return (('`', '`')) -%} {%- endmacro %} diff --git a/macros/materialisations/period_mat_helpers/check_datediff.sql b/macros/materialisations/period_mat_helpers/check_datediff.sql index deb3afa9..778f2c2b 100644 --- a/macros/materialisations/period_mat_helpers/check_datediff.sql +++ b/macros/materialisations/period_mat_helpers/check_datediff.sql @@ -44,4 +44,17 @@ {% do return(num_periods) %} +{% endmacro %} + +{% macro fabric__check_num_periods(start_date, stop_date, period) %} + + {% set num_periods_check_sql %} + SELECT DATEDIFF_BIG({{ period }}, CAST('{{ start_date }}' AS DATETIME2), + CAST(NULLIF('{{ stop_date | lower }}', 'none') AS DATETIME2)) AS NUM_PERIODS + {% endset %} + {% set num_periods_dict = automate_dv.get_query_results_as_dict(num_periods_check_sql) %} + {% set num_periods = num_periods_dict['NUM_PERIODS'][0] | int %} + + {% do return(num_periods) %} + {% endmacro %} \ No newline at end of file diff --git a/macros/materialisations/period_mat_helpers/get_period_boundaries.sql b/macros/materialisations/period_mat_helpers/get_period_boundaries.sql index 0b1cce71..1e6ab0a0 100644 --- a/macros/materialisations/period_mat_helpers/get_period_boundaries.sql +++ b/macros/materialisations/period_mat_helpers/get_period_boundaries.sql @@ -118,6 +118,42 @@ {%- endmacro %} +{% macro fabric__get_period_boundaries(target_relation, timestamp_field, start_date, stop_date, period) -%} + {%- if period is in ['microsecond', 'millisecond', 'second'] -%} + {{ automate_dv.datepart_too_small_error(period=period) }} + {%- endif -%} + + {# MSSQL cannot CAST datetime2 strings with more than 7 decimal places #} + {% set start_date = start_date[0:27] %} + {% set stop_date = stop_date[0:27] %} + {%- set datepart = period -%} + {%- set from_date_or_timestamp = "CAST(NULLIF('{}','none') AS DATETIME2)".format(stop_date | lower) %} + + {% set period_boundary_sql -%} + WITH period_data AS ( + SELECT + CAST(COALESCE(MAX({{ timestamp_field }}), CAST('{{ start_date }}' AS DATETIME2)) AS DATETIME2) AS start_timestamp, + CAST(COALESCE({{ automate_dv.timestamp_add(datepart, interval, from_date_or_timestamp) }}, + {{ current_timestamp() }} ) AS DATETIME2) AS stop_timestamp + FROM {{ target_relation }} + ) + SELECT + start_timestamp, + stop_timestamp, + {{ datediff('start_timestamp', 'stop_timestamp', period) }} + 1 AS num_periods + FROM period_data + {%- endset %} + + {% set period_boundaries_dict = automate_dv.get_query_results_as_dict(period_boundary_sql) %} + + {% set period_boundaries = {'start_timestamp': period_boundaries_dict['START_TIMESTAMP'][0] | string, + 'stop_timestamp': period_boundaries_dict['STOP_TIMESTAMP'][0] | string, + 'num_periods': period_boundaries_dict['NUM_PERIODS'][0] | int} %} + + {% do return(period_boundaries) %} +{%- endmacro %} + + {% macro databricks__get_period_boundaries(target_relation, timestamp_field, start_date, stop_date, period) -%} {%- set from_date_or_timestamp = "NULLIF('{}','none')::TIMESTAMP".format(stop_date | lower) -%} diff --git a/macros/materialisations/period_mat_helpers/get_period_filter_sql.sql b/macros/materialisations/period_mat_helpers/get_period_filter_sql.sql index dbfde703..9f4cda06 100644 --- a/macros/materialisations/period_mat_helpers/get_period_filter_sql.sql +++ b/macros/materialisations/period_mat_helpers/get_period_filter_sql.sql @@ -47,6 +47,21 @@ + +{% macro fabric__get_period_filter_sql(target_cols_csv, base_sql, timestamp_field, period, start_timestamp, stop_timestamp, offset) -%} + {%- set filtered_sql = {'sql': base_sql} -%} + + {%- do filtered_sql.update({'sql': automate_dv.replace_placeholder_with_period_filter(core_sql=filtered_sql.sql, + timestamp_field=timestamp_field, + start_timestamp=start_timestamp, + stop_timestamp=stop_timestamp, + offset=offset, period=period)}) -%} + {# MSSQL does not allow CTEs in a subquery #} + {{ filtered_sql.sql }} +{%- endmacro %} + + + {% macro postgres__get_period_filter_sql(target_cols_csv, base_sql, timestamp_field, period, start_timestamp, stop_timestamp, offset) -%} {%- set filtered_sql = {'sql': base_sql} -%} diff --git a/macros/materialisations/period_mat_helpers/get_period_of_load.sql b/macros/materialisations/period_mat_helpers/get_period_of_load.sql index fbf24b6d..0b20421b 100644 --- a/macros/materialisations/period_mat_helpers/get_period_of_load.sql +++ b/macros/materialisations/period_mat_helpers/get_period_of_load.sql @@ -64,6 +64,22 @@ {%- endmacro -%} +{%- macro fabric__get_period_of_load(period, offset, start_timestamp) -%} + {# MSSQL cannot CAST datetime2 strings with more than 7 decimal places #} + {% set start_timestamp_mssql = start_timestamp[0:23] %} + + {% set period_of_load_sql -%} + SELECT DATEADD({{ period }}, DATEDIFF({{period}}, 0, DATEADD({{ period }}, {{ offset }}, CAST('{{ start_timestamp_mssql }}' AS DATETIME2))), 0) AS period_of_load + {%- endset %} + + {% set period_of_load_dict = automate_dv.get_query_results_as_dict(period_of_load_sql) %} + + {% set period_of_load = period_of_load_dict['PERIOD_OF_LOAD'][0] | string %} + + {% do return(period_of_load) %} +{%- endmacro -%} + + {%- macro databricks__get_period_of_load(period, offset, start_timestamp) -%} {% do return(automate_dv.default__get_period_of_load(period=period, offset=offset, start_timestamp=start_timestamp)) %} {%- endmacro -%} diff --git a/macros/materialisations/period_mat_helpers/replace_placeholder_with_period_filter.sql b/macros/materialisations/period_mat_helpers/replace_placeholder_with_period_filter.sql index 8e359cd8..6aa086e3 100644 --- a/macros/materialisations/period_mat_helpers/replace_placeholder_with_period_filter.sql +++ b/macros/materialisations/period_mat_helpers/replace_placeholder_with_period_filter.sql @@ -74,6 +74,28 @@ {% endmacro %} +{% macro fabric__replace_placeholder_with_period_filter(core_sql, timestamp_field, start_timestamp, stop_timestamp, offset, period) %} + {%- if period is in ['microsecond', 'millisecond', 'second'] -%} + {{ automate_dv.sqlserver_datepart_too_small_error(period=period) }} + {%- endif -%} + + {# MSSQL cannot CAST datetime2 strings with more than 7 decimal places #} + {% set start_timestamp_mssql = start_timestamp[0:27] %} + + {%- set period_filter -%} + ( + CAST({{ timestamp_field }} AS DATETIME2) >= DATEADD({{ period }}, DATEDIFF({{ period }}, 0, DATEADD({{ period }}, {{ offset }}, CAST('{{ start_timestamp_mssql }}' AS DATETIME2))), 0) + AND CAST({{ timestamp_field }} AS DATETIME2) < DATEADD({{ period }}, 1, DATEADD({{ period }}, {{ offset }}, CAST('{{ start_timestamp_mssql }}' AS DATETIME2))) + AND (CAST({{ timestamp_field }} AS DATETIME2) >= CAST('{{ start_timestamp_mssql }}' AS DATETIME2)) + ) + {%- endset -%} + + {%- set filtered_sql = core_sql | replace("__PERIOD_FILTER__", period_filter) -%} + + {% do return(filtered_sql) %} +{% endmacro %} + + {% macro postgres__replace_placeholder_with_period_filter(core_sql, timestamp_field, start_timestamp, stop_timestamp, offset, period) %} {%- set period_filter -%} diff --git a/macros/materialisations/rank_mat_helpers/replace_placeholder_with_rank_filter.sql b/macros/materialisations/rank_mat_helpers/replace_placeholder_with_rank_filter.sql index 5ede88fc..fe07e987 100644 --- a/macros/materialisations/rank_mat_helpers/replace_placeholder_with_rank_filter.sql +++ b/macros/materialisations/rank_mat_helpers/replace_placeholder_with_rank_filter.sql @@ -42,6 +42,20 @@ +{% macro fabric__replace_placeholder_with_rank_filter(core_sql, rank_column, rank_iteration) %} + + {%- set rank_filter -%} + CAST({{ rank_column }} AS INT) = CAST({{ rank_iteration }} AS INT) + {%- endset -%} + + {%- set filtered_sql = core_sql | replace("__RANK_FILTER__", rank_filter) -%} + + {% do return(filtered_sql) %} +{% endmacro %} + + + + {% macro bigquery__replace_placeholder_with_rank_filter(core_sql, rank_column, rank_iteration) %} {%- set rank_filter -%} CAST({{ rank_column }} AS INTEGER) = CAST({{ rank_iteration }} AS INTEGER) diff --git a/macros/staging/null_columns.sql b/macros/staging/null_columns.sql index 462214f8..2a4a8513 100644 --- a/macros/staging/null_columns.sql +++ b/macros/staging/null_columns.sql @@ -80,6 +80,13 @@ {%- endmacro -%} +{%- macro fabric__null_column_sql(col_name, default_value) -%} + + {{ col_name }} AS {{ col_name ~ "_ORIGINAL" }}, + ISNULL({{ col_name }}, '{{ default_value }}') AS {{ col_name }} + +{%- endmacro -%} + {%- macro postgres__null_column_sql(col_name, default_value) -%} {{ col_name }} AS {{ col_name ~ "_ORIGINAL" }}, diff --git a/macros/supporting/casting/cast_binary.sql b/macros/supporting/casting/cast_binary.sql index c21ac5e4..4f8319ce 100644 --- a/macros/supporting/casting/cast_binary.sql +++ b/macros/supporting/casting/cast_binary.sql @@ -33,6 +33,19 @@ {%- endmacro -%} + +{%- macro fabric__cast_binary(column_str, alias=none, quote=true) -%} + + {%- if quote -%} + CONVERT({{ automate_dv.type_binary() }}, '{{ column_str }}', 2) + {%- else -%} + CONVERT({{ automate_dv.type_binary() }}, {{ column_str }}, 2) + {%- endif -%} + + {% if alias %} AS {{ alias }} {%- endif %} + +{%- endmacro -%} + {%- macro bigquery__cast_binary(column_str, alias=none, quote=true) -%} {{ automate_dv.default__cast_binary(column_str=column_str, alias=alias, quote=quote) }} diff --git a/macros/supporting/casting/cast_date.sql b/macros/supporting/casting/cast_date.sql index 929af8fe..743ab13d 100644 --- a/macros/supporting/casting/cast_date.sql +++ b/macros/supporting/casting/cast_date.sql @@ -38,6 +38,20 @@ {%- endmacro -%} +{%- macro fabric__cast_date(column_str, as_string=false, alias=none) -%} + + {%- if not as_string -%} + CONVERT(DATE, {{ column_str }}) + {%- else -%} + CONVERT(DATE, '{{ column_str }}') + {%- endif -%} + + {%- if alias %} AS {{ alias }} {%- endif %} + + +{%- endmacro -%} + + {%- macro bigquery__cast_date(column_str, as_string=false, alias=none) -%} {%- if not as_string -%} diff --git a/macros/supporting/casting/cast_datetime.sql b/macros/supporting/casting/cast_datetime.sql index 7bfe6c54..62460866 100644 --- a/macros/supporting/casting/cast_datetime.sql +++ b/macros/supporting/casting/cast_datetime.sql @@ -40,6 +40,15 @@ {%- endmacro -%} +{%- macro fabric__cast_datetime(column_str, as_string=false, alias=none, date_type=none) -%} + + CONVERT(DATETIME2, {{ column_str }}) + + {%- if alias %} AS {{ alias }} {%- endif %} + +{%- endmacro -%} + + {%- macro bigquery__cast_datetime(column_str, as_string=false, alias=none, date_type=none) -%} {%- if date_type == 'timestamp' -%} diff --git a/macros/supporting/data_types/type_string.sql b/macros/supporting/data_types/type_string.sql index 2ddd22e5..f4f73ad0 100644 --- a/macros/supporting/data_types/type_string.sql +++ b/macros/supporting/data_types/type_string.sql @@ -19,6 +19,10 @@ VARCHAR {%- endmacro -%} +{%- macro fabric__type_string(is_hash, char_length) -%} + VARCHAR +{%- endmacro -%} + {%- macro databricks__type_string(is_hash=false, char_length=255) -%} {%- if is_hash -%} {%- if var('hash', 'MD5') | lower == 'md5' -%} diff --git a/macros/supporting/data_types/type_timestamp.sql b/macros/supporting/data_types/type_timestamp.sql index f3eaadda..e3ffbf19 100644 --- a/macros/supporting/data_types/type_timestamp.sql +++ b/macros/supporting/data_types/type_timestamp.sql @@ -13,4 +13,8 @@ {%- macro sqlserver__type_timestamp() -%} DATETIME2 +{%- endmacro -%} + +{%- macro fabric__type_timestamp() -%} + DATETIME2 {%- endmacro -%} \ No newline at end of file diff --git a/macros/supporting/ghost_records/binary_ghost.sql b/macros/supporting/ghost_records/binary_ghost.sql index 69ee10ce..8646536e 100644 --- a/macros/supporting/ghost_records/binary_ghost.sql +++ b/macros/supporting/ghost_records/binary_ghost.sql @@ -33,3 +33,17 @@ {%- if alias %} AS {{ alias }} {%- endif -%} {%- endmacro -%} + +{%- macro fabric__binary_ghost(alias, hash) -%} + {%- if hash | lower == 'md5' -%} + CAST(REPLICATE(CAST(CAST('0' AS tinyint) AS BINARY(16)), 16) AS BINARY(16)) + {%- elif hash | lower == 'sha' -%} + CAST(REPLICATE(CAST(CAST('0' AS tinyint) AS BINARY(32)), 32) AS BINARY(32)) + {%- elif hash | lower == 'sha1' -%} + CAST(REPLICATE(CAST(CAST('0' AS tinyint) AS BINARY(20)), 20) AS BINARY(20)) + {%- else -%} + CAST(REPLICATE(CAST(CAST('0' AS tinyint) AS BINARY(16)), 16) AS BINARY(16)) + {%- endif -%} + + {%- if alias %} AS {{ alias }} {%- endif -%} +{%- endmacro -%} diff --git a/macros/supporting/ghost_records/null_ghost.sql b/macros/supporting/ghost_records/null_ghost.sql index 924bf017..06f80c9d 100644 --- a/macros/supporting/ghost_records/null_ghost.sql +++ b/macros/supporting/ghost_records/null_ghost.sql @@ -21,4 +21,8 @@ {%- macro sqlserver__null_ghost(datatype, alias) -%} {{ automate_dv.bigquery__null_ghost(datatype, alias) }} +{%- endmacro -%} + +{%- macro fabric__null_ghost(datatype, alias) -%} + {{ automate_dv.bigquery__null_ghost(datatype, alias) }} {%- endmacro -%} \ No newline at end of file diff --git a/macros/supporting/hash.sql b/macros/supporting/hash.sql index d3ccad8e..43e4b774 100644 --- a/macros/supporting/hash.sql +++ b/macros/supporting/hash.sql @@ -107,6 +107,13 @@ {%- endmacro -%} +{%- macro fabric__hash(columns, alias, is_hashdiff, columns_to_escape) -%} + + {{ automate_dv.default__hash(columns=columns, alias=alias, is_hashdiff=is_hashdiff, columns_to_escape=columns_to_escape) }} + +{%- endmacro -%} + + {%- macro postgres__hash(columns, alias, is_hashdiff, columns_to_escape) -%} {{ automate_dv.default__hash(columns=columns, alias=alias, is_hashdiff=is_hashdiff, columns_to_escape=columns_to_escape) }} diff --git a/macros/supporting/hash_components/null_expression.sql b/macros/supporting/hash_components/null_expression.sql index f6d7399e..2d8a9d76 100644 --- a/macros/supporting/hash_components/null_expression.sql +++ b/macros/supporting/hash_components/null_expression.sql @@ -45,4 +45,14 @@ {% do return(column_expression) %} +{%- endmacro -%} + +{%- macro fabric__null_expression(standardise, column_str, null_placeholder_string) -%} + + {%- set column_expression -%} + ISNULL({{ standardise | replace('[EXPRESSION]', column_str) }}, '{{ null_placeholder_string }}') + {%- endset -%} + + {% do return(column_expression) %} + {%- endmacro -%} \ No newline at end of file diff --git a/macros/supporting/hash_components/select_hash_alg.sql b/macros/supporting/hash_components/select_hash_alg.sql index 2c707ae4..7f9fa099 100644 --- a/macros/supporting/hash_components/select_hash_alg.sql +++ b/macros/supporting/hash_components/select_hash_alg.sql @@ -49,6 +49,12 @@ {% endmacro %} +{% macro fabric__hash_alg_md5() -%} + + {% do return(automate_dv.cast_binary("HASHBYTES('MD5', [HASH_STRING_PLACEHOLDER])", quote=false)) %} + +{% endmacro %} + {% macro postgres__hash_alg_md5() -%} {% do return("DECODE(MD5([HASH_STRING_PLACEHOLDER]), 'hex')") %} @@ -89,6 +95,12 @@ {% endmacro %} +{% macro fabric__hash_alg_sha256() -%} + + {% do return(automate_dv.cast_binary("HASHBYTES('SHA2_256', [HASH_STRING_PLACEHOLDER])", quote=false)) %} + +{% endmacro %} + {% macro postgres__hash_alg_sha256() -%} {#- * MD5 is simple function call to md5(val) -#} {#- * SHA256 needs input cast to BYTEA and then its BYTEA result encoded as hex text output -#} @@ -131,6 +143,12 @@ {% endmacro %} +{% macro fabric__hash_alg_sha1() -%} + + {% do return(automate_dv.cast_binary("HASHBYTES('SHA1', [HASH_STRING_PLACEHOLDER])", quote=false)) %} + +{% endmacro %} + {% macro postgres__hash_alg_sha1() -%} {%- do exceptions.warn("Configured hash (SHA-1) is not supported on Postgres. diff --git a/macros/supporting/hash_components/standard_column_wrapper.sql b/macros/supporting/hash_components/standard_column_wrapper.sql index 2cc20051..d199ed6e 100644 --- a/macros/supporting/hash_components/standard_column_wrapper.sql +++ b/macros/supporting/hash_components/standard_column_wrapper.sql @@ -67,4 +67,21 @@ {% do return(standardise) -%} +{%- endmacro -%} + + +{%- macro fabric__standard_column_wrapper(hash_content_casing) -%} + + {%- if hash_content_casing == 'upper' -%} + {%- set standardise -%} + NULLIF(UPPER(TRIM(CAST([EXPRESSION] AS {{ automate_dv.type_string() }}(MAX)))), '') + {%- endset -%} + {%- else -%} + {%- set standardise -%} + NULLIF(TRIM(CAST([EXPRESSION] AS {{ automate_dv.type_string() }}(MAX))), '') + {%- endset -%} + {%- endif -%} + + {% do return(standardise) -%} + {%- endmacro -%} \ No newline at end of file diff --git a/macros/supporting/max_datetime.sql b/macros/supporting/max_datetime.sql index c039d059..94f5c9ba 100644 --- a/macros/supporting/max_datetime.sql +++ b/macros/supporting/max_datetime.sql @@ -21,6 +21,12 @@ {%- endmacro -%} +{%- macro fabric__max_datetime() -%} + + {%- do return(var('max_datetime', '9999-12-31 23:59:59.9999999')) -%} + +{%- endmacro -%} + {%- macro bigquery__max_datetime() -%} {%- do return(var('max_datetime', '9999-12-31 23:59:59.999999')) -%} From ec793eb982f98fab66f56a8618d3f2108c537778 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mattias=20Thal=C3=A9n?= Date: Fri, 5 Apr 2024 22:44:03 +0200 Subject: [PATCH 3/4] Add VARBINARY as binary type for Fabric --- macros/supporting/data_types/type_binary.sql | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/macros/supporting/data_types/type_binary.sql b/macros/supporting/data_types/type_binary.sql index e8ef30f8..cdf886fe 100644 --- a/macros/supporting/data_types/type_binary.sql +++ b/macros/supporting/data_types/type_binary.sql @@ -29,4 +29,8 @@ {%- macro databricks__type_binary() -%} STRING +{%- endmacro -%} + +{%- macro fabric__type_binary() -%} + VARBINARY {%- endmacro -%} \ No newline at end of file From 7b0351c6de17a2b580227807505df2473be168e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mattias=20Thal=C3=A9n?= Date: Fri, 5 Apr 2024 23:13:05 +0200 Subject: [PATCH 4/4] Change Fabric binary casting to UNIQUEIDENTIFIER --- macros/supporting/data_types/type_binary.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/macros/supporting/data_types/type_binary.sql b/macros/supporting/data_types/type_binary.sql index cdf886fe..6216f5a1 100644 --- a/macros/supporting/data_types/type_binary.sql +++ b/macros/supporting/data_types/type_binary.sql @@ -32,5 +32,5 @@ {%- endmacro -%} {%- macro fabric__type_binary() -%} - VARBINARY + UNIQUEIDENTIFIER {%- endmacro -%} \ No newline at end of file