Skip to content

Commit

Permalink
Dynamic Table testing improvements (#1187)
Browse files Browse the repository at this point in the history
* move dynamic table tests down into the relation tests folder
* make utils more generic, move out of dynamic table tests
* add init files for namespacing in test discovery
* remove 2024_03 bundle items
mikealfare authored Sep 20, 2024
1 parent 084674f commit 34c4442
Showing 12 changed files with 341 additions and 575 deletions.
35 changes: 11 additions & 24 deletions dbt/adapters/snowflake/impl.py
Original file line number Diff line number Diff line change
@@ -258,28 +258,20 @@ def list_relations_without_caching(
return []
raise

# this can be reduced to always including `is_dynamic` once bundle `2024_03` is mandatory
columns = ["database_name", "schema_name", "name", "kind"]
if "is_dynamic" in schema_objects.column_names:
columns.append("is_dynamic")
if "is_iceberg" in schema_objects.column_names:
# this can be collapsed once Snowflake adds is_iceberg to show objects
columns = ["database_name", "schema_name", "name", "kind", "is_dynamic"]
if self.behavior.enable_iceberg_materializations.no_warn:
columns.append("is_iceberg")

return [self._parse_list_relations_result(obj) for obj in schema_objects.select(columns)]

def _parse_list_relations_result(self, result: "agate.Row") -> SnowflakeRelation:
# this can be reduced to always including `is_dynamic` once bundle `2024_03` is mandatory
# this can be reduced to always including `is_iceberg` once Snowflake adds it to show objects
try:
if self.behavior.enable_iceberg_materializations.no_warn:
database, schema, identifier, relation_type, is_dynamic, is_iceberg = result
else:
database, schema, identifier, relation_type, is_dynamic = result
except ValueError:
database, schema, identifier, relation_type = result
is_dynamic = "N"
if self.behavior.enable_iceberg_materializations.no_warn:
is_iceberg = "N"
# this can be collapsed once Snowflake adds is_iceberg to show objects
if self.behavior.enable_iceberg_materializations.no_warn:
database, schema, identifier, relation_type, is_dynamic, is_iceberg = result
else:
database, schema, identifier, relation_type, is_dynamic = result
is_iceberg = "N"

try:
relation_type = self.Relation.get_relation_type(relation_type.lower())
@@ -289,13 +281,8 @@ def _parse_list_relations_result(self, result: "agate.Row") -> SnowflakeRelation
if relation_type == self.Relation.Table and is_dynamic == "Y":
relation_type = self.Relation.DynamicTable

# This line is the main gate on supporting Iceberg materializations. Pass forward a default
# table format, and no downstream table macros can build iceberg relations.
table_format: str = (
TableFormat.ICEBERG
if self.behavior.enable_iceberg_materializations.no_warn and is_iceberg in ("Y", "YES")
else TableFormat.DEFAULT
)
table_format = TableFormat.ICEBERG if is_iceberg in ("Y", "YES") else TableFormat.DEFAULT

quote_policy = {"database": True, "schema": True, "identifier": True}

return self.Relation.create(

This file was deleted.

This file was deleted.

53 changes: 0 additions & 53 deletions tests/functional/adapter/dynamic_table_tests/utils.py

This file was deleted.

Empty file.
Empty file.
50 changes: 50 additions & 0 deletions tests/functional/relation_tests/dynamic_table_tests/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# CSV body for the seed used by the dynamic table tests: an `id,value` header plus three rows.
# `.strip()` drops the leading/trailing newlines introduced by the triple-quoted literal.
SEED = """
id,value
1,100
2,200
3,300
""".strip()


# Baseline dynamic table model: built on the DBT_TESTING warehouse with a 2 minute
# target lag and INCREMENTAL refresh mode, selecting everything from the `my_seed` ref.
DYNAMIC_TABLE = """
{{ config(
materialized='dynamic_table',
snowflake_warehouse='DBT_TESTING',
target_lag='2 minutes',
refresh_mode='INCREMENTAL',
) }}
select * from {{ ref('my_seed') }}
"""


# Same model as DYNAMIC_TABLE except that `target_lag` is the keyword 'DOWNSTREAM'
# rather than a time interval.
DYNAMIC_TABLE_DOWNSTREAM = """
{{ config(
materialized='dynamic_table',
snowflake_warehouse='DBT_TESTING',
target_lag='DOWNSTREAM',
refresh_mode='INCREMENTAL',
) }}
select * from {{ ref('my_seed') }}
"""


# Same model as DYNAMIC_TABLE except that `target_lag` is '5 minutes' instead of '2 minutes'.
# NOTE(review): the name suggests this change is expected to be applied with an alter
# rather than a rebuild -- confirm against the materialization.
DYNAMIC_TABLE_ALTER = """
{{ config(
materialized='dynamic_table',
snowflake_warehouse='DBT_TESTING',
target_lag='5 minutes',
refresh_mode='INCREMENTAL',
) }}
select * from {{ ref('my_seed') }}
"""


# Same model as DYNAMIC_TABLE except that `refresh_mode` is 'FULL' instead of 'INCREMENTAL'.
# NOTE(review): the name suggests this change is expected to force a replace of the
# dynamic table -- confirm against the materialization.
DYNAMIC_TABLE_REPLACE = """
{{ config(
materialized='dynamic_table',
snowflake_warehouse='DBT_TESTING',
target_lag='2 minutes',
refresh_mode='FULL',
) }}
select * from {{ ref('my_seed') }}
"""
30 changes: 30 additions & 0 deletions tests/functional/relation_tests/dynamic_table_tests/test_basic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import pytest

from dbt.tests.util import run_dbt

from tests.functional.relation_tests.dynamic_table_tests import models
from tests.functional.utils import query_relation_type


class TestBasic:
    """Build two dynamic tables once, then verify both are still dynamic tables after a full refresh."""

    @pytest.fixture(scope="class", autouse=True)
    def seeds(self):
        """Supply the seed file that both models select from."""
        seed_files = {"my_seed.csv": models.SEED}
        return seed_files

    @pytest.fixture(scope="class", autouse=True)
    def models(self):
        """Supply one dynamic table with a time-based lag and one with a DOWNSTREAM lag."""
        model_files = {
            "my_dynamic_table.sql": models.DYNAMIC_TABLE,
            "my_dynamic_table_downstream.sql": models.DYNAMIC_TABLE_DOWNSTREAM,
        }
        yield model_files

    @pytest.fixture(scope="class", autouse=True)
    def setup(self, project):
        """Load the seed and build the models once for the whole class."""
        for command in ("seed", "run"):
            run_dbt([command])

    def test_dynamic_table_full_refresh(self, project):
        run_dbt(["run", "--full-refresh"])
        # both models must still be reported as dynamic tables after the rebuild
        for model_name in ("my_dynamic_table", "my_dynamic_table_downstream"):
            assert query_relation_type(project, model_name) == "dynamic_table"
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import pytest

from dbt.tests.util import run_dbt

from tests.functional.relation_tests.dynamic_table_tests import models
from tests.functional.utils import describe_dynamic_table, update_model


class Changes:
    """
    Shared scenario for configuration-change tests on dynamic tables.

    Two models start from the same baseline definition. Before each test the model
    files are swapped for variants that change `target_lag` (the "alter" model) and
    `refresh_mode` (the "replace" model). Subclasses provide `project_config_update`
    to select the `on_configuration_change` setting under test.
    """

    @pytest.fixture(scope="class", autouse=True)
    def seeds(self):
        """Supply the seed file that both models select from."""
        yield {"my_seed.csv": models.SEED}

    @pytest.fixture(scope="class", autouse=True)
    def models(self):
        """Start both models from the same baseline dynamic table definition."""
        baseline = models.DYNAMIC_TABLE
        yield {
            "dynamic_table_alter.sql": baseline,
            "dynamic_table_replace.sql": baseline,
        }

    @pytest.fixture(scope="function", autouse=True)
    def setup_class(self, project):
        """Load the seed before each test and drop the test schema afterwards."""
        run_dbt(["seed"])
        yield
        project.run_sql(f"drop schema if exists {project.test_schema} cascade")

    @pytest.fixture(scope="function", autouse=True)
    def setup_method(self, project, setup_class):
        """Rebuild from the baseline, stage the changed model files, and restore them afterwards."""
        # make sure the relations in the database reflect the baseline files each time
        run_dbt(["run", "--full-refresh"])
        self.assert_changes_are_not_applied(project)

        staged_changes = {
            "dynamic_table_alter": models.DYNAMIC_TABLE_ALTER,
            "dynamic_table_replace": models.DYNAMIC_TABLE_REPLACE,
        }
        for model_name, contents in staged_changes.items():
            update_model(project, model_name, contents)

        yield

        # put the baseline definition back for whichever test runs next
        for model_name in staged_changes:
            update_model(project, model_name, models.DYNAMIC_TABLE)

    @staticmethod
    def assert_changes_are_applied(project):
        """Assert that both relations carry the changed configuration."""
        alter_config = describe_dynamic_table(project, "dynamic_table_alter")
        assert alter_config.snowflake_warehouse == "DBT_TESTING"
        assert alter_config.target_lag == "5 minutes"  # this updated
        assert alter_config.refresh_mode == "INCREMENTAL"

        replace_config = describe_dynamic_table(project, "dynamic_table_replace")
        assert replace_config.snowflake_warehouse == "DBT_TESTING"
        assert replace_config.target_lag == "2 minutes"
        assert replace_config.refresh_mode == "FULL"  # this updated

    @staticmethod
    def assert_changes_are_not_applied(project):
        """Assert that both relations still carry the baseline configuration."""
        alter_config = describe_dynamic_table(project, "dynamic_table_alter")
        assert alter_config.snowflake_warehouse == "DBT_TESTING"
        assert alter_config.target_lag == "2 minutes"  # this would have updated, but didn't
        assert alter_config.refresh_mode == "INCREMENTAL"

        replace_config = describe_dynamic_table(project, "dynamic_table_replace")
        assert replace_config.snowflake_warehouse == "DBT_TESTING"
        assert replace_config.target_lag == "2 minutes"
        assert replace_config.refresh_mode == "INCREMENTAL"  # this would have updated, but didn't

    def test_full_refresh_is_always_successful(self, project):
        # a full refresh always passes and always changes the configuration, regardless of
        # on_configuration_change and regardless of whether the change needs a replace or an alter
        run_dbt(["run", "--full-refresh"])
        self.assert_changes_are_applied(project)


class TestChangesApply(Changes):
    """Run the shared scenario with `on_configuration_change` set to "apply"."""

    @pytest.fixture(scope="class")
    def project_config_update(self):
        model_settings = {"on_configuration_change": "apply"}
        return {"models": model_settings}

    def test_changes_are_applied(self, project):
        # a plain run passes and the staged configuration changes take effect
        run_dbt(["run"])
        self.assert_changes_are_applied(project)


class TestChangesContinue(Changes):
    """Run the shared scenario with `on_configuration_change` set to "continue"."""

    @pytest.fixture(scope="class")
    def project_config_update(self):
        model_settings = {"on_configuration_change": "continue"}
        return {"models": model_settings}

    def test_changes_are_not_applied(self, project):
        # a plain run passes, but the staged configuration changes are left unapplied
        run_dbt(["run"])
        self.assert_changes_are_not_applied(project)


class TestChangesFail(Changes):
    """Run the shared scenario with `on_configuration_change` set to "fail"."""

    @pytest.fixture(scope="class")
    def project_config_update(self):
        model_settings = {"on_configuration_change": "fail"}
        return {"models": model_settings}

    def test_changes_are_not_applied(self, project):
        # a plain run fails, and the staged configuration changes are left unapplied
        run_dbt(["run"], expect_pass=False)
        self.assert_changes_are_not_applied(project)
Original file line number Diff line number Diff line change
@@ -1,32 +1,32 @@
MY_SEED = """
SEED = """
id,value
1,100
2,200
3,300
""".strip()


MY_TABLE = """
TABLE = """
{{ config(
materialized='table',
) }}
select * from {{ ref('my_seed') }}
"""


MY_VIEW = """
VIEW = """
{{ config(
materialized='view',
) }}
select * from {{ ref('my_seed') }}
"""


MY_DYNAMIC_TABLE = """
DYNAMIC_TABLE = """
{{ config(
materialized='dynamic_table',
snowflake_warehouse='DBT_TESTING',
target_lag='2 minutes',
target_lag='1 minute',
refresh_mode='INCREMENTAL',
) }}
select * from {{ ref('my_seed') }}
64 changes: 64 additions & 0 deletions tests/functional/relation_tests/test_relation_type_change.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from dataclasses import dataclass
from itertools import product

from dbt.tests.util import run_dbt
import pytest

from tests.functional.relation_tests import models
from tests.functional.utils import query_relation_type, update_model


@dataclass
class Model:
    """A model file body paired with the relation type label it is checked against."""

    # contents written to the model's .sql file
    model: str
    # relation type label, e.g. "view", "table" or "dynamic_table"
    relation_type: str

    @property
    def name(self) -> str:
        """Short label for this model; currently just the relation type."""
        return f"{self.relation_type}"


@dataclass
class Scenario:
    """A relation type migration: the model a relation starts as and the model it is replaced with."""

    # model deployed first
    initial: Model
    # model deployed over the top of `initial`
    final: Model

    @property
    def name(self) -> str:
        """Identifier built from both model names; also used as the model file name."""
        return f"REPLACE_{self.initial.name}__WITH_{self.final.name}"

    @property
    def error_message(self) -> str:
        """Assertion message naming both ends of the migration."""
        return f"Failed when migrating from: {self.initial.name} to: {self.final.name}"


# every relation type under test, paired with the model body that produces it
relations = [
    Model(model=models.VIEW, relation_type="view"),
    Model(model=models.TABLE, relation_type="table"),
    Model(model=models.DYNAMIC_TABLE, relation_type="dynamic_table"),
]
# full cross product of initial/final types, including replacing a type with itself
scenarios = [
    Scenario(initial=before, final=after) for before, after in product(relations, relations)
]


class TestRelationTypeChange:
    """
    Deploy every scenario's initial model, swap each one for its final model, run again,
    then check one scenario per test that the relation ended up as the final type.
    """

    @pytest.fixture(scope="class", autouse=True)
    def seeds(self):
        """Supply the seed file that every model selects from."""
        seed_files = {"my_seed.csv": models.SEED}
        return seed_files

    @pytest.fixture(scope="class", autouse=True)
    def models(self):
        """Create one model file per scenario, holding that scenario's initial definition."""
        initial_models = {}
        for scenario in scenarios:
            initial_models[f"{scenario.name}.sql"] = scenario.initial.model
        yield initial_models

    @pytest.fixture(scope="class", autouse=True)
    def setup(self, project):
        """Build the initial relations, rewrite every model to its final form, and build again."""
        run_dbt(["seed"])
        run_dbt(["run"])
        # all model files are rewritten first so a single second run migrates every scenario
        for scenario in scenarios:
            update_model(project, scenario.name, scenario.final.model)
        run_dbt(["run"])

    @pytest.mark.parametrize("scenario", scenarios, ids=[item.name for item in scenarios])
    def test_replace(self, project, scenario):
        actual_type = query_relation_type(project, scenario.name)
        assert actual_type == scenario.final.relation_type, scenario.error_message
78 changes: 78 additions & 0 deletions tests/functional/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
from typing import Any, Dict, Optional

from dbt.tests.util import (
get_connection,
get_model_file,
relation_from_name,
set_model_file,
)

from dbt.adapters.snowflake.relation_configs import SnowflakeDynamicTableConfig


def query_relation_type(project, name: str) -> Optional[str]:
    """
    Look up what kind of relation `name` is according to `information_schema.tables`.

    Args:
        project: the test project; supplies the adapter and `run_sql`
        name: the relation name, resolved to a relation with `relation_from_name`

    Returns:
        "table", "dynamic_table", "view" or "external_table"; `None` when the relation
        exists but its `table_type` is not one of the values mapped in the query

    Raises:
        AssertionError: if no row, or more than one row, matches the relation
    """
    relation = relation_from_name(project.adapter, name)
    # NOTE(review): `like` treats `_` in the upper-cased names as a single-character
    # wildcard, so similarly named relations could match too -- consider `=` instead.
    sql = f"""
select
case table_type
when 'BASE TABLE' then iff(is_dynamic = 'YES', 'dynamic_table', 'table')
when 'VIEW' then 'view'
when 'EXTERNAL TABLE' then 'external_table'
end as relation_type
from information_schema.tables
where table_name like '{relation.identifier.upper()}'
and table_schema like '{relation.schema.upper()}'
and table_catalog like '{relation.database.upper()}'
"""
    results = project.run_sql(sql, fetch="all")

    assert len(results) > 0, f"Relation {relation} not found"
    assert len(results) == 1, "Multiple relations found"

    # the `case` has no `else`, so an unmapped table_type comes back as NULL/None;
    # return it as None (per the signature) instead of failing on `.lower()`
    relation_type = results[0][0]
    if relation_type is None:
        return None
    return relation_type.lower()


def query_row_count(project, name: str) -> int:
    """Return the first column of a `select count(*)` against the relation called `name`."""
    relation = relation_from_name(project.adapter, name)
    row = project.run_sql(f"select count(*) from {relation}", fetch="one")
    return row[0]


def _render_sql_literal(value: Any) -> str:
    """Render a Python value as a SQL literal for an inline `values (...)` clause."""
    if value is None:
        # str(None) would produce the bare token `None`, which is not valid SQL
        return "null"
    if isinstance(value, str):
        # double embedded single quotes so the value cannot end the literal early
        escaped = value.replace("'", "''")
        return f"'{escaped}'"
    return f"{value}"


def insert_record(project, name: str, record: Dict[str, Any]):
    """
    Insert a single row into the relation called `name`.

    Args:
        project: the test project; supplies the adapter and `run_sql`
        name: the relation name, resolved to a relation with `relation_from_name`
        record: column name -> value; strings are single-quoted (with embedded quotes
            escaped), `None` becomes `null`, anything else is rendered with `str()`
    """
    relation = relation_from_name(project.adapter, name)
    column_names = ", ".join(record.keys())
    values = ", ".join(_render_sql_literal(value) for value in record.values())
    sql = f"insert into {relation} ({column_names}) values ({values})"
    project.run_sql(sql)


def update_model(project, name: str, model: str) -> str:
    """Overwrite the model file for `name` with `model` and return the contents it had before."""
    relation = relation_from_name(project.adapter, name)
    # read the current contents first so the caller can restore them later
    previous_contents = get_model_file(project, relation)
    set_model_file(project, relation, model)
    return previous_contents


def describe_dynamic_table(project, name: str) -> Optional[SnowflakeDynamicTableConfig]:
    """
    Run the `snowflake__describe_dynamic_table` macro for `name` and parse the result.

    Asserts that the macro's "dynamic_table" result holds exactly one row, then builds
    the config with `SnowflakeDynamicTableConfig.from_relation_results`.
    """
    dynamic_table = relation_from_name(project.adapter, name)
    with get_connection(project.adapter):
        results = project.adapter.execute_macro(
            "snowflake__describe_dynamic_table",
            kwargs={"relation": dynamic_table},
        )

    rows = results["dynamic_table"].rows
    assert len(rows) > 0, f"Dynamic table {dynamic_table} not found"

    found = len(rows)
    # collected up front so the failure message can list every match
    names = ", ".join([table.get("name") for table in rows])
    assert found == 1, f"Multiple dynamic tables found: {names}"

    return SnowflakeDynamicTableConfig.from_relation_results(results)


def refresh_dynamic_table(project, name: str) -> None:
    """Run the `snowflake__refresh_dynamic_table` macro against the relation called `name`."""
    dynamic_table = relation_from_name(project.adapter, name)
    with get_connection(project.adapter):
        project.adapter.execute_macro(
            "snowflake__refresh_dynamic_table",
            kwargs={"relation": dynamic_table},
        )

0 comments on commit 34c4442

Please sign in to comment.