diff --git a/.changes/unreleased/Features-20231014-155246.yaml b/.changes/unreleased/Features-20231014-155246.yaml new file mode 100644 index 000000000..ef210d9e3 --- /dev/null +++ b/.changes/unreleased/Features-20231014-155246.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Optimize refreshing dynamic tables when autorefreshed +time: 2023-10-14T15:52:46.484-04:00 +custom: + Author: mikealfare + Issue: "806" diff --git a/dbt/adapters/snowflake/relation.py b/dbt/adapters/snowflake/relation.py index 9d6182a71..42ec3c742 100644 --- a/dbt/adapters/snowflake/relation.py +++ b/dbt/adapters/snowflake/relation.py @@ -48,7 +48,7 @@ def dynamic_table_config_changeset( existing_dynamic_table = SnowflakeDynamicTableConfig.from_relation_results( relation_results ) - new_dynamic_table = SnowflakeDynamicTableConfig.from_model_node(runtime_config.model) + new_dynamic_table = SnowflakeDynamicTableConfig.from_node(runtime_config.model) config_change_collection = SnowflakeDynamicTableConfigChangeset() diff --git a/dbt/adapters/snowflake/relation_configs/base.py b/dbt/adapters/snowflake/relation_configs/base.py index d7f9f121b..2d8b1a5f2 100644 --- a/dbt/adapters/snowflake/relation_configs/base.py +++ b/dbt/adapters/snowflake/relation_configs/base.py @@ -1,13 +1,9 @@ from dataclasses import dataclass -from typing import Any, Dict, Optional +from typing import Optional import agate from dbt.adapters.base.relation import Policy -from dbt.adapters.relation_configs import ( - RelationConfigBase, - RelationResults, -) -from dbt.contracts.graph.nodes import ModelNode +from dbt.adapters.relation_configs import RelationConfigBase from dbt.contracts.relation import ComponentName from dbt.adapters.snowflake.relation_configs.policies import ( @@ -30,30 +26,6 @@ def include_policy(cls) -> Policy: def quote_policy(cls) -> Policy: return SnowflakeQuotePolicy() - @classmethod - def from_model_node(cls, model_node: ModelNode): - relation_config = cls.parse_model_node(model_node) - relation = cls.from_dict(relation_config) - return relation - - @classmethod - def parse_model_node(cls, model_node: ModelNode) -> Dict[str, Any]: - raise NotImplementedError( - "`parse_model_node()` needs to be implemented on this RelationConfigBase instance" - ) - - @classmethod - def from_relation_results(cls, relation_results: RelationResults): - relation_config = cls.parse_relation_results(relation_results) - relation = cls.from_dict(relation_config) - return relation - - @classmethod - def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, Any]: - raise NotImplementedError( - "`parse_relation_results()` needs to be implemented on this RelationConfigBase instance" - ) - @classmethod def _render_part(cls, component: ComponentName, value: Optional[str]) -> Optional[str]: if cls.include_policy().get_part(component) and value: diff --git a/dbt/adapters/snowflake/relation_configs/dynamic_table.py b/dbt/adapters/snowflake/relation_configs/dynamic_table.py index 6caa7211e..208439ef7 100644 --- a/dbt/adapters/snowflake/relation_configs/dynamic_table.py +++ b/dbt/adapters/snowflake/relation_configs/dynamic_table.py @@ -1,9 +1,9 @@ from dataclasses import dataclass -from typing import Optional +from typing import Any, Dict, Optional import agate from dbt.adapters.relation_configs import RelationConfigChange, RelationResults -from dbt.contracts.graph.nodes import ModelNode +from dbt.contracts.graph.nodes import ParsedNode from dbt.contracts.relation import ComponentName from dbt.adapters.snowflake.relation_configs.base import SnowflakeRelationConfigBase @@ -17,7 +17,6 @@ class SnowflakeDynamicTableConfig(SnowflakeRelationConfigBase): The following parameters are configurable by dbt: - name: name of the dynamic table - - query: the query behind the table - target_lag: the maximum amount of time that the dynamic table’s content should lag behind updates to the base tables - snowflake_warehouse: the name of the warehouse that provides the compute resources for refreshing the dynamic table @@ -27,19 +26,17 @@ class SnowflakeDynamicTableConfig(SnowflakeRelationConfigBase): name: str schema_name: str database_name: str - query: str target_lag: str snowflake_warehouse: str @classmethod - def from_dict(cls, config_dict) -> "SnowflakeDynamicTableConfig": + def from_dict(cls, config_dict: Dict[str, Any]) -> "SnowflakeDynamicTableConfig": kwargs_dict = { "name": cls._render_part(ComponentName.Identifier, config_dict.get("name")), "schema_name": cls._render_part(ComponentName.Schema, config_dict.get("schema_name")), "database_name": cls._render_part( ComponentName.Database, config_dict.get("database_name") ), - "query": config_dict.get("query"), "target_lag": config_dict.get("target_lag"), "snowflake_warehouse": config_dict.get("snowflake_warehouse"), } @@ -48,27 +45,25 @@ def from_dict(cls, config_dict) -> "SnowflakeDynamicTableConfig": return dynamic_table @classmethod - def parse_model_node(cls, model_node: ModelNode) -> dict: + def parse_node(cls, node: ParsedNode) -> Dict[str, Any]: config_dict = { - "name": model_node.identifier, - "schema_name": model_node.schema, - "database_name": model_node.database, - "query": model_node.compiled_code, - "target_lag": model_node.config.extra.get("target_lag"), - "snowflake_warehouse": model_node.config.extra.get("snowflake_warehouse"), + "name": node.identifier, + "schema_name": node.schema, + "database_name": node.database, + "target_lag": node.config.extra.get("target_lag"), + "snowflake_warehouse": node.config.extra.get("snowflake_warehouse"), } return config_dict @classmethod - def parse_relation_results(cls, relation_results: RelationResults) -> dict: + def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, Any]: dynamic_table: agate.Row = relation_results["dynamic_table"].rows[0] config_dict = { "name": dynamic_table.get("name"), "schema_name": dynamic_table.get("schema_name"), "database_name": dynamic_table.get("database_name"), - "query": dynamic_table.get("text"), "target_lag": dynamic_table.get("target_lag"), "snowflake_warehouse": dynamic_table.get("warehouse"), } diff --git a/dev-requirements.txt b/dev-requirements.txt index c696a773c..6625bb1cd 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,7 +1,7 @@ # install latest changes in dbt-core # TODO: how to automate switching from develop to version branches? -git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-core&subdirectory=core -git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-tests-adapter&subdirectory=tests/adapter +git+https://github.com/dbt-labs/dbt-core.git@feature/materialized-views/adap-835#egg=dbt-core&subdirectory=core +git+https://github.com/dbt-labs/dbt-core.git@feature/materialized-views/adap-835#egg=dbt-tests-adapter&subdirectory=tests/adapter # if version 1.x or greater -> pin to major version # if version 0.x -> pin to minor diff --git a/tests/functional/adapter/dynamic_table_tests/files.py b/tests/functional/adapter/dynamic_table_tests/files.py index 6b449d476..de8803079 100644 --- a/tests/functional/adapter/dynamic_table_tests/files.py +++ b/tests/functional/adapter/dynamic_table_tests/files.py @@ -30,3 +30,16 @@ ) }} select * from {{ ref('my_seed') }} """ + + +MACRO__LAST_REFRESH = """ +{% macro snowflake__test__last_refresh(schema, identifier) %} + {% set _sql %} + select max(refresh_start_time) as last_refresh + from table(information_schema.dynamic_table_refresh_history()) + where schema_name = '{{ schema }}' + and name = '{{ identifier }}' + {% endset %} + {{ return(run_query(_sql)) }} +{% endmacro %} +""" diff --git a/tests/functional/adapter/dynamic_table_tests/test_auto_refresh.py b/tests/functional/adapter/dynamic_table_tests/test_auto_refresh.py new file mode 100644 index 000000000..7cb58a771 --- /dev/null +++ b/tests/functional/adapter/dynamic_table_tests/test_auto_refresh.py @@ -0,0 +1,39 @@ +from datetime import datetime + +import pytest + +# it's the same test for DTs as for MVs in other adapters +from dbt.tests.adapter.materialized_view.auto_refresh import ( + MaterializedViewAutoRefreshNoChanges, +) + +from tests.functional.adapter.dynamic_table_tests import files + + +class TestDynamicTableAutoRefreshNoChanges(MaterializedViewAutoRefreshNoChanges): + @pytest.fixture(scope="class", autouse=True) + def seeds(self): + yield {"my_seed.csv": files.MY_SEED} + + @pytest.fixture(scope="class", autouse=True) + def models(self): + yield { + "auto_refresh_on.sql": files.MY_DYNAMIC_TABLE, + } + + @pytest.fixture(scope="class", autouse=True) + def macros(self): + yield {"snowflake__test__last_refresh.sql": files.MACRO__LAST_REFRESH} + + def last_refreshed(self, project, dynamic_table: str) -> datetime: + with project.adapter.connection_named("__test"): + kwargs = {"schema": project.test_schema, "identifier": dynamic_table} + last_refresh_results = project.adapter.execute_macro( + "snowflake__test__last_refresh", kwargs=kwargs + ) + last_refresh = last_refresh_results[0].get("last_refresh") + return last_refresh + + @pytest.mark.skip("Snowflake does not support turning off auto refresh.") + def test_manual_refresh_occurs_when_auto_refresh_is_off(self, project): + pass