From 12c20e2888461e665e3663d390c42dac449be89b Mon Sep 17 00:00:00 2001 From: chinahamu Date: Mon, 28 Nov 2022 11:27:10 +0900 Subject: [PATCH] support duckdb --- .../escape_column_names.sql | 4 + .../period_mat_helpers/get_period_of_load.sql | 15 +++ macros/staging/null_columns.sql | 9 ++ macros/supporting/cast_date.sql | 6 ++ macros/supporting/hash.sql | 88 +++++++++++++++ macros/tables/duckdb/hub.sql | 89 ++++++++++++++++ macros/tables/duckdb/link.sql | 100 ++++++++++++++++++ macros/tables/duckdb/sat.sql | 63 +++++++++++ 8 files changed, 374 insertions(+) create mode 100644 macros/tables/duckdb/hub.sql create mode 100644 macros/tables/duckdb/link.sql create mode 100644 macros/tables/duckdb/sat.sql diff --git a/macros/internal/metadata_processing/escape_column_names.sql b/macros/internal/metadata_processing/escape_column_names.sql index 6e5b130ee..11a13bb7b 100644 --- a/macros/internal/metadata_processing/escape_column_names.sql +++ b/macros/internal/metadata_processing/escape_column_names.sql @@ -132,6 +132,10 @@ {%- do return (('`', '`')) -%} {%- endmacro %} +{%- macro duckdb__get_escape_characters() %} + {%- do return (('', '')) -%} +{%- endmacro %} + {%- macro sqlserver__get_escape_characters() %} {%- do return (('"', '"')) -%} {%- endmacro %} diff --git a/macros/materialisations/period_mat_helpers/get_period_of_load.sql b/macros/materialisations/period_mat_helpers/get_period_of_load.sql index 3241ea79d..342084f60 100644 --- a/macros/materialisations/period_mat_helpers/get_period_of_load.sql +++ b/macros/materialisations/period_mat_helpers/get_period_of_load.sql @@ -73,3 +73,18 @@ {% do return(period_of_load) %} {%- endmacro -%} + +{%- macro duckdb__get_period_of_load(period, offset, start_timestamp) -%} + {# Postgres uses different DateTime arithmetic #} + {% set period_of_load_sql -%} + SELECT DATE_TRUNC('{{ period }}', + TO_TIMESTAMP('{{ start_timestamp }}', 'YYYY-MM-DD HH24:MI:SS') + interval '{{ offset }} {{ period }}' + ) AS period_of_load + {%- endset %} + + {% set period_of_load_dict = dbtvault.get_query_results_as_dict(period_of_load_sql) %} + + {% set period_of_load = period_of_load_dict['PERIOD_OF_LOAD'][0] | string %} + + {% do return(period_of_load) %} +{%- endmacro -%} diff --git a/macros/staging/null_columns.sql b/macros/staging/null_columns.sql index 6676527bf..715fc3c49 100644 --- a/macros/staging/null_columns.sql +++ b/macros/staging/null_columns.sql @@ -86,4 +86,13 @@ {{ col_name_esc }} AS {{ col_name_orig_esc }}, COALESCE({{ col_name_esc }}, '{{ default_value }}') AS {{ col_name_esc }} +{%- endmacro -%} + +{%- macro duckdb__null_column_sql(col_name, default_value) -%} + + {%- set col_name_esc = dbtvault.escape_column_names(col_name) -%} + {%- set col_name_orig_esc = dbtvault.escape_column_names(col_name ~ "_ORIGINAL") -%} + {{ col_name_esc }} AS {{ col_name_orig_esc }}, + COALESCE({{ col_name_esc }}, '{{ default_value }}') AS {{ col_name_esc }} + {%- endmacro -%} \ No newline at end of file diff --git a/macros/supporting/cast_date.sql b/macros/supporting/cast_date.sql index 06122c53c..1f612d12a 100644 --- a/macros/supporting/cast_date.sql +++ b/macros/supporting/cast_date.sql @@ -77,4 +77,10 @@ {{ dbtvault.snowflake__cast_date(column_str=column_str, as_string=as_string, datetime=datetime, alias=alias)}} +{%- endmacro -%} + +{%- macro duckdb__cast_date(column_str, as_string=false, datetime=false, alias=none) -%} + + {{ dbtvault.snowflake__cast_date(column_str=column_str, as_string=as_string, datetime=datetime, alias=alias)}} + {%- endmacro -%} \ No newline at end of file diff --git a/macros/supporting/hash.sql b/macros/supporting/hash.sql index b1bcf6266..e697902bb 100644 --- a/macros/supporting/hash.sql +++ b/macros/supporting/hash.sql @@ -303,6 +303,94 @@ {%- endmacro -%} +{%- macro duckdb__hash(columns, alias, is_hashdiff) -%} + +{%- set hash = var('hash', 'MD5') -%} +{%- set concat_string = var('concat_string', '||') -%} +{%- set null_placeholder_string = var('null_placeholder_string', '^^') -%} + +{#- Select hashing algorithm -#} +{%- if hash == 'MD5' -%} + {%- set hash_alg = 'MD5' -%} +{%- elif hash == 'SHA' -%} + {%- set hash_alg = 'SHA256' -%} +{%- else -%} + {%- set hash_alg = 'MD5' -%} +{%- endif -%} + +{#- Select hashing expression (left and right sides) -#} +{#- * MD5 is simple function call to md5(val) -#} +{#- * SHA256 needs input cast to BYTEA and then its BYTEA result encoded as hex text output -#} +{#- e.g. ENCODE(SHA256(CAST(val AS BYTEA)), 'hex') -#} +{#- Ref: https://www.postgresql.org/docs/11/functions-binarystring.html -#} +{%- if hash_alg == 'MD5' -%} + {%- set hash_expr_left = 'MD5(' -%} + {%- set hash_expr_right = ')' -%} +{%- elif hash_alg == 'SHA256' -%} + {%- set hash_expr_left = 'ENCODE(SHA256(CAST(' -%} + {%- set hash_expr_right = " AS BYTEA)), 'hex')" -%} +{%- endif -%} + +{%- set standardise = "NULLIF(UPPER(TRIM(CAST([EXPRESSION] AS VARCHAR))), '')" -%} + +{#- Alpha sort columns before hashing if a hashdiff -#} +{%- if is_hashdiff and dbtvault.is_list(columns) -%} + {%- set columns = columns|sort -%} +{%- endif -%} + +{#- If single column to hash -#} +{%- if columns is string -%} + {%- set column_str = dbtvault.as_constant(columns) -%} + {%- if dbtvault.is_expression(column_str) -%} + {%- set escaped_column_str = column_str -%} + {%- else -%} + {%- set escaped_column_str = dbtvault.escape_column_names(column_str) -%} + {%- endif -%} + + {{- "CAST(UPPER({}{}{}) AS BYTEA) AS {}".format(hash_expr_left, standardise | replace('[EXPRESSION]', escaped_column_str), hash_expr_right, dbtvault.escape_column_names(alias)) | indent(4) -}} + +{#- Else a list of columns to hash -#} +{%- else -%} + {%- set all_null = [] -%} + + {%- if is_hashdiff -%} + {{- "CAST(UPPER({}CONCAT_WS('{}',".format(hash_expr_left, concat_string) | indent(4) -}} + {%- else -%} + {{- "CAST(UPPER({}NULLIF(CONCAT_WS('{}',".format(hash_expr_left, concat_string) | indent(4) -}} + {%- endif -%} + + {%- for column in columns -%} + + {%- do all_null.append(null_placeholder_string) -%} + + {%- set column_str = dbtvault.as_constant(column) -%} + {%- if dbtvault.is_expression(column_str) -%} + {%- set escaped_column_str = column_str -%} + {%- else -%} + {%- set escaped_column_str = dbtvault.escape_column_names(column_str) -%} + {%- endif -%} + + {{- "\nCOALESCE({}, '{}')".format(standardise | replace('[EXPRESSION]', escaped_column_str), null_placeholder_string) | indent(4) -}} + {{- "," if not loop.last -}} + + {%- if loop.last -%} + + {% if is_hashdiff %} + {{- "\n){}) AS BYTEA) AS {}".format(hash_expr_right, dbtvault.escape_column_names(alias)) -}} + {%- else -%} + {{- "\n), '{}'){}) AS BYTEA) AS {}".format(all_null | join(""), hash_expr_right, dbtvault.escape_column_names(alias)) -}} + {%- endif -%} + {%- else -%} + + {%- do all_null.append(concat_string) -%} + + {%- endif -%} + {%- endfor -%} + +{%- endif -%} + +{%- endmacro -%} + {%- macro databricks__hash(columns, alias, is_hashdiff) -%} {%- set hash = var('hash', 'MD5') -%} diff --git a/macros/tables/duckdb/hub.sql b/macros/tables/duckdb/hub.sql new file mode 100644 index 000000000..3db8a157a --- /dev/null +++ b/macros/tables/duckdb/hub.sql @@ -0,0 +1,89 @@ +{%- macro duckdb__hub(src_pk, src_nk, src_extra_columns, src_ldts, src_source, source_model) -%} + +{%- set source_cols = dbtvault.expand_column_list(columns=[src_pk, src_nk, src_extra_columns, src_ldts, src_source]) -%} + +{%- if model.config.materialized == 'vault_insert_by_rank' %} + {%- set source_cols_with_rank = source_cols + dbtvault.escape_column_names([config.get('rank_column')]) -%} +{%- endif -%} + +{{ dbtvault.prepend_generated_by() }} + +{{ 'WITH ' -}} + +{%- if not (source_model is iterable and source_model is not string) -%} + {%- set source_model = [source_model] -%} +{%- endif -%} + +{%- set ns = namespace(last_cte= "") -%} + +{%- for src in source_model -%} + +{%- set source_number = loop.index | string -%} + +row_rank_{{ source_number }} AS ( +{#- PostgreSQL has DISTINCT ON which should be more performant than the + strategy used by Snowflake ROW_NUMBER() OVER( PARTITION BY ... +-#} + {%- if model.config.materialized == 'vault_insert_by_rank' %} + SELECT DISTINCT ON ({{ dbtvault.prefix([src_pk], 'rr') }}) {{ dbtvault.prefix(source_cols_with_rank, 'rr') }} + {%- else %} + SELECT DISTINCT ON ({{ dbtvault.prefix([src_pk], 'rr') }}) {{ dbtvault.prefix(source_cols, 'rr') }} + {%- endif %} + FROM {{ ref(src) }} AS rr + WHERE {{ dbtvault.multikey(src_pk, prefix='rr', condition='IS NOT NULL') }} + ORDER BY {{ dbtvault.prefix([src_pk], 'rr') }}, {{ dbtvault.prefix([src_ldts], 'rr') }} + {%- set ns.last_cte = "row_rank_{}".format(source_number) %} +),{{ "\n" if not loop.last }} +{% endfor -%} +{% if source_model | length > 1 %} +stage_union AS ( + {%- for src in source_model %} + SELECT * FROM row_rank_{{ loop.index | string }} + {%- if not loop.last %} + UNION ALL + {%- endif %} + {%- endfor %} + {%- set ns.last_cte = "stage_union" %} +), +{%- endif -%} +{%- if model.config.materialized == 'vault_insert_by_period' %} +stage_mat_filter AS ( + SELECT * + FROM {{ ns.last_cte }} + WHERE __PERIOD_FILTER__ + {%- set ns.last_cte = "stage_mat_filter" %} +), +{%- elif model.config.materialized == 'vault_insert_by_rank' %} +stage_mat_filter AS ( + SELECT * + FROM {{ ns.last_cte }} + WHERE __RANK_FILTER__ + {%- set ns.last_cte = "stage_mat_filter" %} +), +{%- endif -%} +{%- if source_model | length > 1 %} + +row_rank_union AS ( +{#- PostgreSQL has DISTINCT ON which should be more performant than the + strategy used by Snowflake ROW_NUMBER() OVER( PARTITION BY ... +-#} + SELECT DISTINCT ON ({{ dbtvault.prefix([src_pk], 'ru') }}) ru.* + FROM {{ ns.last_cte }} AS ru + WHERE {{ dbtvault.multikey(src_pk, prefix='ru', condition='IS NOT NULL') }} + ORDER BY {{ dbtvault.prefix([src_pk], 'ru') }}, {{ dbtvault.prefix([src_ldts], 'ru') }}, {{ dbtvault.prefix([src_source], 'ru') }} ASC + {%- set ns.last_cte = "row_rank_union" %} +), +{% endif %} +records_to_insert AS ( + SELECT {{ dbtvault.prefix(source_cols, 'a', alias_target='target') }} + FROM {{ ns.last_cte }} AS a + {%- if dbtvault.is_any_incremental() %} + LEFT JOIN {{ this }} AS d + ON {{ dbtvault.multikey(src_pk, prefix=['a','d'], condition='=') }} + WHERE {{ dbtvault.multikey(src_pk, prefix='d', condition='IS NULL') }} + {%- endif %} +) + +SELECT * FROM records_to_insert + +{%- endmacro -%} \ No newline at end of file diff --git a/macros/tables/duckdb/link.sql b/macros/tables/duckdb/link.sql new file mode 100644 index 000000000..f50e9fa2a --- /dev/null +++ b/macros/tables/duckdb/link.sql @@ -0,0 +1,100 @@ +{%- macro duckdb__link(src_pk, src_fk, src_extra_columns, src_ldts, src_source, source_model) -%} + +{%- set source_cols = dbtvault.expand_column_list(columns=[src_pk, src_fk, src_extra_columns, src_ldts, src_source]) -%} +{%- set fk_cols = dbtvault.expand_column_list([src_fk]) -%} + +{%- if model.config.materialized == 'vault_insert_by_rank' %} + {%- set source_cols_with_rank = source_cols + dbtvault.escape_column_names([config.get('rank_column')]) -%} +{%- endif -%} + +{{ dbtvault.prepend_generated_by() }} + +{{ 'WITH ' -}} + +{%- if not (source_model is iterable and source_model is not string) -%} + {%- set source_model = [source_model] -%} +{%- endif -%} + +{%- set ns = namespace(last_cte= "") -%} + +{%- for src in source_model -%} + +{%- set source_number = loop.index | string -%} + +row_rank_{{ source_number }} AS ( + SELECT * FROM ( + {%- if model.config.materialized == 'vault_insert_by_rank' %} + SELECT {{ dbtvault.prefix(source_cols_with_rank, 'rr') }}, + {%- else %} + SELECT {{ dbtvault.prefix(source_cols, 'rr') }}, + {%- endif %} + ROW_NUMBER() OVER( + PARTITION BY {{ dbtvault.prefix([src_pk], 'rr') }} + ORDER BY {{ dbtvault.prefix([src_ldts], 'rr') }} + ) AS row_number + FROM {{ ref(src) }} AS rr + {%- if source_model | length == 1 %} + WHERE {{ dbtvault.multikey(src_pk, prefix='rr', condition='IS NOT NULL') }} + AND {{ dbtvault.multikey(fk_cols, prefix='rr', condition='IS NOT NULL') }} + {%- endif %} + ) as l + WHERE row_number = 1 + {%- set ns.last_cte = "row_rank_{}".format(source_number) %} +),{{ "\n" if not loop.last }} +{% endfor -%} +{% if source_model | length > 1 %} +stage_union AS ( + {%- for src in source_model %} + SELECT * FROM row_rank_{{ loop.index | string }} + {%- if not loop.last %} + UNION ALL + {%- endif %} + {%- endfor %} + {%- set ns.last_cte = "stage_union" %} +), +{%- endif -%} +{%- if model.config.materialized == 'vault_insert_by_period' %} +stage_mat_filter AS ( + SELECT * + FROM {{ ns.last_cte }} + WHERE __PERIOD_FILTER__ + {%- set ns.last_cte = "stage_mat_filter" %} +), +{%- elif model.config.materialized == 'vault_insert_by_rank' %} +stage_mat_filter AS ( + SELECT * + FROM {{ ns.last_cte }} + WHERE __RANK_FILTER__ + {%- set ns.last_cte = "stage_mat_filter" %} +), +{% endif %} +{%- if source_model | length > 1 %} + +row_rank_union AS ( + SELECT * FROM ( + SELECT ru.*, + ROW_NUMBER() OVER( + PARTITION BY {{ dbtvault.prefix([src_pk], 'ru') }} + ORDER BY {{ dbtvault.prefix([src_ldts], 'ru') }}, {{ dbtvault.prefix([src_source], 'ru') }} ASC + ) AS row_rank_number + FROM {{ ns.last_cte }} AS ru + WHERE {{ dbtvault.multikey(src_pk, prefix='ru', condition='IS NOT NULL') }} + AND {{ dbtvault.multikey(fk_cols, prefix='ru', condition='IS NOT NULL') }} + ) AS a + WHERE row_rank_number = 1 + {%- set ns.last_cte = "row_rank_union" %} +), +{% endif %} +records_to_insert AS ( + SELECT {{ dbtvault.prefix(source_cols, 'a', alias_target='target') }} + FROM {{ ns.last_cte }} AS a + {%- if dbtvault.is_any_incremental() %} + LEFT JOIN {{ this }} AS d + ON {{ dbtvault.multikey(src_pk, prefix=['a','d'], condition='=') }} + WHERE {{ dbtvault.multikey(src_pk, prefix='d', condition='IS NULL') }} + {%- endif %} +) + +SELECT * FROM records_to_insert + +{%- endmacro -%} \ No newline at end of file diff --git a/macros/tables/duckdb/sat.sql b/macros/tables/duckdb/sat.sql new file mode 100644 index 000000000..64abd3974 --- /dev/null +++ b/macros/tables/duckdb/sat.sql @@ -0,0 +1,63 @@ +{%- macro duckdb__sat(src_pk, src_hashdiff, src_payload, src_extra_columns, src_eff, src_ldts, src_source, source_model) -%} + +{%- set source_cols = dbtvault.expand_column_list(columns=[src_pk, src_hashdiff, src_payload, src_extra_columns, src_eff, src_ldts, src_source]) -%} +{%- set rank_cols = dbtvault.expand_column_list(columns=[src_pk, src_hashdiff, src_ldts]) -%} +{%- set pk_cols = dbtvault.expand_column_list(columns=[src_pk]) -%} + +{%- if model.config.materialized == 'vault_insert_by_rank' %} + {%- set source_cols_with_rank = source_cols + dbtvault.escape_column_names([config.get('rank_column')]) -%} +{%- endif -%} + +{{ dbtvault.prepend_generated_by() }} + +WITH source_data AS ( + {%- if model.config.materialized == 'vault_insert_by_rank' %} + SELECT {{ dbtvault.prefix(source_cols_with_rank, 'a', alias_target='source') }} + {%- else %} + SELECT {{ dbtvault.prefix(source_cols, 'a', alias_target='source') }} + {%- endif %} + FROM {{ ref(source_model) }} AS a + WHERE {{ dbtvault.multikey(src_pk, prefix='a', condition='IS NOT NULL') }} + {%- if model.config.materialized == 'vault_insert_by_period' %} + AND __PERIOD_FILTER__ + {% elif model.config.materialized == 'vault_insert_by_rank' %} + AND __RANK_FILTER__ + {% endif %} +), + +{%- if dbtvault.is_any_incremental() %} + +latest_records AS ( + SELECT {{ dbtvault.prefix(rank_cols, 'a', alias_target='target') }} + FROM ( + SELECT {{ dbtvault.prefix(rank_cols, 'current_records', alias_target='target') }}, + RANK() OVER ( + PARTITION BY {{ dbtvault.prefix([src_pk], 'current_records') }} + ORDER BY {{ dbtvault.prefix([src_ldts], 'current_records') }} DESC + ) AS rank + FROM {{ this }} AS current_records + JOIN ( + SELECT DISTINCT {{ dbtvault.prefix([src_pk], 'source_data') }} + FROM source_data + ) AS source_records + ON {{ dbtvault.multikey(src_pk, prefix=['current_records','source_records'], condition='=') }} + ) AS a + WHERE a.rank = 1 +), + +{%- endif %} + +records_to_insert AS ( + SELECT DISTINCT {{ dbtvault.alias_all(source_cols, 'stage') }} + FROM source_data AS stage + {%- if dbtvault.is_any_incremental() %} + LEFT JOIN latest_records + ON {{ dbtvault.multikey(src_pk, prefix=['latest_records','stage'], condition='=') }} + WHERE {{ dbtvault.prefix([src_hashdiff], 'latest_records', alias_target='target') }} != {{ dbtvault.prefix([src_hashdiff], 'stage') }} + OR {{ dbtvault.prefix([src_hashdiff], 'latest_records', alias_target='target') }} IS NULL + {%- endif %} +) + +SELECT * FROM records_to_insert + +{%- endmacro -%}