From d7632ebb2b36daa108e9ad2b2374d4a4c7855145 Mon Sep 17 00:00:00 2001 From: Mila Page <67295367+VersusFacit@users.noreply.github.com> Date: Thu, 26 Sep 2024 14:05:02 -0700 Subject: [PATCH] Add logic to enable iceberg incremental tables. (#1190) * Add logic to enable iceberg incremental tables. * Add changelog. * Standardize existing_relation as name of existing model. * Improve error message for table formats. * Update error message and add round of tests. * Add more comprehensive tests for before/after. * Update identifier param in incremental materialization. * Import Mike's revision on the relation type change test. * Try adding iceberg incremental model scenarios. * Disable dynamic tests. * Refine booleans for more restricting when incremental models are built. * Update tests to reflect realities on the database. * Add additional metadata. * Update boolean logic for faster runs overall. * Last bit of cleanup per code review * Syntax error from lack of iterable. --- .../unreleased/Features-20240923-203204.yaml | 6 + .../macros/materializations/incremental.sql | 23 +++- .../macros/materializations/table.sql | 8 +- .../iceberg/test_incremental_models.py | 126 ++++++++++++++++++ tests/functional/relation_tests/models.py | 34 +++++ .../test_relation_type_change.py | 75 +++++++++-- 6 files changed, 258 insertions(+), 14 deletions(-) create mode 100644 .changes/unreleased/Features-20240923-203204.yaml create mode 100644 tests/functional/iceberg/test_incremental_models.py diff --git a/.changes/unreleased/Features-20240923-203204.yaml b/.changes/unreleased/Features-20240923-203204.yaml new file mode 100644 index 000000000..eaca4906b --- /dev/null +++ b/.changes/unreleased/Features-20240923-203204.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Add Iceberg format Incremental Models +time: 2024-09-23T20:32:04.783741-07:00 +custom: + Author: versusfacit + Issue: "321" diff --git a/dbt/include/snowflake/macros/materializations/incremental.sql b/dbt/include/snowflake/macros/materializations/incremental.sql index 9172c061e..12645cd11 100644 --- a/dbt/include/snowflake/macros/materializations/incremental.sql +++ b/dbt/include/snowflake/macros/materializations/incremental.sql @@ -58,12 +58,21 @@ {% materialization incremental, adapter='snowflake', supported_languages=['sql', 'python'] -%} - {% set original_query_tag = set_query_tag() %} {#-- Set vars --#} {%- set full_refresh_mode = (should_full_refresh()) -%} {%- set language = model['language'] -%} - {% set target_relation = this %} + + {%- set identifier = this.name -%} + + {%- set target_relation = api.Relation.create( + identifier=identifier, + schema=schema, + database=database, + type='table', + table_format=config.get('table_format', 'default') + ) -%} + {% set existing_relation = load_relation(this) %} {#-- The temp relation will be a view (faster) or temp table, depending on upsert/merge strategy --#} @@ -90,11 +99,21 @@ {%- call statement('main', language=language) -%} {{ create_table_as(False, target_relation, compiled_code, language) }} {%- endcall -%} + {% elif full_refresh_mode %} + {% if target_relation.needs_to_drop(existing_relation) %} + {{ drop_relation_if_exists(existing_relation) }} + {% endif %} {%- call statement('main', language=language) -%} {{ create_table_as(False, target_relation, compiled_code, language) }} {%- endcall -%} + {% elif target_relation.table_format != existing_relation.table_format %} + {% do exceptions.raise_compiler_error( + "Unable to alter incremental model `" ~ target_relation.identifier ~ "` to '" ~ target_relation.table_format ~ " table format due to Snowflake limitation. Please execute with --full-refresh to drop the table and recreate in new table format.'" + ) + %} + {% else %} {#-- Create the temp relation, either as a view or as a temp table --#} {% if tmp_relation_type == 'view' %} diff --git a/dbt/include/snowflake/macros/materializations/table.sql b/dbt/include/snowflake/macros/materializations/table.sql index 9ee8a0b12..995757b6b 100644 --- a/dbt/include/snowflake/macros/materializations/table.sql +++ b/dbt/include/snowflake/macros/materializations/table.sql @@ -7,7 +7,7 @@ {% set grant_config = config.get('grants') %} - {%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%} + {%- set existing_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%} {%- set target_relation = api.Relation.create( identifier=identifier, schema=schema, @@ -18,8 +18,8 @@ {{ run_hooks(pre_hooks) }} - {% if target_relation.needs_to_drop(old_relation) %} - {{ drop_relation_if_exists(old_relation) }} + {% if target_relation.needs_to_drop(existing_relation) %} + {{ drop_relation_if_exists(existing_relation) }} {% endif %} {% call statement('main', language=language) -%} @@ -28,7 +28,7 @@ {{ run_hooks(post_hooks) }} - {% set should_revoke = should_revoke(old_relation, full_refresh_mode=True) %} + {% set should_revoke = should_revoke(existing_relation, full_refresh_mode=True) %} {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %} {% do persist_docs(target_relation, model) %} diff --git a/tests/functional/iceberg/test_incremental_models.py b/tests/functional/iceberg/test_incremental_models.py new file mode 100644 index 000000000..a02d9ffed --- /dev/null +++ b/tests/functional/iceberg/test_incremental_models.py @@ -0,0 +1,126 @@ +import pytest + +from pathlib import Path + +from dbt.tests.util import run_dbt, run_dbt_and_capture + + +_SEED_INCREMENTAL_STRATEGIES = """ +world_id,world_name,boss +1,Yoshi's Island,Iggy +2,Donut Plains,Morton +3,Vanilla Dome,Lemmy +4,Cookie Mountain,Temmy +5,Forest of Illusion,Roy +""".strip() + +_MODEL_BASIC_TABLE_MODEL = """ +{{ + config( + materialized = "table", + ) +}} +select * from {{ ref('seed') }} +""" + +_MODEL_INCREMENTAL_ICEBERG_BASE = """ +{{{{ + config( + materialized='incremental', + table_format='iceberg', + incremental_strategy='{strategy}', + unique_key="world_id", + external_volume = "s3_iceberg_snow", + ) +}}}} +select * from {{{{ ref('upstream_table') }}}} + +{{% if is_incremental() %}} +where world_id > 2 +{{% endif %}} +""" + +_MODEL_INCREMENTAL_ICEBERG_APPEND = _MODEL_INCREMENTAL_ICEBERG_BASE.format(strategy="append") +_MODEL_INCREMENTAL_ICEBERG_MERGE = _MODEL_INCREMENTAL_ICEBERG_BASE.format(strategy="merge") +_MODEL_INCREMENTAL_ICEBERG_DELETE_INSERT = _MODEL_INCREMENTAL_ICEBERG_BASE.format( + strategy="delete+insert" +) + + +_QUERY_UPDATE_UPSTREAM_TABLE = """ +UPDATE {database}.{schema}.upstream_table set world_name = 'Twin Bridges', boss = 'Ludwig' where world_id = 4; +""" + +_QUERY_UPDATE_UPSTREAM_TABLE_NO_EFFECT = """ +UPDATE {database}.{schema}.upstream_table set world_name = 'Doughnut Plains' where world_id = 2; +""" + + +class TestIcebergIncrementalStrategies: + @pytest.fixture(scope="class") + def project_config_update(self): + return {"flags": {"enable_iceberg_materializations": True}} + + @pytest.fixture(scope="class") + def seeds(self): + return { + "seed.csv": _SEED_INCREMENTAL_STRATEGIES, + } + + @pytest.fixture(scope="function", autouse=True) + def setup_class(self, project): + run_dbt(["seed"]) + yield + + @pytest.fixture(scope="class") + def models(self): + return { + "upstream_table.sql": _MODEL_BASIC_TABLE_MODEL, + "append.sql": _MODEL_INCREMENTAL_ICEBERG_APPEND, + "merge.sql": _MODEL_INCREMENTAL_ICEBERG_MERGE, + "delete_insert.sql": _MODEL_INCREMENTAL_ICEBERG_DELETE_INSERT, + } + + def test_incremental_strategies_build(self, project, setup_class): + run_results = run_dbt() + assert len(run_results) == 4 + + def __check_correct_operations(self, model_name, /, rows_affected, status="SUCCESS"): + run_results = run_dbt( + ["show", "--inline", f"select * from {{{{ ref('{model_name}') }}}} where world_id = 4"] + ) + assert run_results[0].adapter_response["rows_affected"] == rows_affected + assert run_results[0].adapter_response["code"] == status + + if model_name != "append": + run_results, stdout = run_dbt_and_capture( + [ + "show", + "--inline", + f"select * from {{{{ ref('{model_name}') }}}} where world_id = 2", + ] + ) + run_results[0].adapter_response["rows_affected"] == 1 + assert "Doughnut" not in stdout + + def test_incremental_strategies_with_update(self, project, setup_class): + run_results = run_dbt() + assert len(run_results) == 4 + + project.run_sql( + _QUERY_UPDATE_UPSTREAM_TABLE.format( + database=project.database, schema=project.test_schema + ) + ) + project.run_sql( + _QUERY_UPDATE_UPSTREAM_TABLE_NO_EFFECT.format( + database=project.database, schema=project.test_schema + ) + ) + + run_results = run_dbt(["run", "-s", "append", "merge", "delete_insert"]) + assert len(run_results) == 3 + + self.__check_correct_operations("append", rows_affected=3) + self.__check_correct_operations("merge", rows_affected=1) + self.__check_correct_operations("delete_insert", rows_affected=1) diff --git a/tests/functional/relation_tests/models.py b/tests/functional/relation_tests/models.py index 6fe066313..63dfff045 100644 --- a/tests/functional/relation_tests/models.py +++ b/tests/functional/relation_tests/models.py @@ -31,3 +31,37 @@ ) }} select * from {{ ref('my_seed') }} """ + + +DYNAMIC_ICEBERG_TABLE = """ +{{ config( + materialized='dynamic_table', + snowflake_warehouse='DBT_TESTING', + target_lag='1 minute', + refresh_mode='INCREMENTAL', + table_format="iceberg", + external_volume="s3_iceberg_snow", + base_location_subpath="subpath", +) }} +select * from {{ ref('my_seed') }} +""" + +ICEBERG_TABLE = """ +{{ config( + materialized='table', + table_format="iceberg", + external_volume="s3_iceberg_snow", +) }} +select * from {{ ref('my_seed') }} +""" + +ICEBERG_INCREMENTAL_TABLE = """ +{{ config( + materialized='incremental', + table_format='iceberg', + incremental_strategy='append', + unique_key="id", + external_volume = "s3_iceberg_snow", +) }} +select * from {{ ref('my_seed') }} +""" diff --git a/tests/functional/relation_tests/test_relation_type_change.py b/tests/functional/relation_tests/test_relation_type_change.py index 1246b0791..c2886ad04 100644 --- a/tests/functional/relation_tests/test_relation_type_change.py +++ b/tests/functional/relation_tests/test_relation_type_change.py @@ -1,21 +1,27 @@ from dataclasses import dataclass from itertools import product +from typing import Optional from dbt.tests.util import run_dbt import pytest from tests.functional.relation_tests import models -from tests.functional.utils import query_relation_type, update_model +from tests.functional.utils import describe_dynamic_table, query_relation_type, update_model @dataclass class Model: model: str relation_type: str + table_format: Optional[str] = None + incremental: Optional[bool] = None @property def name(self) -> str: - return f"{self.relation_type}" + name = f"{self.relation_type}" + if self.table_format: + name += f"_{self.table_format}" + return name @dataclass @@ -34,31 +40,84 @@ def error_message(self) -> str: relations = [ Model(models.VIEW, "view"), - Model(models.TABLE, "table"), - Model(models.DYNAMIC_TABLE, "dynamic_table"), + Model(models.TABLE, "table", "default"), + # to be activated upon merge of dynamic table support PR + # Model(models.DYNAMIC_TABLE, "dynamic_table", "default"), + # Model(models.DYNAMIC_ICEBERG_TABLE, "dynamic_table", "iceberg"), + Model(models.ICEBERG_TABLE, "table", "iceberg"), + Model(models.ICEBERG_INCREMENTAL_TABLE, "table", "iceberg", incremental=True), ] scenarios = [Scenario(*scenario) for scenario in product(relations, relations)] class TestRelationTypeChange: + @staticmethod + def include(scenario) -> bool: + return ( + scenario.initial.table_format != "iceberg" and scenario.final.table_format != "iceberg" + ) + @pytest.fixture(scope="class", autouse=True) def seeds(self): return {"my_seed.csv": models.SEED} @pytest.fixture(scope="class", autouse=True) def models(self): - yield {f"{scenario.name}.sql": scenario.initial.model for scenario in scenarios} + yield { + f"{scenario.name}.sql": scenario.initial.model + for scenario in scenarios + if self.include(scenario) + } @pytest.fixture(scope="class", autouse=True) def setup(self, project): run_dbt(["seed"]) run_dbt(["run"]) for scenario in scenarios: - update_model(project, scenario.name, scenario.final.model) + if self.include(scenario): + update_model(project, scenario.name, scenario.final.model) run_dbt(["run"]) @pytest.mark.parametrize("scenario", scenarios, ids=[scenario.name for scenario in scenarios]) def test_replace(self, project, scenario): - relation_type = query_relation_type(project, scenario.name) - assert relation_type == scenario.final.relation_type, scenario.error_message + if self.include(scenario): + relation_type = query_relation_type(project, scenario.name) + assert relation_type == scenario.final.relation_type, scenario.error_message + if relation_type == "dynamic_table": + dynamic_table = describe_dynamic_table(project, scenario.name) + assert dynamic_table.catalog.table_format == scenario.final.table_format + else: + pytest.skip() + + +""" +Upon adding the logic needed for seamless transitions to and from incremental models without data loss, we can coalesce these test cases. +""" + + +class TestRelationTypeChangeIcebergOn(TestRelationTypeChange): + @pytest.fixture(scope="class") + def project_config_update(self): + return {"flags": {"enable_iceberg_materializations": True}} + + @staticmethod + def include(scenario) -> bool: + return any( + ( + # scenario 1: Everything that doesn't include incremental relations on Iceberg + ( + ( + scenario.initial.table_format == "iceberg" + or scenario.final.table_format == "iceberg" + ) + and not scenario.initial.incremental + and not scenario.final.incremental + ), + # scenario 2: Iceberg Incremental swaps allowed + ( + scenario.initial.table_format == "iceberg" + and scenario.final.table_format == "iceberg" + ), + ) + )