Skip to content

Commit

Permalink
ADAP-728: Fix "On Configuration Change" config (#708)
Browse files Browse the repository at this point in the history
* add alter scenario for dynamic tables
* remove pytest skip statements on change tests
* update target lag tests
* fix target_lag in create dynamic table statement
* fixed name changes during model production
* clean up dynamic table changeset
* update target lag input options

* changie
* remove unnecessary asserts
* skip F401 on raising objects in __init__.py

(cherry picked from commit 02935f7)
  • Loading branch information
mikealfare authored and github-actions[bot] committed Jul 28, 2023
1 parent e99842f commit a151224
Show file tree
Hide file tree
Showing 14 changed files with 449 additions and 106 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20230726-164729.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Fix `on_configuration_change` setting to properly recognize `continue` and `fail`
time: 2023-07-26T16:47:29.471437-04:00
custom:
Author: mikealfare
Issue: "708"
2 changes: 2 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,5 @@ ignore =
E741,
E501,
exclude = test
per-file-ignores =
*/__init__.py: F401
60 changes: 43 additions & 17 deletions dbt/adapters/snowflake/relation.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,19 @@
from dataclasses import dataclass, field
from typing import Optional
from typing import Optional, Type

from dbt.adapters.base.relation import BaseRelation, Policy
from dbt.dataclass_schema import StrEnum
from dbt.adapters.base.relation import BaseRelation
from dbt.adapters.relation_configs import RelationConfigChangeAction, RelationResults
from dbt.context.providers import RuntimeConfigObject
from dbt.utils import classproperty


class SnowflakeRelationType(StrEnum):
Table = "table"
View = "view"
CTE = "cte"
External = "external"
DynamicTable = "dynamic_table"


@dataclass
class SnowflakeQuotePolicy(Policy):
database: bool = False
schema: bool = False
identifier: bool = False
from dbt.adapters.snowflake.relation_configs import (
SnowflakeDynamicTableConfig,
SnowflakeDynamicTableConfigChangeset,
SnowflakeDynamicTableTargetLagConfigChange,
SnowflakeDynamicTableWarehouseConfigChange,
SnowflakeQuotePolicy,
SnowflakeRelationType,
)


@dataclass(frozen=True, eq=False, repr=False)
Expand All @@ -33,3 +28,34 @@ def is_dynamic_table(self) -> bool:
@classproperty
def DynamicTable(cls) -> str:
return str(SnowflakeRelationType.DynamicTable)

@classproperty
def get_relation_type(cls) -> Type[SnowflakeRelationType]:
return SnowflakeRelationType

@classmethod
def dynamic_table_config_changeset(
cls, relation_results: RelationResults, runtime_config: RuntimeConfigObject
) -> Optional[SnowflakeDynamicTableConfigChangeset]:
existing_dynamic_table = SnowflakeDynamicTableConfig.from_relation_results(
relation_results
)
new_dynamic_table = SnowflakeDynamicTableConfig.from_model_node(runtime_config.model)

config_change_collection = SnowflakeDynamicTableConfigChangeset()

if new_dynamic_table.target_lag != existing_dynamic_table.target_lag:
config_change_collection.target_lag = SnowflakeDynamicTableTargetLagConfigChange(
action=RelationConfigChangeAction.alter,
context=new_dynamic_table.target_lag,
)

if new_dynamic_table.warehouse != existing_dynamic_table.warehouse:
config_change_collection.warehouse = SnowflakeDynamicTableWarehouseConfigChange(
action=RelationConfigChangeAction.alter,
context=new_dynamic_table.warehouse,
)

if config_change_collection.has_changes:
return config_change_collection
return None
12 changes: 10 additions & 2 deletions dbt/adapters/snowflake/relation_configs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
from dbt.adapters.snowflake.relation_configs.dynamic_table import ( # noqa: F401
from dbt.adapters.snowflake.relation_configs.dynamic_table import (
SnowflakeDynamicTableConfig,
SnowflakeDynamicTableConfigChangeset,
SnowflakeDynamicTableWarehouseConfigChange,
)
from dbt.adapters.snowflake.relation_configs.target_lag import ( # noqa: F401
from dbt.adapters.snowflake.relation_configs.policies import (
SnowflakeIncludePolicy,
SnowflakeQuotePolicy,
SnowflakeRelationType,
)
from dbt.adapters.snowflake.relation_configs.target_lag import (
SnowflakeDynamicTableTargetLagConfig,
SnowflakeDynamicTableTargetLagConfigChange,
SnowflakeDynamicTableTargetLagPeriod,
)
70 changes: 70 additions & 0 deletions dbt/adapters/snowflake/relation_configs/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from dataclasses import dataclass
from typing import Any, Dict, Optional

import agate
from dbt.adapters.base.relation import Policy
from dbt.adapters.relation_configs import (
RelationConfigBase,
RelationResults,
)
from dbt.contracts.graph.nodes import ModelNode
from dbt.contracts.relation import ComponentName

from dbt.adapters.snowflake.relation_configs.policies import (
SnowflakeIncludePolicy,
SnowflakeQuotePolicy,
)


@dataclass(frozen=True, eq=True, unsafe_hash=True)
class SnowflakeRelationConfigBase(RelationConfigBase):
"""
This base class implements a few boilerplate methods and provides some light structure for Snowflake relations.
"""

@classmethod
def include_policy(cls) -> Policy:
return SnowflakeIncludePolicy()

@classmethod
def quote_policy(cls) -> Policy:
return SnowflakeQuotePolicy()

@classmethod
def from_model_node(cls, model_node: ModelNode):
relation_config = cls.parse_model_node(model_node)
relation = cls.from_dict(relation_config)
return relation

@classmethod
def parse_model_node(cls, model_node: ModelNode) -> Dict[str, Any]:
raise NotImplementedError(
"`parse_model_node()` needs to be implemented on this RelationConfigBase instance"
)

@classmethod
def from_relation_results(cls, relation_results: RelationResults):
relation_config = cls.parse_relation_results(relation_results)
relation = cls.from_dict(relation_config)
return relation

@classmethod
def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, Any]:
raise NotImplementedError(
"`parse_relation_results()` needs to be implemented on this RelationConfigBase instance"
)

@classmethod
def _render_part(cls, component: ComponentName, value: Optional[str]) -> Optional[str]:
if cls.include_policy().get_part(component) and value:
if cls.quote_policy().get_part(component):
return f'"{value}"'
return value.lower()
return None

@classmethod
def _get_first_row(cls, results: agate.Table) -> agate.Row:
try:
return results.rows[0]
except IndexError:
return agate.Row(values=set())
108 changes: 103 additions & 5 deletions dbt/adapters/snowflake/relation_configs/dynamic_table.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,124 @@
from dataclasses import dataclass
from typing import Optional

from dbt.adapters.relation_configs import RelationConfigBase
import agate
from dbt.adapters.relation_configs import RelationConfigChange, RelationResults
from dbt.contracts.graph.nodes import ModelNode
from dbt.contracts.relation import ComponentName

from dbt.adapters.snowflake.relation_configs.target_lag import SnowflakeDynamicTableTargetLagConfig
from dbt.adapters.snowflake.relation_configs.base import SnowflakeRelationConfigBase
from dbt.adapters.snowflake.relation_configs.target_lag import (
SnowflakeDynamicTableTargetLagConfig,
SnowflakeDynamicTableTargetLagConfigChange,
)


@dataclass(frozen=True, eq=True, unsafe_hash=True)
class SnowflakeDynamicTableConfig(RelationConfigBase):
class SnowflakeDynamicTableConfig(SnowflakeRelationConfigBase):
"""
This config follow the specs found here:
TODO: add URL once it's GA
https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table
The following parameters are configurable by dbt:
- name: name of the dynamic table
- query: the query behind the table
- lag: the maximum amount of time that the dynamic table’s content should lag behind updates to the base tables
- target_lag: the maximum amount of time that the dynamic table’s content should lag behind updates to the base tables
- warehouse: the name of the warehouse that provides the compute resources for refreshing the dynamic table
There are currently no non-configurable parameters.
"""

name: str
schema_name: str
database_name: str
query: str
target_lag: SnowflakeDynamicTableTargetLagConfig
warehouse: str

@classmethod
def from_dict(cls, config_dict) -> "SnowflakeDynamicTableConfig":
kwargs_dict = {
"name": cls._render_part(ComponentName.Identifier, config_dict.get("name")),
"schema_name": cls._render_part(ComponentName.Schema, config_dict.get("schema_name")),
"database_name": cls._render_part(
ComponentName.Database, config_dict.get("database_name")
),
"query": config_dict.get("query"),
"warehouse": config_dict.get("warehouse"),
}

if target_lag := config_dict.get("target_lag"):
kwargs_dict.update(
{"target_lag": SnowflakeDynamicTableTargetLagConfig.from_dict(target_lag)}
)

dynamic_table: "SnowflakeDynamicTableConfig" = super().from_dict(kwargs_dict) # type: ignore
return dynamic_table

@classmethod
def parse_model_node(cls, model_node: ModelNode) -> dict:
config_dict = {
"name": model_node.identifier,
"schema_name": model_node.schema,
"database_name": model_node.database,
"query": model_node.compiled_code,
"warehouse": model_node.config.extra.get("snowflake_warehouse"),
}

if model_node.config.extra.get("target_lag"):
config_dict.update(
{"target_lag": SnowflakeDynamicTableTargetLagConfig.parse_model_node(model_node)}
)

return config_dict

@classmethod
def parse_relation_results(cls, relation_results: RelationResults) -> dict:
dynamic_table: agate.Row = relation_results["dynamic_table"].rows[0]

config_dict = {
"name": dynamic_table.get("name"),
"schema_name": dynamic_table.get("schema_name"),
"database_name": dynamic_table.get("database_name"),
"query": dynamic_table.get("text"),
"warehouse": dynamic_table.get("warehouse"),
}

if dynamic_table.get("target_lag"):
config_dict.update(
{
"target_lag": SnowflakeDynamicTableTargetLagConfig.parse_relation_results(
dynamic_table
)
}
)

return config_dict


@dataclass(frozen=True, eq=True, unsafe_hash=True)
class SnowflakeDynamicTableWarehouseConfigChange(RelationConfigChange):
context: Optional[str] = None

@property
def requires_full_refresh(self) -> bool:
return False


@dataclass
class SnowflakeDynamicTableConfigChangeset:
target_lag: Optional[SnowflakeDynamicTableTargetLagConfigChange] = None
warehouse: Optional[SnowflakeDynamicTableWarehouseConfigChange] = None

@property
def requires_full_refresh(self) -> bool:
return any(
[
self.target_lag.requires_full_refresh if self.target_lag else False,
self.warehouse.requires_full_refresh if self.warehouse else False,
]
)

@property
def has_changes(self) -> bool:
return any([self.target_lag, self.warehouse])
25 changes: 25 additions & 0 deletions dbt/adapters/snowflake/relation_configs/policies.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from dataclasses import dataclass

from dbt.adapters.base.relation import Policy
from dbt.dataclass_schema import StrEnum


class SnowflakeRelationType(StrEnum):
Table = "table"
View = "view"
CTE = "cte"
External = "external"
DynamicTable = "dynamic_table"


class SnowflakeIncludePolicy(Policy):
database: bool = True
schema: bool = True
identifier: bool = True


@dataclass
class SnowflakeQuotePolicy(Policy):
database: bool = False
schema: bool = False
identifier: bool = False
Loading

0 comments on commit a151224

Please sign in to comment.