Skip to content

Commit

Permalink
Ensure new 'deletion' records get a new scd_id
Browse files Browse the repository at this point in the history
  • Loading branch information
peterallenwebb committed Dec 19, 2024
1 parent 2163952 commit e0a786b
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from dbt.tests.util import check_relations_equal, run_dbt

# Snapshot source data for the tests in this file
_seed_new_record_mode = """
BEGIN
Expand Down Expand Up @@ -176,8 +177,14 @@
where id >= 10 and id <= 20;
"""

# SQL to delete a record from the snapshot source data
_delete_sql = """
delete from {schema}.seed where id = 1
delete from {database}.{schema}.seed where id = 1
"""

# If the deletion worked correctly, this should return two rows, with one of them representing the deletion.
_delete_check_sql = """
select dbt_valid_to, dbt_scd_id, dbt_is_deleted from {schema}.snapshot_actual where id = 1
"""


Expand Down Expand Up @@ -229,4 +236,15 @@ def test_snapshot_new_record_mode(
results = run_dbt(["snapshot"])
assert len(results) == 1

# TODO: Further validate results.
check_result = project.run_sql(_delete_check_sql, fetch="all")
valid_to = 0
scd_id = 1
is_deleted = 2
assert len(check_result) == 2
assert sum(
[1 for c in check_result if c[valid_to] is None and c[scd_id] is not None and c[is_deleted] == "True"]
) == 1
assert sum(
[1 for c in check_result if c[valid_to] is not None and c[scd_id] is not None and c[is_deleted] == "False"]
) == 1
assert check_result[0][scd_id] != check_result[1][scd_id]
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@

{% macro default__snapshot_staging_table(strategy, source_sql, target_relation) -%}
{% set columns = config.get('snapshot_table_column_names') or get_snapshot_table_column_names() %}

{% set new_scd_id = snapshot_hash_arguments([columns.dbt_scd_id, snapshot_get_time()]) %}
with snapshot_query as (

{{ source_sql }}
Expand Down Expand Up @@ -169,12 +169,13 @@
{{ snapshot_get_time() }} as {{ columns.dbt_valid_from }},
{{ snapshot_get_time() }} as {{ columns.dbt_updated_at }},
snapshotted_data.{{ columns.dbt_valid_to }} as {{ columns.dbt_valid_to }},
snapshotted_data.{{ columns.dbt_scd_id }},
{{ new_scd_id }} as {{ columns.dbt_scd_id }},
'True' as {{ columns.dbt_is_deleted }}
from snapshotted_data
left join deletes_source_data as source_data
on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
where {{ unique_key_is_null(strategy.unique_key, "source_data") }}
where {{ unique_key_is_null(strategy.unique_key, "source_data") }}

)
{%- endif %}

Expand Down Expand Up @@ -272,6 +273,17 @@
{% endif %}
{% endmacro %}

{% macro unique_key_reverse(unique_key) %}
{% if unique_key | is_list %}
{% for key in unique_key %}
dbt_unique_key_{{ loop.index }} as {{ key }}
{%- if not loop.last %} , {%- endif %}
{% endfor %}
{% else %}
dbt_unique_key as {{ unique_key }}
{% endif %}
{% endmacro %}


{% macro unique_key_join_on(unique_key, identifier, from_identifier) %}
{% if unique_key | is_list %}
Expand Down

0 comments on commit e0a786b

Please sign in to comment.