Skip to content

Commit

Permalink
Merge branch 'release/0.7.5'
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Higgs committed Jun 10, 2021
2 parents d15aeab + 6ac0c90 commit a2e5780
Show file tree
Hide file tree
Showing 11 changed files with 261 additions and 115 deletions.
4 changes: 3 additions & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,13 @@ workflows:
only:
- develop
- /^int.*/
- /^fix.*/
test-integration:
jobs:
- integration:
filters:
branches:
only:
- develop
- /^int.*/
- /^int.*/
- /^fix.*/
2 changes: 1 addition & 1 deletion dbt_project.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: 'dbtvault'
version: '0.7.4'
version: '0.7.5'
require-dbt-version: [">=0.18.0", "<0.20.0"]
config-version: 2

Expand Down
4 changes: 3 additions & 1 deletion macros/internal/multikey.sql
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

{%- if condition in ['<>', '!=', '='] -%}
{%- for col in columns -%}
{{ (prefix[0] ~ '.') if prefix }}{{ col }} {{ condition }} {{ (prefix[1] ~ '.') if prefix }}{{ col }}
{%- if prefix -%}
{{- dbtvault.prefix([col], prefix[0], alias_target='target') }} {{ condition }} {{ dbtvault.prefix([col], prefix[1]) -}}
{%- endif %}
{%- if not loop.last %} {{ operator }} {% endif %}
{% endfor -%}
{%- else -%}
Expand Down
15 changes: 12 additions & 3 deletions macros/materialisations/shared_helpers.sql
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
{% macro check_placeholder(model_sql, placeholder='__PERIOD_FILTER__') %}
{%- macro check_placeholder(model_sql, placeholder='__PERIOD_FILTER__') -%}

{%- if model_sql.find(placeholder) == -1 -%}
{%- set error_message -%}
Model '{{ model.unique_id }}' does not include the required string '{{ placeholder }}' in its sql
{%- endset -%}
{{ exceptions.raise_compiler_error(error_message) }}
{{- exceptions.raise_compiler_error(error_message) -}}
{%- endif -%}

{% endmacro %}
{%- endmacro -%}


{%- macro is_any_incremental() -%}
{%- if dbtvault.is_vault_insert_by_period() or dbtvault.is_vault_insert_by_rank() or is_incremental() -%}
{%- do return(true) -%}
{%- else -%}
{%- do return(false) -%}
{%- endif -%}
{%- endmacro -%}
14 changes: 13 additions & 1 deletion macros/staging/rank_columns.sql
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,19 @@

{%- if columns[col] is mapping and columns[col].partition_by and columns[col].order_by -%}

{{- "RANK() OVER (PARTITION BY {} ORDER BY {}) AS {}".format(columns[col].partition_by, columns[col].order_by, col) | indent(4) -}}
{%- if dbtvault.is_list(columns[col].order_by) -%}
{%- set order_by_str = columns[col].order_by | join(", ") -%}
{%- else -%}
{%- set order_by_str = columns[col].order_by -%}
{%- endif -%}

{%- if dbtvault.is_list(columns[col].partition_by) -%}
{%- set partition_by_str = columns[col].partition_by | join(", ") -%}
{%- else -%}
{%- set partition_by_str = columns[col].partition_by -%}
{%- endif -%}

{{- "RANK() OVER (PARTITION BY {} ORDER BY {}) AS {}".format(partition_by_str, order_by_str, col) | indent(4) -}}

{%- endif -%}

Expand Down
143 changes: 74 additions & 69 deletions macros/tables/eff_sat.sql
Original file line number Diff line number Diff line change
Expand Up @@ -21,104 +21,109 @@
{{- dbtvault.prepend_generated_by() }}

WITH source_data AS (
SELECT *
FROM {{ ref(source_model) }}
SELECT {{ dbtvault.prefix(source_cols, 'a', alias_target='source') }}
FROM {{ ref(source_model) }} AS a
WHERE {{ dbtvault.multikey(src_dfk, prefix='a', condition='IS NOT NULL') }}
AND {{ dbtvault.multikey(src_sfk, prefix='a', condition='IS NOT NULL') }}
{%- if model.config.materialized == 'vault_insert_by_period' %}
WHERE __PERIOD_FILTER__
{% endif %}
{%- set source_cte = "source_data" %}
),

