Skip to content

Commit

Permalink
Add logic to enable iceberg incremental tables. (#1190)
Browse files Browse the repository at this point in the history
* Add logic to enable iceberg incremental tables.

* Add changelog.

* Standardize existing_relation as name of existing model.

* Improve error message for table formats.

* Update error message and add round of tests.

* Add more comprehensive tests for before/after.

* Update identifier param in incremental materialization.

* Import Mike's revision on the relation type change test.

* Try adding iceberg incremental model scenarios.

* Disable dynamic tests.

* Refine booleans for more restricting when incremental models are built.

* Update tests to reflect realities on the database.

* Add additional metadata.

* Update boolean logic for faster runs overall.

* Last bit of cleanup per code review

* Syntax error from lack of iterable.
  • Loading branch information
VersusFacit authored Sep 26, 2024
1 parent 583ec5e commit d7632eb
Show file tree
Hide file tree
Showing 6 changed files with 258 additions and 14 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240923-203204.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Add Iceberg format Incremental Models
time: 2024-09-23T20:32:04.783741-07:00
custom:
Author: versusfacit
Issue: "321"
23 changes: 21 additions & 2 deletions dbt/include/snowflake/macros/materializations/incremental.sql
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,21 @@

{% materialization incremental, adapter='snowflake', supported_languages=['sql', 'python'] -%}

{% set original_query_tag = set_query_tag() %}

{#-- Set vars --#}
{%- set full_refresh_mode = (should_full_refresh()) -%}
{%- set language = model['language'] -%}
{% set target_relation = this %}

{%- set identifier = this.name -%}

{%- set target_relation = api.Relation.create(
identifier=identifier,
schema=schema,
database=database,
type='table',
table_format=config.get('table_format', 'default')
) -%}

{% set existing_relation = load_relation(this) %}

{#-- The temp relation will be a view (faster) or temp table, depending on upsert/merge strategy --#}
Expand All @@ -90,11 +99,21 @@
{%- call statement('main', language=language) -%}
{{ create_table_as(False, target_relation, compiled_code, language) }}
{%- endcall -%}

{% elif full_refresh_mode %}
{% if target_relation.needs_to_drop(existing_relation) %}
{{ drop_relation_if_exists(existing_relation) }}
{% endif %}
{%- call statement('main', language=language) -%}
{{ create_table_as(False, target_relation, compiled_code, language) }}
{%- endcall -%}

{% elif target_relation.table_format != existing_relation.table_format %}
{% do exceptions.raise_compiler_error(
"Unable to alter incremental model `" ~ target_relation.identifier ~ "` to '" ~ target_relation.table_format ~ " table format due to Snowflake limitation. Please execute with --full-refresh to drop the table and recreate in new table format.'"
)
%}

{% else %}
{#-- Create the temp relation, either as a view or as a temp table --#}
{% if tmp_relation_type == 'view' %}
Expand Down
8 changes: 4 additions & 4 deletions dbt/include/snowflake/macros/materializations/table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

{% set grant_config = config.get('grants') %}

{%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
{%- set existing_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
{%- set target_relation = api.Relation.create(
identifier=identifier,
schema=schema,
Expand All @@ -18,8 +18,8 @@

{{ run_hooks(pre_hooks) }}

{% if target_relation.needs_to_drop(old_relation) %}
{{ drop_relation_if_exists(old_relation) }}
{% if target_relation.needs_to_drop(existing_relation) %}
{{ drop_relation_if_exists(existing_relation) }}
{% endif %}

{% call statement('main', language=language) -%}
Expand All @@ -28,7 +28,7 @@

{{ run_hooks(post_hooks) }}

{% set should_revoke = should_revoke(old_relation, full_refresh_mode=True) %}
{% set should_revoke = should_revoke(existing_relation, full_refresh_mode=True) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

{% do persist_docs(target_relation, model) %}
Expand Down
126 changes: 126 additions & 0 deletions tests/functional/iceberg/test_incremental_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import pytest

from pathlib import Path

from dbt.tests.util import run_dbt, run_dbt_and_capture


_SEED_INCREMENTAL_STRATEGIES = """
world_id,world_name,boss
1,Yoshi's Island,Iggy
2,Donut Plains,Morton
3,Vanilla Dome,Lemmy
4,Cookie Mountain,Temmy
5,Forest of Illusion,Roy
""".strip()

_MODEL_BASIC_TABLE_MODEL = """
{{
config(
materialized = "table",
)
}}
select * from {{ ref('seed') }}
"""

_MODEL_INCREMENTAL_ICEBERG_BASE = """
{{{{
config(
materialized='incremental',
table_format='iceberg',
incremental_strategy='{strategy}',
unique_key="world_id",
external_volume = "s3_iceberg_snow",
)
}}}}
select * from {{{{ ref('upstream_table') }}}}
{{% if is_incremental() %}}
where world_id > 2
{{% endif %}}
"""

_MODEL_INCREMENTAL_ICEBERG_APPEND = _MODEL_INCREMENTAL_ICEBERG_BASE.format(strategy="append")
_MODEL_INCREMENTAL_ICEBERG_MERGE = _MODEL_INCREMENTAL_ICEBERG_BASE.format(strategy="merge")
_MODEL_INCREMENTAL_ICEBERG_DELETE_INSERT = _MODEL_INCREMENTAL_ICEBERG_BASE.format(
strategy="delete+insert"
)


_QUERY_UPDATE_UPSTREAM_TABLE = """
UPDATE {database}.{schema}.upstream_table set world_name = 'Twin Bridges', boss = 'Ludwig' where world_id = 4;
"""

_QUERY_UPDATE_UPSTREAM_TABLE_NO_EFFECT = """
UPDATE {database}.{schema}.upstream_table set world_name = 'Doughnut Plains' where world_id = 2;
"""


class TestIcebergIncrementalStrategies:
@pytest.fixture(scope="class")
def project_config_update(self):
return {"flags": {"enable_iceberg_materializations": True}}

@pytest.fixture(scope="class")
def seeds(self):
return {
"seed.csv": _SEED_INCREMENTAL_STRATEGIES,
}

@pytest.fixture(scope="function", autouse=True)
def setup_class(self, project):
run_dbt(["seed"])
yield

@pytest.fixture(scope="class")
def models(self):
return {
"upstream_table.sql": _MODEL_BASIC_TABLE_MODEL,
"append.sql": _MODEL_INCREMENTAL_ICEBERG_APPEND,
"merge.sql": _MODEL_INCREMENTAL_ICEBERG_MERGE,
"delete_insert.sql": _MODEL_INCREMENTAL_ICEBERG_DELETE_INSERT,
}

def test_incremental_strategies_build(self, project, setup_class):
run_results = run_dbt()
assert len(run_results) == 4

def __check_correct_operations(self, model_name, /, rows_affected, status="SUCCESS"):
run_results = run_dbt(
["show", "--inline", f"select * from {{{{ ref('{model_name}') }}}} where world_id = 4"]
)
assert run_results[0].adapter_response["rows_affected"] == rows_affected
assert run_results[0].adapter_response["code"] == status

if model_name != "append":
run_results, stdout = run_dbt_and_capture(
[
"show",
"--inline",
f"select * from {{{{ ref('{model_name}') }}}} where world_id = 2",
]
)
run_results[0].adapter_response["rows_affected"] == 1
assert "Doughnut" not in stdout

def test_incremental_strategies_with_update(self, project, setup_class):
run_results = run_dbt()
assert len(run_results) == 4

project.run_sql(
_QUERY_UPDATE_UPSTREAM_TABLE.format(
database=project.database, schema=project.test_schema
)
)
project.run_sql(
_QUERY_UPDATE_UPSTREAM_TABLE_NO_EFFECT.format(
database=project.database, schema=project.test_schema
)
)

run_results = run_dbt(["run", "-s", "append", "merge", "delete_insert"])
assert len(run_results) == 3

self.__check_correct_operations("append", rows_affected=3)
self.__check_correct_operations("merge", rows_affected=1)
self.__check_correct_operations("delete_insert", rows_affected=1)
34 changes: 34 additions & 0 deletions tests/functional/relation_tests/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,37 @@
) }}
select * from {{ ref('my_seed') }}
"""


DYNAMIC_ICEBERG_TABLE = """
{{ config(
materialized='dynamic_table',
snowflake_warehouse='DBT_TESTING',
target_lag='1 minute',
refresh_mode='INCREMENTAL',
table_format="iceberg",
external_volume="s3_iceberg_snow",
base_location_subpath="subpath",
) }}
select * from {{ ref('my_seed') }}
"""

ICEBERG_TABLE = """
{{ config(
materialized='table',
table_format="iceberg",
external_volume="s3_iceberg_snow",
) }}
select * from {{ ref('my_seed') }}
"""

ICEBERG_INCREMENTAL_TABLE = """
{{ config(
materialized='incremental',
table_format='iceberg',
incremental_strategy='append',
unique_key="id",
external_volume = "s3_iceberg_snow",
) }}
select * from {{ ref('my_seed') }}
"""
75 changes: 67 additions & 8 deletions tests/functional/relation_tests/test_relation_type_change.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,27 @@
from dataclasses import dataclass
from itertools import product
from typing import Optional

from dbt.tests.util import run_dbt
import pytest

from tests.functional.relation_tests import models
from tests.functional.utils import query_relation_type, update_model
from tests.functional.utils import describe_dynamic_table, query_relation_type, update_model


@dataclass
class Model:
model: str
relation_type: str
table_format: Optional[str] = None
incremental: Optional[bool] = None

@property
def name(self) -> str:
return f"{self.relation_type}"
name = f"{self.relation_type}"
if self.table_format:
name += f"_{self.table_format}"
return name


@dataclass
Expand All @@ -34,31 +40,84 @@ def error_message(self) -> str:

relations = [
Model(models.VIEW, "view"),
Model(models.TABLE, "table"),
Model(models.DYNAMIC_TABLE, "dynamic_table"),
Model(models.TABLE, "table", "default"),
# to be activated upon merge of dynamic table support PR
# Model(models.DYNAMIC_TABLE, "dynamic_table", "default"),
# Model(models.DYNAMIC_ICEBERG_TABLE, "dynamic_table", "iceberg"),
Model(models.ICEBERG_TABLE, "table", "iceberg"),
Model(models.ICEBERG_INCREMENTAL_TABLE, "table", "iceberg", incremental=True),
]
scenarios = [Scenario(*scenario) for scenario in product(relations, relations)]


class TestRelationTypeChange:

@staticmethod
def include(scenario) -> bool:
return (
scenario.initial.table_format != "iceberg" and scenario.final.table_format != "iceberg"
)

@pytest.fixture(scope="class", autouse=True)
def seeds(self):
return {"my_seed.csv": models.SEED}

@pytest.fixture(scope="class", autouse=True)
def models(self):
yield {f"{scenario.name}.sql": scenario.initial.model for scenario in scenarios}
yield {
f"{scenario.name}.sql": scenario.initial.model
for scenario in scenarios
if self.include(scenario)
}

@pytest.fixture(scope="class", autouse=True)
def setup(self, project):
run_dbt(["seed"])
run_dbt(["run"])
for scenario in scenarios:
update_model(project, scenario.name, scenario.final.model)
if self.include(scenario):
update_model(project, scenario.name, scenario.final.model)
run_dbt(["run"])

@pytest.mark.parametrize("scenario", scenarios, ids=[scenario.name for scenario in scenarios])
def test_replace(self, project, scenario):
relation_type = query_relation_type(project, scenario.name)
assert relation_type == scenario.final.relation_type, scenario.error_message
if self.include(scenario):
relation_type = query_relation_type(project, scenario.name)
assert relation_type == scenario.final.relation_type, scenario.error_message
if relation_type == "dynamic_table":
dynamic_table = describe_dynamic_table(project, scenario.name)
assert dynamic_table.catalog.table_format == scenario.final.table_format
else:
pytest.skip()


"""
Upon adding the logic needed for seamless transitions to and from incremental models without data loss, we can coalesce these test cases.
"""


class TestRelationTypeChangeIcebergOn(TestRelationTypeChange):
@pytest.fixture(scope="class")
def project_config_update(self):
return {"flags": {"enable_iceberg_materializations": True}}

@staticmethod
def include(scenario) -> bool:
return any(
(
# scenario 1: Everything that doesn't include incremental relations on Iceberg
(
(
scenario.initial.table_format == "iceberg"
or scenario.final.table_format == "iceberg"
)
and not scenario.initial.incremental
and not scenario.final.incremental
),
# scenario 2: Iceberg Incremental swaps allowed
(
scenario.initial.table_format == "iceberg"
and scenario.final.table_format == "iceberg"
),
)
)

0 comments on commit d7632eb

Please sign in to comment.