Skip to content

Commit

Permalink
Adding missing 1.9 Snapshot behavior (#904)
Browse files Browse the repository at this point in the history
  • Loading branch information
benc-db authored Jan 17, 2025
1 parent 3fb54dc commit 67bf98d
Show file tree
Hide file tree
Showing 4 changed files with 459 additions and 52 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## dbt-databricks 1.9.2 (TBD)

### Features

- Update snapshot materialization to support new snapshot features ([904](https://github.com/databricks/dbt-databricks/pull/904))

### Under the Hood

- Refactor global state reading ([888](https://github.com/databricks/dbt-databricks/pull/888))
Expand Down
88 changes: 36 additions & 52 deletions dbt/include/databricks/macros/materializations/snapshot.sql
Original file line number Diff line number Diff line change
@@ -1,27 +1,4 @@
{% macro databricks_build_snapshot_staging_table(strategy, sql, target_relation) %}
{#
  Create (or replace) the staging view used by a snapshot run and return its relation.

  The view is named `<target identifier>__dbt_tmp`, lives in the same schema and
  database as `target_relation`, and selects the result of
  `snapshot_staging_table(strategy, sql, target_relation)`.
#}
{%- set staging_view = api.Relation.create(identifier=target_relation.identifier ~ '__dbt_tmp',
schema=target_relation.schema,
database=target_relation.database,
type='view') -%}

{% set staging_select = snapshot_staging_table(strategy, sql, target_relation) %}

{# needs to be a non-temp view so that its columns can be ascertained via `describe` #}
{% call statement('build_snapshot_staging_relation') %}
create or replace view {{ staging_view }}
as
{{ staging_select }}
{% endcall %}

{% do return(staging_view) %}
{% endmacro %}


{% materialization snapshot, adapter='databricks' %}
{%- set config = model['config'] -%}

{%- set target_table = model.get('alias', model.get('name')) -%}

{%- set strategy_name = config.get('strategy') -%}
Expand Down Expand Up @@ -62,47 +39,43 @@
{{ run_hooks(pre_hooks, inside_transaction=True) }}

{% set strategy_macro = strategy_dispatch(strategy_name) %}
{% set strategy = strategy_macro(model, "snapshotted_data", "source_data", config, target_relation_exists) %}
{% set strategy = strategy_macro(model, "snapshotted_data", "source_data", model['config'], target_relation_exists) %}

{% if not target_relation_exists %}

{% set build_sql = build_snapshot_table(strategy, model['compiled_code']) %}
{% set build_or_select_sql = build_sql %}
{% set final_sql = create_table_as(False, target_relation, build_sql) %}

{% call statement('main') %}
{{ final_sql }}
{% endcall %}

{% do persist_docs(target_relation, model, for_relation=False) %}

{% else %}

{{ adapter.valid_snapshot_target(target_relation) }}
{% set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() %}

{% if target_relation.database is none %}
{% set staging_table = spark_build_snapshot_staging_table(strategy, sql, target_relation) %}
{% else %}
{% set staging_table = databricks_build_snapshot_staging_table(strategy, sql, target_relation) %}
{% endif %}
{{ adapter.assert_valid_snapshot_target_given_strategy(target_relation, columns, strategy) }}

{% set build_or_select_sql = snapshot_staging_table(strategy, sql, target_relation) %}
{% set staging_table = build_snapshot_staging_table(strategy, sql, target_relation) %}

-- this may no-op if the database does not require column expansion
{% do adapter.expand_target_column_types(from_relation=staging_table,
to_relation=target_relation) %}

{% set remove_columns = ['dbt_change_type', 'DBT_CHANGE_TYPE', 'dbt_unique_key', 'DBT_UNIQUE_KEY'] %}
{% if unique_key | is_list %}
{% for key in strategy.unique_key %}
{{ remove_columns.append('dbt_unique_key_' + loop.index|string) }}
{{ remove_columns.append('DBT_UNIQUE_KEY_' + loop.index|string) }}
{% endfor %}
{% endif %}

{% set missing_columns = adapter.get_missing_columns(staging_table, target_relation)
| rejectattr('name', 'equalto', 'dbt_change_type')
| rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE')
| rejectattr('name', 'equalto', 'dbt_unique_key')
| rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY')
| rejectattr('name', 'in', remove_columns)
| list %}

{% do create_columns(target_relation, missing_columns) %}

{% set source_columns = adapter.get_columns_in_relation(staging_table)
| rejectattr('name', 'equalto', 'dbt_change_type')
| rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE')
| rejectattr('name', 'equalto', 'dbt_unique_key')
| rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY')
| rejectattr('name', 'in', remove_columns)
| list %}

{% set quoted_source_columns = [] %}
Expand All @@ -117,23 +90,34 @@
)
%}

{% call statement_with_staging_table('main', staging_table) %}
{{ final_sql }}
{% endcall %}
{% endif %}

{% do persist_docs(target_relation, model, for_relation=True) %}

{% endif %}
{{ check_time_data_types(build_or_select_sql) }}

{% set should_revoke = should_revoke(target_relation_exists, full_refresh_mode) %}
{% do apply_grants(target_relation, grant_config, should_revoke) %}
{% call statement('main') %}
{{ final_sql }}
{% endcall %}

{% do persist_constraints(target_relation, model) %}
{% set should_revoke = should_revoke(target_relation_exists, full_refresh_mode=False) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

{% do persist_docs(target_relation, model) %}

{% if not target_relation_exists %}
{% do create_indexes(target_relation) %}
{% endif %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

{{ adapter.commit() }}

{% if staging_table is defined %}
{% do post_snapshot(staging_table) %}
{% endif %}

{% do persist_constraints(target_relation, model) %}

{{ run_hooks(post_hooks, inside_transaction=False) }}

{{ return({'relations': [target_relation]}) }}
Expand Down
74 changes: 74 additions & 0 deletions tests/functional/adapter/simple_snapshot/test_new_record_mode.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import pytest

from dbt.tests.adapter.simple_snapshot.new_record_mode import (
_delete_sql,
_invalidate_sql,
_ref_snapshot_sql,
_seed_new_record_mode,
_snapshot_actual_sql,
_snapshots_yml,
_update_sql,
)
from dbt.tests.util import check_relations_equal, run_dbt


class TestDatabricksSnapshotNewRecordMode:
    """Functional test that runs `dbt snapshot` three times over changing source data.

    The SQL and YAML inputs are imported from
    `dbt.tests.adapter.simple_snapshot.new_record_mode`; several fixtures rewrite
    that SQL (type names, BEGIN/END wrappers, multi-statement strings) before it
    is executed.
    NOTE(review): the rewrites are presumably needed for Databricks SQL
    compatibility -- confirm against the upstream fixture text.
    """

    @pytest.fixture(scope="class")
    def snapshots(self):
        """Snapshot files for the test project."""
        return {"snapshot.sql": _snapshot_actual_sql}

    @pytest.fixture(scope="class")
    def models(self):
        """Model files for the test project."""
        return {
            "snapshots.yml": _snapshots_yml,
            "ref_snapshot.sql": _ref_snapshot_sql,
        }

    @pytest.fixture(scope="class")
    def seed_new_record_mode(self):
        """Raw seed SQL; it is rewritten and split into statements by the test itself."""
        return _seed_new_record_mode

    @pytest.fixture(scope="class")
    def invalidate_sql_1(self):
        """Text before the first ';' of the invalidate SQL, with 'BEGIN' removed."""
        return _invalidate_sql.split(";", 1)[0].replace("BEGIN", "")

    @pytest.fixture(scope="class")
    def invalidate_sql_2(self):
        """Text after the first ';' of the invalidate SQL, with 'END' and ';' removed."""
        return _invalidate_sql.split(";", 1)[1].replace("END", "").replace(";", "")

    @pytest.fixture(scope="class")
    def update_sql(self):
        """Update SQL with every 'text' replaced by 'string'."""
        return _update_sql.replace("text", "string")

    @pytest.fixture(scope="class")
    def delete_sql(self):
        """Delete SQL, passed through unchanged."""
        return _delete_sql

    def test_snapshot_new_record_mode(
        self,
        project,
        seed_new_record_mode,
        invalidate_sql_1,
        invalidate_sql_2,
        update_sql,
        delete_sql,
    ):
        """Seed, snapshot, mutate, snapshot and compare, then delete and snapshot again."""
        # Rewrite the seed script and run it one ';'-separated statement at a time.
        seed_statements = (
            seed_new_record_mode.replace("text", "string")
            .replace("TEXT", "STRING")
            .replace("BEGIN", "")
            .replace("END;", "")
            .replace(" WITHOUT TIME ZONE", "")
            .split(";")
        )
        for sql in seed_statements:
            project.run_sql(sql)

        results = run_dbt(["snapshot"])
        assert len(results) == 1

        project.run_sql(invalidate_sql_1)
        project.run_sql(invalidate_sql_2)
        project.run_sql(update_sql)

        results = run_dbt(["snapshot"])
        assert len(results) == 1

        check_relations_equal(project.adapter, ["snapshot_actual", "snapshot_expected"])

        # Use the fixture (not the module constant) so the delete statement can be
        # overridden the same way as the other SQL inputs.
        project.run_sql(delete_sql)

        results = run_dbt(["snapshot"])
        assert len(results) == 1
Loading

0 comments on commit 67bf98d

Please sign in to comment.