From 9111c25aa60d977b188a40bd6051e4c908b6f48b Mon Sep 17 00:00:00 2001 From: Mike Alfare <13974384+mikealfare@users.noreply.github.com> Date: Wed, 7 Jun 2023 20:32:57 -0400 Subject: [PATCH] ADAP-542: Add configuration options for dynamic tables (#636) * add new config enums * added refresh strategy query and relation method to determine differences * added dynamic table ddl * tests mostly pass, fail due to dynamic table being unavailable * updated with materialized views pushed to main, added retry to get_row_count for dynamic table initialization --- .../snowflake/relation_configs/__init__.py | 0 .../relation_configs/dynamic_table.py | 26 ++++ .../snowflake/relation_configs/lag.py | 28 ++++ dbt/include/snowflake/macros/adapters.sql | 2 +- .../materializations/dynamic_table/ddl.sql | 18 ++- .../dynamic_table/materialization.sql | 48 +++---- dev-requirements.txt | 4 +- .../adapter/dynamic_table_tests/fixtures.py | 61 +++++++- .../dynamic_table_tests/test_dynamic_table.py | 131 ++++++++++++++---- 9 files changed, 254 insertions(+), 64 deletions(-) create mode 100644 dbt/adapters/snowflake/relation_configs/__init__.py create mode 100644 dbt/adapters/snowflake/relation_configs/dynamic_table.py create mode 100644 dbt/adapters/snowflake/relation_configs/lag.py diff --git a/dbt/adapters/snowflake/relation_configs/__init__.py b/dbt/adapters/snowflake/relation_configs/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/dbt/adapters/snowflake/relation_configs/dynamic_table.py b/dbt/adapters/snowflake/relation_configs/dynamic_table.py new file mode 100644 index 000000000..b3e7e945e --- /dev/null +++ b/dbt/adapters/snowflake/relation_configs/dynamic_table.py @@ -0,0 +1,26 @@ +from dataclasses import dataclass + +from dbt.adapters.relation_configs import RelationConfigBase + +from dbt.adapters.snowflake.relation_configs.lag import SnowflakeDynamicTableLagConfig + + +@dataclass(frozen=True, eq=True, unsafe_hash=True) +class SnowflakeDynamicTableConfig(RelationConfigBase): + """ + This config follow the specs found here: + https://docs.snowflake.com/en/LIMITEDACCESS/create-dynamic-table + + The following parameters are configurable by dbt: + - name: name of the dynamic table + - query: the query behind the table + - lag: the maximum amount of time that the dynamic table’s content should lag behind updates to the base tables + - warehouse: the name of the warehouse that provides the compute resources for refreshing the dynamic table + + There are currently no non-configurable parameters. + """ + + name: str + query: str + lag: SnowflakeDynamicTableLagConfig + warehouse: str diff --git a/dbt/adapters/snowflake/relation_configs/lag.py b/dbt/adapters/snowflake/relation_configs/lag.py new file mode 100644 index 000000000..9daea3f93 --- /dev/null +++ b/dbt/adapters/snowflake/relation_configs/lag.py @@ -0,0 +1,28 @@ +from dataclasses import dataclass + +from dbt.adapters.relation_configs import RelationConfigBase +from dbt.dataclass_schema import StrEnum + + +class SnowflakeDynamicTableLagPeriod(StrEnum): + seconds = "seconds" + minutes = "minutes" + hours = "hours" + days = "days" + + +@dataclass(frozen=True, eq=True, unsafe_hash=True) +class SnowflakeDynamicTableLagConfig(RelationConfigBase): + """ + This config follow the specs found here: + https://docs.snowflake.com/en/LIMITEDACCESS/create-dynamic-table + + The following parameters are configurable by dbt: + - duration: the numeric part of the lag + - period: the scale part of the lag + + There are currently no non-configurable parameters. + """ + + duration: int + period: SnowflakeDynamicTableLagPeriod diff --git a/dbt/include/snowflake/macros/adapters.sql b/dbt/include/snowflake/macros/adapters.sql index 55362fe75..6310f2fbf 100644 --- a/dbt/include/snowflake/macros/adapters.sql +++ b/dbt/include/snowflake/macros/adapters.sql @@ -377,7 +377,7 @@ {% macro snowflake__drop_relation(relation) -%} {%- if relation.is_dynamic_table -%} {% call statement('drop_relation', auto_begin=False) -%} - {{ drop_view(relation) }} + drop dynamic table if exists {{ relation }} {%- endcall %} {%- else -%} {{- default__drop_relation(relation) -}} diff --git a/dbt/include/snowflake/macros/materializations/dynamic_table/ddl.sql b/dbt/include/snowflake/macros/materializations/dynamic_table/ddl.sql index 835076c5e..3ba59002a 100644 --- a/dbt/include/snowflake/macros/materializations/dynamic_table/ddl.sql +++ b/dbt/include/snowflake/macros/materializations/dynamic_table/ddl.sql @@ -13,24 +13,34 @@ {% macro snowflake__get_create_dynamic_table_as_sql(relation, sql) -%} {{- log('Applying CREATE to: ' ~ relation) -}} - {{- get_create_view_as_sql(relation, sql) -}} + + create or replace dynamic table {{ relation }} + lag = '{{ config.get("lag") }}' + warehouse = {{ config.get("warehouse") }} + as ({{ sql }}) + {%- endmacro %} {% macro snowflake__get_replace_dynamic_table_as_sql(relation, sql, existing_relation, backup_relation, intermediate_relation) -%} {{- log('Applying REPLACE to: ' ~ relation) -}} - {{ drop_relation(existing_relation) }} + {{ snowflake__get_drop_dynamic_table_sql(existing_relation) }}; {{ snowflake__get_create_dynamic_table_as_sql(relation, sql) }} {%- endmacro %} {% macro snowflake__refresh_dynamic_table(relation) -%} {{- log('Applying REFRESH to: ' ~ relation) -}} - {{ '' }} + alter dynamic table {{ relation }} set lag = '{{ config.get("lag") }}' {%- endmacro %} {% macro snowflake__get_dynamic_table_configuration_changes(relation, new_config) -%} {{- log('Determining configuration changes on: ' ~ relation) -}} - {%- do return({}) -%} + {%- do return(None) -%} {%- endmacro %} + + +{% macro snowflake__get_drop_dynamic_table_sql(relation) %} + drop dynamic table if exists {{ relation }} +{% endmacro %} diff --git a/dbt/include/snowflake/macros/materializations/dynamic_table/materialization.sql b/dbt/include/snowflake/macros/materializations/dynamic_table/materialization.sql index ac38f7dbe..9d10b92ac 100644 --- a/dbt/include/snowflake/macros/materializations/dynamic_table/materialization.sql +++ b/dbt/include/snowflake/macros/materializations/dynamic_table/materialization.sql @@ -1,28 +1,33 @@ -{% materialization dynamic_table, default %} +{% materialization dynamic_table, adapter='snowflake' %} + + {% set original_query_tag = set_query_tag() %} + {% set existing_relation = load_cached_relation(this) %} {% set target_relation = this.incorporate(type=this.DynamicTable) %} {% set intermediate_relation = make_intermediate_relation(target_relation) %} {% set backup_relation_type = target_relation.DynamicTable if existing_relation is none else existing_relation.type %} {% set backup_relation = make_backup_relation(target_relation, backup_relation_type) %} - {{ _setup(backup_relation, intermediate_relation, pre_hooks) }} + {{ dynamic_table_setup(backup_relation, intermediate_relation, pre_hooks) }} - {% set build_sql = _get_build_sql(existing_relation, target_relation, backup_relation, intermediate_relation) %} + {% set build_sql = dynamic_table_get_build_sql(existing_relation, target_relation, backup_relation, intermediate_relation) %} {% if build_sql == '' %} - {{ _execute_no_op(target_relation) }} + {{ dynamic_table_execute_no_op(target_relation) }} {% else %} - {{ _execute_build_sql(build_sql, existing_relation, target_relation, post_hooks) }} + {{ dynamic_table_execute_build_sql(build_sql, existing_relation, target_relation, post_hooks) }} {% endif %} - {{ _teardown(backup_relation, intermediate_relation, post_hooks) }} + {{ dynamic_table_teardown(backup_relation, intermediate_relation, post_hooks) }} + + {% do unset_query_tag(original_query_tag) %} {{ return({'relations': [target_relation]}) }} {% endmaterialization %} -{% macro _setup(backup_relation, intermediate_relation, pre_hooks) %} +{% macro dynamic_table_setup(backup_relation, intermediate_relation, pre_hooks) %} -- backup_relation and intermediate_relation should not already exist in the database -- it's possible these exist because of a previous run that exited unexpectedly @@ -30,26 +35,26 @@ {% set preexisting_intermediate_relation = load_cached_relation(intermediate_relation) %} -- drop the temp relations if they exist already in the database - {{ drop_relation_if_exists(preexisting_backup_relation) }} - {{ drop_relation_if_exists(preexisting_intermediate_relation) }} + {{ snowflake__get_drop_dynamic_table_sql(preexisting_backup_relation) }} + {{ snowflake__get_drop_dynamic_table_sql(preexisting_intermediate_relation) }} - {{ run_hooks(pre_hooks, inside_transaction=False) }} + {{ run_hooks(pre_hooks) }} {% endmacro %} -{% macro _teardown(backup_relation, intermediate_relation, post_hooks) %} +{% macro dynamic_table_teardown(backup_relation, intermediate_relation, post_hooks) %} -- drop the temp relations if they exist to leave the database clean for the next run - {{ drop_relation_if_exists(backup_relation) }} - {{ drop_relation_if_exists(intermediate_relation) }} + {{ snowflake__get_drop_dynamic_table_sql(backup_relation) }} + {{ snowflake__get_drop_dynamic_table_sql(intermediate_relation) }} - {{ run_hooks(post_hooks, inside_transaction=False) }} + {{ run_hooks(post_hooks) }} {% endmacro %} -{% macro _get_build_sql(existing_relation, target_relation, backup_relation, intermediate_relation) %} +{% macro dynamic_table_get_build_sql(existing_relation, target_relation, backup_relation, intermediate_relation) %} {% set full_refresh_mode = should_full_refresh() %} @@ -64,7 +69,7 @@ {% set on_configuration_change = config.get('on_configuration_change') %} {% set configuration_changes = snowflake__get_dynamic_table_configuration_changes(existing_relation, config) %} - {% if configuration_changes == {} %} + {% if configuration_changes is none %} {% set build_sql = snowflake__refresh_dynamic_table(target_relation) %} {% elif on_configuration_change == 'apply' %} @@ -88,7 +93,7 @@ {% endmacro %} -{% macro _execute_no_op(target_relation) %} +{% macro dynamic_table_execute_no_op(target_relation) %} {% do store_raw_result( name="main", message="skip " ~ target_relation, @@ -98,10 +103,7 @@ {% endmacro %} -{% macro _execute_build_sql(build_sql, existing_relation, target_relation, post_hooks) %} - - -- `BEGIN` happens here: - {{ run_hooks(pre_hooks, inside_transaction=True) }} +{% macro dynamic_table_execute_build_sql(build_sql, existing_relation, target_relation, post_hooks) %} {% set grant_config = config.get('grants') %} @@ -114,8 +116,4 @@ {% do persist_docs(target_relation, model) %} - {{ run_hooks(post_hooks, inside_transaction=True) }} - - {{ adapter.commit() }} - {% endmacro %} diff --git a/dev-requirements.txt b/dev-requirements.txt index 0b5f8dc26..20cf1e1bc 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,7 +1,7 @@ # install latest changes in dbt-core # TODO: how to automate switching from develop to version branches? -git+https://github.com/dbt-labs/dbt-core.git@feature/materialized-views/ADAP-2#egg=dbt-core&subdirectory=core -git+https://github.com/dbt-labs/dbt-core.git@feature/materialized-views/ADAP-2#egg=dbt-tests-adapter&subdirectory=tests/adapter +git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-core&subdirectory=core +git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-tests-adapter&subdirectory=tests/adapter # if version 1.x or greater -> pin to major version # if version 0.x -> pin to minor diff --git a/tests/functional/adapter/dynamic_table_tests/fixtures.py b/tests/functional/adapter/dynamic_table_tests/fixtures.py index 58df45945..e3c147458 100644 --- a/tests/functional/adapter/dynamic_table_tests/fixtures.py +++ b/tests/functional/adapter/dynamic_table_tests/fixtures.py @@ -1,10 +1,41 @@ +from time import sleep +from datetime import datetime + import pytest +from snowflake.connector.errors import ProgrammingError -from dbt.tests.util import relation_from_name +from dbt.dataclass_schema import StrEnum +from dbt.tests.util import relation_from_name, run_sql_with_adapter, get_manifest from dbt.tests.adapter.materialized_view.base import Base from dbt.tests.adapter.materialized_view.on_configuration_change import OnConfigurationChangeBase +def refresh_dynamic_table(adapter, model: str): + sql = f"alter dynamic table {model} set lag = '60 seconds'" + run_sql_with_adapter(adapter, sql) + + +def get_row_count(project, model: str) -> int: + sql = f"select count(*) from {project.database}.{project.test_schema}.{model};" + + now = datetime.now() + while (datetime.now() - now).total_seconds() < 120: + try: + return project.run_sql(sql, fetch="one")[0] + except ProgrammingError: + sleep(5) + raise ProgrammingError("90 seconds has passed and the dynamic table is still not initialized.") + + +def assert_model_exists_and_is_correct_type(project, model: str, relation_type: StrEnum): + # In general, `relation_type` will be of type `RelationType`. + # However, in some cases (e.g. `dbt-snowflake`) adapters will have their own `RelationType`. + manifest = get_manifest(project.project_root) + model_metadata = manifest.nodes[f"model.test.{model}"] + assert model_metadata.config.materialized == relation_type + assert get_row_count(project, model) >= 0 + + class SnowflakeBasicBase(Base): @pytest.fixture(scope="class") def models(self): @@ -13,16 +44,40 @@ def models(self): select 1 as base_column """ base_dynamic_table = """ - {{ config(materialized='dynamic_table') }} + {{ config( + materialized='dynamic_table', + warehouse='DBT_TESTING', + lag='60 seconds', + ) }} select * from {{ ref('base_table') }} """ return {"base_table.sql": base_table, "base_dynamic_table.sql": base_dynamic_table} -class SnowflakeOnConfigurationChangeBase(SnowflakeBasicBase, OnConfigurationChangeBase): +class SnowflakeOnConfigurationChangeBase(OnConfigurationChangeBase): # this avoids rewriting several log message lookups base_materialized_view = "base_dynamic_table" + def refresh_dynamic_table(self, adapter): + sql = "alter dynamic table base_dynamic_table set lag = '60 seconds'" + run_sql_with_adapter(adapter, sql) + + @pytest.fixture(scope="class") + def models(self): + base_table = """ + {{ config(materialized='table') }} + select 1 as base_column + """ + base_dynamic_table = """ + {{ config( + materialized='dynamic_table' + warehouse='DBT_TESTING', + lag='5 minutes', + ) }} + select * from {{ ref('base_table') }} + """ + return {"base_table.sql": base_table, "base_dynamic_table.sql": base_dynamic_table} + @pytest.fixture(scope="function") def configuration_changes(self, project): pass diff --git a/tests/functional/adapter/dynamic_table_tests/test_dynamic_table.py b/tests/functional/adapter/dynamic_table_tests/test_dynamic_table.py index 4650546b9..0eabe58a4 100644 --- a/tests/functional/adapter/dynamic_table_tests/test_dynamic_table.py +++ b/tests/functional/adapter/dynamic_table_tests/test_dynamic_table.py @@ -1,48 +1,57 @@ import pytest from dbt.contracts.results import RunStatus -from dbt.adapters.snowflake.relation import SnowflakeRelationType +from dbt.contracts.graph.model_config import OnConfigurationChangeOption from dbt.tests.adapter.materialized_view.base import ( run_model, - assert_model_exists_and_is_correct_type, insert_record, - get_row_count, ) from dbt.tests.adapter.materialized_view.on_configuration_change import assert_proper_scenario +from dbt.adapters.snowflake.relation import SnowflakeRelationType from tests.functional.adapter.dynamic_table_tests.fixtures import ( SnowflakeBasicBase, SnowflakeOnConfigurationChangeBase, + assert_model_exists_and_is_correct_type, + refresh_dynamic_table, + get_row_count, ) class TestBasic(SnowflakeBasicBase): - def test_relation_is_dynamic_table_on_initial_creation(self, project): + def test_relation_is_dynamic_table_on_initial_creation(self, project, adapter): + refresh_dynamic_table(adapter, "base_dynamic_table") assert_model_exists_and_is_correct_type( project, "base_dynamic_table", SnowflakeRelationType.DynamicTable ) assert_model_exists_and_is_correct_type(project, "base_table", SnowflakeRelationType.Table) - def test_relation_is_dynamic_table_when_rerun(self, project): + def test_relation_is_dynamic_table_when_rerun(self, project, adapter): run_model("base_dynamic_table") + refresh_dynamic_table(adapter, "base_dynamic_table") assert_model_exists_and_is_correct_type( project, "base_dynamic_table", SnowflakeRelationType.DynamicTable ) - def test_relation_is_dynamic_table_on_full_refresh(self, project): + def test_relation_is_dynamic_table_on_full_refresh(self, project, adapter): run_model("base_dynamic_table", full_refresh=True) + refresh_dynamic_table(adapter, "base_dynamic_table") assert_model_exists_and_is_correct_type( project, "base_dynamic_table", SnowflakeRelationType.DynamicTable ) - def test_relation_is_dynamic_table_on_update(self, project): + def test_relation_is_dynamic_table_on_update(self, project, adapter): run_model("base_dynamic_table", run_args=["--vars", "quoting: {identifier: True}"]) + refresh_dynamic_table(adapter, "base_dynamic_table") assert_model_exists_and_is_correct_type( project, "base_dynamic_table", SnowflakeRelationType.DynamicTable ) - @pytest.mark.skip("Fails because stub uses traditional view") - def test_updated_base_table_data_only_shows_in_dynamic_table_after_rerun(self, project): + def test_updated_base_table_data_only_shows_in_dynamic_table_after_rerun( + self, project, adapter + ): + refresh_dynamic_table(adapter, "base_dynamic_table") + # poll database table_start = get_row_count(project, "base_table") dyn_start = get_row_count(project, "base_dynamic_table") @@ -67,13 +76,18 @@ def test_updated_base_table_data_only_shows_in_dynamic_table_after_rerun(self, p assert dyn_start == dyn_mid < dyn_end -class OnConfigurationChangeCommon(SnowflakeOnConfigurationChangeBase): +@pytest.mark.skip("We're not looking for changes yet") +class TestOnConfigurationChangeApply(SnowflakeOnConfigurationChangeBase): + # we don't need to specify OnConfigurationChangeOption.Apply because it's the default + # this is part of the test + def test_full_refresh_takes_precedence_over_any_configuration_changes( - self, configuration_changes, replace_message, configuration_change_message + self, configuration_changes, replace_message, configuration_change_message, adapter ): + refresh_dynamic_table(adapter, "base_dynamic_table") results, logs = run_model("base_dynamic_table", full_refresh=True) assert_proper_scenario( - self.on_configuration_change, + OnConfigurationChangeOption.Apply, results, logs, RunStatus.Success, @@ -82,26 +96,25 @@ def test_full_refresh_takes_precedence_over_any_configuration_changes( ) def test_model_is_refreshed_with_no_configuration_changes( - self, refresh_message, configuration_change_message + self, refresh_message, configuration_change_message, adapter ): + refresh_dynamic_table(adapter, "base_dynamic_table") results, logs = run_model("base_dynamic_table") assert_proper_scenario( - self.on_configuration_change, + OnConfigurationChangeOption.Apply, results, logs, RunStatus.Success, messages_in_logs=[refresh_message, configuration_change_message], ) - -class TestOnConfigurationChangeApply(OnConfigurationChangeCommon): - @pytest.mark.skip("This fails because there are no changes in the stub") def test_model_applies_changes_with_configuration_changes( - self, configuration_changes, alter_message, update_index_message + self, configuration_changes, alter_message, update_index_message, adapter ): + refresh_dynamic_table(adapter, "base_dynamic_table") results, logs = run_model("base_dynamic_table") assert_proper_scenario( - self.on_configuration_change, + OnConfigurationChangeOption.Apply, results, logs, RunStatus.Success, @@ -109,16 +122,46 @@ def test_model_applies_changes_with_configuration_changes( ) -class TestOnConfigurationChangeSkip(OnConfigurationChangeCommon): - on_configuration_change = "skip" +@pytest.mark.skip("We're not looking for changes yet") +class TestOnConfigurationChangeContinue(SnowflakeOnConfigurationChangeBase): + @pytest.fixture(scope="class") + def project_config_update(self): + return {"models": {"on_configuration_change": OnConfigurationChangeOption.Continue.value}} + + def test_full_refresh_takes_precedence_over_any_configuration_changes( + self, configuration_changes, replace_message, configuration_change_message, adapter + ): + refresh_dynamic_table(adapter, "base_dynamic_table") + results, logs = run_model("base_dynamic_table", full_refresh=True) + assert_proper_scenario( + OnConfigurationChangeOption.Continue, + results, + logs, + RunStatus.Success, + messages_in_logs=[replace_message], + messages_not_in_logs=[configuration_change_message], + ) + + def test_model_is_refreshed_with_no_configuration_changes( + self, refresh_message, configuration_change_message, adapter + ): + refresh_dynamic_table(adapter, "base_dynamic_table") + results, logs = run_model("base_dynamic_table") + assert_proper_scenario( + OnConfigurationChangeOption.Continue, + results, + logs, + RunStatus.Success, + messages_in_logs=[refresh_message, configuration_change_message], + ) - @pytest.mark.skip("This fails because there are no changes in the stub") def test_model_is_skipped_with_configuration_changes( - self, configuration_changes, configuration_change_skip_message + self, configuration_changes, configuration_change_skip_message, adapter ): + refresh_dynamic_table(adapter, "base_dynamic_table") results, logs = run_model("base_dynamic_table") assert_proper_scenario( - self.on_configuration_change, + OnConfigurationChangeOption.Continue, results, logs, RunStatus.Success, @@ -126,16 +169,46 @@ def test_model_is_skipped_with_configuration_changes( ) -class TestOnConfigurationChangeFail(OnConfigurationChangeCommon): - on_configuration_change = "fail" +@pytest.mark.skip("We're not looking for changes yet") +class TestOnConfigurationChangeFail(SnowflakeOnConfigurationChangeBase): + @pytest.fixture(scope="class") + def project_config_update(self): + return {"models": {"on_configuration_change": OnConfigurationChangeOption.Fail.value}} + + def test_full_refresh_takes_precedence_over_any_configuration_changes( + self, configuration_changes, replace_message, configuration_change_message, adapter + ): + refresh_dynamic_table(adapter, "base_dynamic_table") + results, logs = run_model("base_dynamic_table", full_refresh=True) + assert_proper_scenario( + OnConfigurationChangeOption.Fail, + results, + logs, + RunStatus.Success, + messages_in_logs=[replace_message], + messages_not_in_logs=[configuration_change_message], + ) + + def test_model_is_refreshed_with_no_configuration_changes( + self, refresh_message, configuration_change_message, adapter + ): + refresh_dynamic_table(adapter, "base_dynamic_table") + results, logs = run_model("base_dynamic_table") + assert_proper_scenario( + OnConfigurationChangeOption.Fail, + results, + logs, + RunStatus.Success, + messages_in_logs=[refresh_message, configuration_change_message], + ) - @pytest.mark.skip("This fails because there are no changes in the stub") def test_run_fails_with_configuration_changes( - self, configuration_changes, configuration_change_fail_message + self, configuration_changes, configuration_change_fail_message, adapter ): + refresh_dynamic_table(adapter, "base_dynamic_table") results, logs = run_model("base_dynamic_table", expect_pass=False) assert_proper_scenario( - self.on_configuration_change, + OnConfigurationChangeOption.Fail, results, logs, RunStatus.Error,