{%- if model.config.materialized == 'vault_insert_by_rank' %}
rank_col AS (
SELECT * FROM source_data
WHERE __RANK_FILTER__
{%- set source_cte = "rank_col" %}
AND __PERIOD_FILTER__
{%- elif model.config.materialized == 'vault_insert_by_rank' %}
AND __RANK_FILTER__
{%- endif %}
),
{% endif -%}

{%- if load_relation(this) is none %}
{%- if dbtvault.is_any_incremental() %}

records_to_insert AS (
SELECT {{ dbtvault.alias_all(source_cols, 'e') }}
FROM {{ source_cte }} AS e
)
{%- else %}

latest_open_eff AS
(
{# Selecting the most recent records for each link hashkey -#}
latest_records AS (
SELECT {{ dbtvault.alias_all(source_cols, 'b') }},
ROW_NUMBER() OVER (
PARTITION BY
{%- for driving_key in dfk_cols %}
{{ driving_key }}{{ ", " if not loop.last }}
{%- endfor %}
PARTITION BY b.{{ src_pk }}
ORDER BY b.{{ src_ldts }} DESC
) AS row_number
) AS row_num
FROM {{ this }} AS b
WHERE TO_DATE(b.{{ src_end_date }}) = TO_DATE('9999-12-31')
QUALIFY row_number = 1
QUALIFY row_num = 1
),

stage_slice AS
(
SELECT {{ dbtvault.alias_all(source_cols, 'stage') }}
FROM {{ "rank_col" if model.config.materialized == 'vault_insert_by_rank' else "source_data" }} AS stage
{# Selecting the open records of the most recent records for each link hashkey -#}
latest_open AS (
SELECT {{ dbtvault.alias_all(source_cols, 'c') }}
FROM latest_records AS c
WHERE TO_DATE(c.{{ src_end_date }}) = TO_DATE('9999-12-31')
),

new_open_records AS (
SELECT DISTINCT
{{ dbtvault.alias_all(source_cols, 'stage') }}
FROM stage_slice AS stage
LEFT JOIN latest_open_eff AS e
ON stage.{{ src_pk }} = e.{{ src_pk }}
WHERE e.{{ src_pk }} IS NULL
AND {{ dbtvault.multikey(src_dfk, prefix='stage', condition='IS NOT NULL') }}
AND {{ dbtvault.multikey(src_sfk, prefix='stage', condition='IS NOT NULL') }}
{# Selecting the closed records of the most recent records for each link hashkey -#}
latest_closed AS (
SELECT {{ dbtvault.alias_all(source_cols, 'd') }}
FROM latest_records AS d
WHERE TO_DATE(d.{{ src_end_date }}) != TO_DATE('9999-12-31')
),
{%- if is_auto_end_dating %}

links_to_end_date AS (
SELECT a.*
FROM latest_open_eff AS a
LEFT JOIN stage_slice AS b
ON {{ dbtvault.multikey(src_dfk, prefix=['a', 'b'], condition='=') }}
WHERE {{ dbtvault.multikey(src_sfk, prefix='b', condition='IS NULL', operator='OR') }}
OR {{ dbtvault.multikey(src_sfk, prefix=['a', 'b'], condition='<>', operator='OR') }}
{# Identifying the completely new link relationships to be opened in eff sat -#}
new_open_records AS (
SELECT DISTINCT
{{ dbtvault.alias_all(source_cols, 'f') }}
FROM source_data AS f
LEFT JOIN latest_records AS lr
ON f.{{ src_pk }} = lr.{{ src_pk }}
WHERE lr.{{ src_pk }} IS NULL
),

new_end_dated_records AS (
{# Identifying the currently closed link relationships to be reopened in eff sat -#}
new_reopened_records AS (
SELECT DISTINCT
h.{{ src_pk }},
{{ dbtvault.alias_all(fk_cols, 'g') }},
h.EFFECTIVE_FROM AS {{ src_start_date }}, h.{{ src_source }}
FROM latest_open_eff AS h
INNER JOIN links_to_end_date AS g
ON g.{{ src_pk }} = h.{{ src_pk }}
lc.{{ src_pk }},
{{ dbtvault.alias_all(fk_cols, 'lc') }},
lc.{{ src_start_date }} AS {{ src_start_date }},
g.{{ src_end_date }} AS {{ src_end_date }},
g.{{ src_eff }} AS {{ src_eff }},
g.{{ src_ldts }},
g.{{ src_source }}
FROM source_data AS g
INNER JOIN latest_closed lc
ON g.{{ src_pk }} = lc.{{ src_pk }}
),

amended_end_dated_records AS (
{%- if is_auto_end_dating %}

{# Creating the closing records -#}
{# Identifying the currently open relationships that need to be closed due to change in SFK(s) -#}
new_closed_records AS (
SELECT DISTINCT
a.{{ src_pk }},
{{ dbtvault.alias_all(fk_cols, 'a') }},
a.{{ src_start_date }},
stage.{{ src_eff }} AS END_DATE, stage.{{ src_eff }}, stage.{{ src_ldts }},
a.{{ src_source }}
FROM new_end_dated_records AS a
INNER JOIN stage_slice AS stage
ON {{ dbtvault.multikey(src_dfk, prefix=['stage', 'a'], condition='=') }}
WHERE {{ dbtvault.multikey(src_sfk, prefix='stage', condition='IS NOT NULL') }}
AND {{ dbtvault.multikey(src_dfk, prefix='stage', condition='IS NOT NULL') }}
lo.{{ src_pk }},
{{ dbtvault.alias_all(fk_cols, 'lo') }},
lo.{{ src_start_date }} AS {{ src_start_date }},
h.{{ src_eff }} AS {{ src_end_date }},
h.{{ src_eff }} AS {{ src_eff }},
h.{{ src_ldts }},
lo.{{ src_source }}
FROM source_data AS h
INNER JOIN latest_open AS lo
ON {{ dbtvault.multikey(src_dfk, prefix=['lo', 'h'], condition='=') }}
WHERE ({{ dbtvault.multikey(src_sfk, prefix=['lo', 'h'], condition='<>', operator='OR') }})
),

{#- if is_auto_end_dating -#}
{%- endif %}

records_to_insert AS (
SELECT * FROM new_open_records
UNION
SELECT * FROM new_reopened_records
{%- if is_auto_end_dating %}
UNION
SELECT * FROM amended_end_dated_records
SELECT * FROM new_closed_records
{%- endif %}
)

{%- else %}

records_to_insert AS (
SELECT {{ dbtvault.alias_all(source_cols, 'i') }}
FROM source_data AS i
)

{#- if not dbtvault.is_any_incremental() -#}
{%- endif %}

SELECT * FROM records_to_insert
Expand Down
2 changes: 1 addition & 1 deletion macros/tables/hub.sql
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ row_rank_union AS (
records_to_insert AS (
SELECT {{ dbtvault.prefix(source_cols, 'a', alias_target='target') }}
FROM {{ ns.last_cte }} AS a
{%- if dbtvault.is_vault_insert_by_period() or is_incremental() %}
{%- if dbtvault.is_any_incremental() %}
LEFT JOIN {{ this }} AS d
ON a.{{ src_pk }} = d.{{ src_pk }}
WHERE {{ dbtvault.prefix([src_pk], 'd') }} IS NULL
Expand Down
4 changes: 2 additions & 2 deletions macros/tables/link.sql
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ row_rank_{{ source_number }} AS (
{%- endif %}
ROW_NUMBER() OVER(
PARTITION BY {{ src_pk }}
ORDER BY {{ src_ldts }} ASC
ORDER BY {{ src_ldts }}
) AS row_number
FROM {{ ref(src) }}
{%- if source_model | length == 1 %}
Expand Down Expand Up @@ -96,7 +96,7 @@ row_rank_union AS (
records_to_insert AS (
SELECT {{ dbtvault.prefix(source_cols, 'a', alias_target='target') }}
FROM {{ ns.last_cte }} AS a
{%- if dbtvault.is_vault_insert_by_period() or is_incremental() %}
{%- if dbtvault.is_any_incremental() %}
LEFT JOIN {{ this }} AS d
ON a.{{ src_pk }} = d.{{ src_pk }}
WHERE {{ dbtvault.prefix([src_pk], 'd') }} IS NULL
Expand Down
Loading

0 comments on commit a2e5780

Please sign in to comment.