Skip to content

Commit

Permalink
ADAP-542: Add configuration options for dynamic tables (#636)
Browse files Browse the repository at this point in the history
* add new config enums

* added refresh strategy query and relation method to determine differences

* added dynamic table ddl

* tests mostly pass, fail due to dynamic table being unavailable

* updated with materialized views pushed to main, added retry to get_row_count for dynamic table initialization
  • Loading branch information
mikealfare authored Jun 8, 2023
1 parent 02c947f commit 9111c25
Show file tree
Hide file tree
Showing 9 changed files with 254 additions and 64 deletions.
Empty file.
26 changes: 26 additions & 0 deletions dbt/adapters/snowflake/relation_configs/dynamic_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from dataclasses import dataclass

from dbt.adapters.relation_configs import RelationConfigBase

from dbt.adapters.snowflake.relation_configs.lag import SnowflakeDynamicTableLagConfig


@dataclass(frozen=True, eq=True, unsafe_hash=True)
class SnowflakeDynamicTableConfig(RelationConfigBase):
    """
    This config follows the specs found here:
    https://docs.snowflake.com/en/LIMITEDACCESS/create-dynamic-table

    The following parameters are configurable by dbt:
    - name: name of the dynamic table
    - query: the query behind the table
    - lag: the maximum amount of time that the dynamic table's content should lag behind updates to the base tables
    - warehouse: the name of the warehouse that provides the compute resources for refreshing the dynamic table

    There are currently no non-configurable parameters.

    Instances are frozen (immutable) and hashable, and compare by field value.
    """

    name: str
    query: str
    lag: SnowflakeDynamicTableLagConfig
    warehouse: str
28 changes: 28 additions & 0 deletions dbt/adapters/snowflake/relation_configs/lag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from dataclasses import dataclass

from dbt.adapters.relation_configs import RelationConfigBase
from dbt.dataclass_schema import StrEnum


class SnowflakeDynamicTableLagPeriod(StrEnum):
    """
    Units of time that can be used as the period (scale) part of a dynamic table lag.

    Each member's value is the lowercase name of the unit.
    """

    seconds = "seconds"
    minutes = "minutes"
    hours = "hours"
    days = "days"


@dataclass(frozen=True, eq=True, unsafe_hash=True)
class SnowflakeDynamicTableLagConfig(RelationConfigBase):
    """
    This config follows the specs found here:
    https://docs.snowflake.com/en/LIMITEDACCESS/create-dynamic-table

    The following parameters are configurable by dbt:
    - duration: the numeric part of the lag
    - period: the scale part of the lag

    There are currently no non-configurable parameters.

    Instances are frozen (immutable) and hashable, and compare by field value.
    """

    duration: int
    period: SnowflakeDynamicTableLagPeriod
2 changes: 1 addition & 1 deletion dbt/include/snowflake/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@
{% macro snowflake__drop_relation(relation) -%}
{%- if relation.is_dynamic_table -%}
{% call statement('drop_relation', auto_begin=False) -%}
{{ drop_view(relation) }}
drop dynamic table if exists {{ relation }}
{%- endcall %}
{%- else -%}
{{- default__drop_relation(relation) -}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,34 @@

{% macro snowflake__get_create_dynamic_table_as_sql(relation, sql) -%}
{{- log('Applying CREATE to: ' ~ relation) -}}
{{- get_create_view_as_sql(relation, sql) -}}

create or replace dynamic table {{ relation }}
lag = '{{ config.get("lag") }}'
warehouse = {{ config.get("warehouse") }}
as ({{ sql }})

{%- endmacro %}


{% macro snowflake__get_replace_dynamic_table_as_sql(relation, sql, existing_relation, backup_relation, intermediate_relation) -%}
{{- log('Applying REPLACE to: ' ~ relation) -}}
{{ drop_relation(existing_relation) }}
{{ snowflake__get_drop_dynamic_table_sql(existing_relation) }};
{{ snowflake__get_create_dynamic_table_as_sql(relation, sql) }}
{%- endmacro %}


{% macro snowflake__refresh_dynamic_table(relation) -%}
{{- log('Applying REFRESH to: ' ~ relation) -}}
{{ '' }}
alter dynamic table {{ relation }} set lag = '{{ config.get("lag") }}'
{%- endmacro %}


{% macro snowflake__get_dynamic_table_configuration_changes(relation, new_config) -%}
{{- log('Determining configuration changes on: ' ~ relation) -}}
{%- do return({}) -%}
{%- do return(None) -%}
{%- endmacro %}


{#
    Render the SQL statement that drops `relation` as a dynamic table.

    `if exists` keeps the statement valid when the relation is not present.
    This macro only produces the SQL text; it does not wrap it in a
    `statement` call, so the caller decides how and when it is executed.
#}
{% macro snowflake__get_drop_dynamic_table_sql(relation) %}
drop dynamic table if exists {{ relation }}
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -1,55 +1,60 @@
{% materialization dynamic_table, default %}
{% materialization dynamic_table, adapter='snowflake' %}

{% set original_query_tag = set_query_tag() %}

{% set existing_relation = load_cached_relation(this) %}
{% set target_relation = this.incorporate(type=this.DynamicTable) %}
{% set intermediate_relation = make_intermediate_relation(target_relation) %}
{% set backup_relation_type = target_relation.DynamicTable if existing_relation is none else existing_relation.type %}
{% set backup_relation = make_backup_relation(target_relation, backup_relation_type) %}

{{ _setup(backup_relation, intermediate_relation, pre_hooks) }}
{{ dynamic_table_setup(backup_relation, intermediate_relation, pre_hooks) }}

{% set build_sql = _get_build_sql(existing_relation, target_relation, backup_relation, intermediate_relation) %}
{% set build_sql = dynamic_table_get_build_sql(existing_relation, target_relation, backup_relation, intermediate_relation) %}

{% if build_sql == '' %}
{{ _execute_no_op(target_relation) }}
{{ dynamic_table_execute_no_op(target_relation) }}
{% else %}
{{ _execute_build_sql(build_sql, existing_relation, target_relation, post_hooks) }}
{{ dynamic_table_execute_build_sql(build_sql, existing_relation, target_relation, post_hooks) }}
{% endif %}

{{ _teardown(backup_relation, intermediate_relation, post_hooks) }}
{{ dynamic_table_teardown(backup_relation, intermediate_relation, post_hooks) }}

{% do unset_query_tag(original_query_tag) %}

{{ return({'relations': [target_relation]}) }}

{% endmaterialization %}


{% macro _setup(backup_relation, intermediate_relation, pre_hooks) %}
{% macro dynamic_table_setup(backup_relation, intermediate_relation, pre_hooks) %}

-- backup_relation and intermediate_relation should not already exist in the database
-- it's possible these exist because of a previous run that exited unexpectedly
{% set preexisting_backup_relation = load_cached_relation(backup_relation) %}
{% set preexisting_intermediate_relation = load_cached_relation(intermediate_relation) %}

-- drop the temp relations if they exist already in the database
{{ drop_relation_if_exists(preexisting_backup_relation) }}
{{ drop_relation_if_exists(preexisting_intermediate_relation) }}
{{ snowflake__get_drop_dynamic_table_sql(preexisting_backup_relation) }}
{{ snowflake__get_drop_dynamic_table_sql(preexisting_intermediate_relation) }}

{{ run_hooks(pre_hooks, inside_transaction=False) }}
{{ run_hooks(pre_hooks) }}

{% endmacro %}


{% macro _teardown(backup_relation, intermediate_relation, post_hooks) %}
{% macro dynamic_table_teardown(backup_relation, intermediate_relation, post_hooks) %}

-- drop the temp relations if they exist to leave the database clean for the next run
{{ drop_relation_if_exists(backup_relation) }}
{{ drop_relation_if_exists(intermediate_relation) }}
{{ snowflake__get_drop_dynamic_table_sql(backup_relation) }}
{{ snowflake__get_drop_dynamic_table_sql(intermediate_relation) }}

{{ run_hooks(post_hooks, inside_transaction=False) }}
{{ run_hooks(post_hooks) }}

{% endmacro %}


{% macro _get_build_sql(existing_relation, target_relation, backup_relation, intermediate_relation) %}
{% macro dynamic_table_get_build_sql(existing_relation, target_relation, backup_relation, intermediate_relation) %}

{% set full_refresh_mode = should_full_refresh() %}

Expand All @@ -64,7 +69,7 @@
{% set on_configuration_change = config.get('on_configuration_change') %}
{% set configuration_changes = snowflake__get_dynamic_table_configuration_changes(existing_relation, config) %}

{% if configuration_changes == {} %}
{% if configuration_changes is none %}
{% set build_sql = snowflake__refresh_dynamic_table(target_relation) %}

{% elif on_configuration_change == 'apply' %}
Expand All @@ -88,7 +93,7 @@
{% endmacro %}


{% macro _execute_no_op(target_relation) %}
{% macro dynamic_table_execute_no_op(target_relation) %}
{% do store_raw_result(
name="main",
message="skip " ~ target_relation,
Expand All @@ -98,10 +103,7 @@
{% endmacro %}


{% macro _execute_build_sql(build_sql, existing_relation, target_relation, post_hooks) %}

-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}
{% macro dynamic_table_execute_build_sql(build_sql, existing_relation, target_relation, post_hooks) %}

{% set grant_config = config.get('grants') %}

Expand All @@ -114,8 +116,4 @@

{% do persist_docs(target_relation, model) %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

{{ adapter.commit() }}

{% endmacro %}
4 changes: 2 additions & 2 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# install latest changes in dbt-core
# TODO: how to automate switching from develop to version branches?
git+https://github.com/dbt-labs/dbt-core.git@feature/materialized-views/ADAP-2#egg=dbt-core&subdirectory=core
git+https://github.com/dbt-labs/dbt-core.git@feature/materialized-views/ADAP-2#egg=dbt-tests-adapter&subdirectory=tests/adapter
git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-core&subdirectory=core
git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-tests-adapter&subdirectory=tests/adapter

# if version 1.x or greater -> pin to major version
# if version 0.x -> pin to minor
Expand Down
61 changes: 58 additions & 3 deletions tests/functional/adapter/dynamic_table_tests/fixtures.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,41 @@
from time import sleep
from datetime import datetime

import pytest
from snowflake.connector.errors import ProgrammingError

from dbt.tests.util import relation_from_name
from dbt.dataclass_schema import StrEnum
from dbt.tests.util import relation_from_name, run_sql_with_adapter, get_manifest
from dbt.tests.adapter.materialized_view.base import Base
from dbt.tests.adapter.materialized_view.on_configuration_change import OnConfigurationChangeBase


def refresh_dynamic_table(adapter, model: str):
    """
    Issue an ``alter dynamic table ... set lag`` statement against ``model``.

    The lag is always set to ``'60 seconds'``.

    Args:
        adapter: adapter handed to ``run_sql_with_adapter`` to execute the statement
        model: name of the dynamic table, interpolated into the statement as given
    """
    run_sql_with_adapter(adapter, f"alter dynamic table {model} set lag = '60 seconds'")


def get_row_count(
    project, model: str, timeout: float = 120, poll_interval: float = 5
) -> int:
    """
    Return the row count of ``model``, retrying while the count query fails.

    The query is re-issued every ``poll_interval`` seconds for as long as it raises
    ``ProgrammingError`` and fewer than ``timeout`` seconds have elapsed. The retry
    exists because a freshly created dynamic table may not be queryable until it
    has been initialized.

    Args:
        project: test project exposing ``database``, ``test_schema`` and ``run_sql``
        model: unqualified name of the relation to count
        timeout: maximum number of seconds to keep retrying
        poll_interval: number of seconds to wait between attempts

    Returns:
        The first column of the single row returned by the count query.

    Raises:
        ProgrammingError: if the query is still failing once ``timeout`` has elapsed;
            the last underlying error (if any) is attached as the cause.
    """
    sql = f"select count(*) from {project.database}.{project.test_schema}.{model};"

    last_error = None
    started_at = datetime.now()
    while (datetime.now() - started_at).total_seconds() < timeout:
        try:
            return project.run_sql(sql, fetch="one")[0]
        except ProgrammingError as error:
            # keep the most recent failure so the final error explains what went wrong
            last_error = error
            sleep(poll_interval)

    # the message is derived from `timeout` so it cannot drift from the actual wait
    raise ProgrammingError(
        f"{timeout} seconds have passed and the dynamic table is still not initialized."
    ) from last_error


def assert_model_exists_and_is_correct_type(project, model: str, relation_type: StrEnum):
    """
    Assert that ``model`` is materialized as ``relation_type`` and can be queried.

    The materialization is read from the project's manifest; the relation is then
    counted to prove it exists (any non-negative count passes).

    Args:
        project: test project exposing ``project_root`` plus what ``get_row_count`` needs
        model: name of the model, looked up in the manifest as ``model.test.<model>``
        relation_type: expected materialization. In general this will be a
            ``RelationType``; however, some adapters (e.g. ``dbt-snowflake``) have
            their own ``RelationType``, hence the broader ``StrEnum`` annotation.
    """
    node = get_manifest(project.project_root).nodes[f"model.test.{model}"]
    assert node.config.materialized == relation_type
    assert get_row_count(project, model) >= 0


class SnowflakeBasicBase(Base):
@pytest.fixture(scope="class")
def models(self):
Expand All @@ -13,16 +44,40 @@ def models(self):
select 1 as base_column
"""
base_dynamic_table = """
{{ config(materialized='dynamic_table') }}
{{ config(
materialized='dynamic_table',
warehouse='DBT_TESTING',
lag='60 seconds',
) }}
select * from {{ ref('base_table') }}
"""
return {"base_table.sql": base_table, "base_dynamic_table.sql": base_dynamic_table}


class SnowflakeOnConfigurationChangeBase(SnowflakeBasicBase, OnConfigurationChangeBase):
class SnowflakeOnConfigurationChangeBase(OnConfigurationChangeBase):
# this avoids rewriting several log message lookups
base_materialized_view = "base_dynamic_table"

def refresh_dynamic_table(self, adapter):
    """
    Issue an ``alter dynamic table ... set lag`` statement against ``base_dynamic_table``.

    The lag is always set to ``'60 seconds'``; the statement is executed through
    ``run_sql_with_adapter`` using the given ``adapter``.
    """
    statement = "alter dynamic table base_dynamic_table set lag = '60 seconds'"
    run_sql_with_adapter(adapter, statement)

@pytest.fixture(scope="class")
def models(self):
    """
    Provide the model files for this test class, keyed by file name.

    Returns:
        A dict with two models: ``base_table.sql``, a plain table with a single
        row, and ``base_dynamic_table.sql``, a dynamic table that selects from it
        with a 5 minute lag on the ``DBT_TESTING`` warehouse.
    """
    base_table = """
    {{ config(materialized='table') }}
    select 1 as base_column
    """
    # every keyword argument to `config(...)` must be comma-separated; a missing
    # comma after `materialized` is a Jinja syntax error when the model compiles
    base_dynamic_table = """
    {{ config(
        materialized='dynamic_table',
        warehouse='DBT_TESTING',
        lag='5 minutes',
    ) }}
    select * from {{ ref('base_table') }}
    """
    return {"base_table.sql": base_table, "base_dynamic_table.sql": base_dynamic_table}

@pytest.fixture(scope="function")
def configuration_changes(self, project):
pass
Expand Down
Loading

0 comments on commit 9111c25

Please sign in to comment.