Skip to content

Commit

Permalink
Merge branch 'main' into expandIntegTesting
Browse files Browse the repository at this point in the history
  • Loading branch information
colin-rogers-dbt committed Jun 20, 2024
2 parents 6db9813 + 07e8292 commit 85d8900
Show file tree
Hide file tree
Showing 19 changed files with 300 additions and 28 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240131-125318.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Support refresh_mode and initialize parameters for dynamic tables
time: 2024-01-31T12:53:18.111616Z
custom:
Author: HenkvanDyk,mikealfare
Issue: "1076"
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20240522-160538.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: 'Rename targets for tables and views use fully qualified names'
time: 2024-05-22T16:05:38.602074-04:00
custom:
Author: mikealfare
Issue: "1031"
24 changes: 11 additions & 13 deletions .github/workflows/docs-issues.yml
Original file line number Diff line number Diff line change
@@ -1,43 +1,41 @@
# **what?**
# Open an issue in docs.getdbt.com when a PR is labeled `user docs`
# Open an issue in docs.getdbt.com when an issue is labeled `user docs` and closed as completed

# **why?**
# To reduce barriers for keeping docs up to date

# **when?**
# When a PR is labeled `user docs` and is merged. Runs on pull_request_target to run off the workflow already merged,
# not the workflow that existed on the PR branch. This allows old PRs to get comments.
# When an issue is labeled `user docs` and is closed as completed. Can be labeled before or after the issue is closed.


name: Open issues in docs.getdbt.com repo when a PR is labeled
run-name: "Open an issue in docs.getdbt.com for PR #${{ github.event.pull_request.number }}"
name: Open issues in docs.getdbt.com repo when an issue is labeled
run-name: "Open an issue in docs.getdbt.com for issue #${{ github.event.issue.number }}"

on:
pull_request_target:
issues:
types: [labeled, closed]

defaults:
run:
shell: bash

permissions:
issues: write # opens new issues
pull-requests: write # comments on PRs

issues: write # comments on issues

jobs:
open_issues:
# we only want to run this when the PR has been merged or the label in the labeled event is `user docs`. Otherwise it runs the
# we only want to run this when the issue is closed as completed and the label `user docs` has been assigned.
# If this logic does not exist in this workflow, it runs the
# risk of duplicaton of issues being created due to merge and label both triggering this workflow to run and neither having
# generating the comment before the other runs. This lives here instead of the shared workflow because this is where we
# decide if it should run or not.
if: |
(github.event.pull_request.merged == true) &&
((github.event.action == 'closed' && contains( github.event.pull_request.labels.*.name, 'user docs')) ||
(github.event.issue.state == 'closed' && github.event.issue.state_reason == 'completed') && (
(github.event.action == 'closed' && contains(github.event.issue.labels.*.name, 'user docs')) ||
(github.event.action == 'labeled' && github.event.label.name == 'user docs'))
uses: dbt-labs/actions/.github/workflows/open-issue-in-repo.yml@main
with:
issue_repository: "dbt-labs/docs.getdbt.com"
issue_title: "Docs Changes Needed from ${{ github.event.repository.name }} PR #${{ github.event.pull_request.number }}"
issue_title: "Docs Changes Needed from ${{ github.event.repository.name }} Issue #${{ github.event.issue.number }}"
issue_body: "At a minimum, update body to include a link to the page on docs.getdbt.com requiring updates and what part(s) of the page you would like to see updated."
secrets: inherit
28 changes: 27 additions & 1 deletion dbt/adapters/snowflake/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,19 @@
from typing import FrozenSet, Optional, Type

from dbt.adapters.base.relation import BaseRelation
from dbt.adapters.relation_configs import RelationConfigChangeAction, RelationResults
from dbt.adapters.relation_configs import (
RelationConfigBase,
RelationConfigChangeAction,
RelationResults,
)
from dbt.adapters.contracts.relation import RelationConfig
from dbt.adapters.utils import classproperty
from dbt_common.exceptions import DbtRuntimeError

from dbt.adapters.snowflake.relation_configs import (
SnowflakeDynamicTableConfig,
SnowflakeDynamicTableConfigChangeset,
SnowflakeDynamicTableRefreshModeConfigChange,
SnowflakeDynamicTableTargetLagConfigChange,
SnowflakeDynamicTableWarehouseConfigChange,
SnowflakeQuotePolicy,
Expand All @@ -21,6 +27,9 @@ class SnowflakeRelation(BaseRelation):
type: Optional[SnowflakeRelationType] = None
quote_policy: SnowflakeQuotePolicy = field(default_factory=lambda: SnowflakeQuotePolicy())
require_alias: bool = False
relation_configs = {
SnowflakeRelationType.DynamicTable: SnowflakeDynamicTableConfig,
}
renameable_relations: FrozenSet[SnowflakeRelationType] = field(
default_factory=lambda: frozenset(
{
Expand Down Expand Up @@ -52,6 +61,17 @@ def DynamicTable(cls) -> str:
def get_relation_type(cls) -> Type[SnowflakeRelationType]:
return SnowflakeRelationType

@classmethod
def from_config(cls, config: RelationConfig) -> RelationConfigBase:
relation_type: str = config.config.materialized

if relation_config := cls.relation_configs.get(relation_type):
return relation_config.from_relation_config(config)

raise DbtRuntimeError(
f"from_config() is not supported for the provided relation type: {relation_type}"
)

@classmethod
def dynamic_table_config_changeset(
cls, relation_results: RelationResults, relation_config: RelationConfig
Expand All @@ -77,6 +97,12 @@ def dynamic_table_config_changeset(
)
)

if new_dynamic_table.refresh_mode != existing_dynamic_table.refresh_mode:
config_change_collection.refresh_mode = SnowflakeDynamicTableRefreshModeConfigChange(
action=RelationConfigChangeAction.create,
context=new_dynamic_table.refresh_mode,
)

if config_change_collection.has_changes:
return config_change_collection
return None
1 change: 1 addition & 0 deletions dbt/adapters/snowflake/relation_configs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from dbt.adapters.snowflake.relation_configs.dynamic_table import (
SnowflakeDynamicTableConfig,
SnowflakeDynamicTableConfigChangeset,
SnowflakeDynamicTableRefreshModeConfigChange,
SnowflakeDynamicTableWarehouseConfigChange,
SnowflakeDynamicTableTargetLagConfigChange,
)
Expand Down
48 changes: 47 additions & 1 deletion dbt/adapters/snowflake/relation_configs/dynamic_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,34 @@
from dbt.adapters.relation_configs import RelationConfigChange, RelationResults
from dbt.adapters.contracts.relation import RelationConfig
from dbt.adapters.contracts.relation import ComponentName
from dbt_common.dataclass_schema import StrEnum # doesn't exist in standard library until py3.11
from typing_extensions import Self

from dbt.adapters.snowflake.relation_configs.base import SnowflakeRelationConfigBase

if TYPE_CHECKING:
import agate


class RefreshMode(StrEnum):
AUTO = "AUTO"
FULL = "FULL"
INCREMENTAL = "INCREMENTAL"

@classmethod
def default(cls) -> Self:
return cls("AUTO")


class Initialize(StrEnum):
ON_CREATE = "ON_CREATE"
ON_SCHEDULE = "ON_SCHEDULE"

@classmethod
def default(cls) -> Self:
return cls("ON_CREATE")


@dataclass(frozen=True, eq=True, unsafe_hash=True)
class SnowflakeDynamicTableConfig(SnowflakeRelationConfigBase):
"""
Expand All @@ -22,6 +43,8 @@ class SnowflakeDynamicTableConfig(SnowflakeRelationConfigBase):
- query: the query behind the table
- target_lag: the maximum amount of time that the dynamic table’s content should lag behind updates to the base tables
- snowflake_warehouse: the name of the warehouse that provides the compute resources for refreshing the dynamic table
- refresh_mode: specifies the refresh type for the dynamic table
- initialize: specifies the behavior of the initial refresh of the dynamic table
There are currently no non-configurable parameters.
"""
Expand All @@ -32,6 +55,8 @@ class SnowflakeDynamicTableConfig(SnowflakeRelationConfigBase):
query: str
target_lag: str
snowflake_warehouse: str
refresh_mode: Optional[RefreshMode] = RefreshMode.default()
initialize: Optional[Initialize] = Initialize.default()

@classmethod
def from_dict(cls, config_dict) -> "SnowflakeDynamicTableConfig":
Expand All @@ -44,6 +69,8 @@ def from_dict(cls, config_dict) -> "SnowflakeDynamicTableConfig":
"query": config_dict.get("query"),
"target_lag": config_dict.get("target_lag"),
"snowflake_warehouse": config_dict.get("snowflake_warehouse"),
"refresh_mode": config_dict.get("refresh_mode"),
"initialize": config_dict.get("initialize"),
}

dynamic_table: "SnowflakeDynamicTableConfig" = super().from_dict(kwargs_dict)
Expand All @@ -60,6 +87,12 @@ def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any
"snowflake_warehouse": relation_config.config.extra.get("snowflake_warehouse"),
}

if refresh_mode := relation_config.config.extra.get("refresh_mode"):
config_dict.update(refresh_mode=refresh_mode.upper())

if initialize := relation_config.config.extra.get("initialize"):
config_dict.update(initialize=initialize.upper())

return config_dict

@classmethod
Expand All @@ -73,6 +106,8 @@ def parse_relation_results(cls, relation_results: RelationResults) -> Dict:
"query": dynamic_table.get("text"),
"target_lag": dynamic_table.get("target_lag"),
"snowflake_warehouse": dynamic_table.get("warehouse"),
"refresh_mode": dynamic_table.get("refresh_mode"),
# we don't get initialize since that's a one-time scheduler attribute, not a DT attribute
}

return config_dict
Expand All @@ -96,10 +131,20 @@ def requires_full_refresh(self) -> bool:
return False


@dataclass(frozen=True, eq=True, unsafe_hash=True)
class SnowflakeDynamicTableRefreshModeConfigChange(RelationConfigChange):
context: Optional[str] = None

@property
def requires_full_refresh(self) -> bool:
return True


@dataclass
class SnowflakeDynamicTableConfigChangeset:
target_lag: Optional[SnowflakeDynamicTableTargetLagConfigChange] = None
snowflake_warehouse: Optional[SnowflakeDynamicTableWarehouseConfigChange] = None
refresh_mode: Optional[SnowflakeDynamicTableRefreshModeConfigChange] = None

@property
def requires_full_refresh(self) -> bool:
Expand All @@ -111,9 +156,10 @@ def requires_full_refresh(self) -> bool:
if self.snowflake_warehouse
else False
),
self.refresh_mode.requires_full_refresh if self.refresh_mode else False,
]
)

@property
def has_changes(self) -> bool:
return any([self.target_lag, self.snowflake_warehouse])
return any([self.target_lag, self.snowflake_warehouse, self.refresh_mode])
12 changes: 12 additions & 0 deletions dbt/include/snowflake/macros/relations/create_backup.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{%- macro snowflake__get_create_backup_sql(relation) -%}

-- get the standard backup name
{% set backup_relation = make_backup_relation(relation, relation.type) %}

-- drop any pre-existing backup
{{ get_drop_sql(backup_relation) }};

-- use `render` to ensure that the fully qualified name is used
{{ get_rename_sql(relation, backup_relation.render()) }}

{%- endmacro -%}
12 changes: 10 additions & 2 deletions dbt/include/snowflake/macros/relations/dynamic_table/create.sql
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
{% macro snowflake__get_create_dynamic_table_as_sql(relation, sql) -%}

{%- set dynamic_table = relation.from_config(config.model) -%}

create dynamic table {{ relation }}
target_lag = '{{ config.get("target_lag") }}'
warehouse = {{ config.get("snowflake_warehouse") }}
target_lag = '{{ dynamic_table.target_lag }}'
warehouse = {{ dynamic_table.snowflake_warehouse }}
{% if dynamic_table.refresh_mode %}
refresh_mode = {{ dynamic_table.refresh_mode }}
{% endif %}
{% if dynamic_table.initialize %}
initialize = {{ dynamic_table.initialize }}
{% endif %}
as (
{{ sql }}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
"database_name",
"text",
"target_lag",
"warehouse"
"warehouse",
"refresh_mode"
from table(result_scan(last_query_id()))
{%- endset %}
{% set _dynamic_table = run_query(_dynamic_table_sql) %}
Expand Down
18 changes: 12 additions & 6 deletions dbt/include/snowflake/macros/relations/dynamic_table/replace.sql
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
{% macro snowflake__get_replace_dynamic_table_sql(relation, sql) %}
{% macro snowflake__get_replace_dynamic_table_sql(relation, sql) -%}

{%- set dynamic_table = relation.from_config(config.model) -%}

create or replace dynamic table {{ relation }}
target_lag = '{{ config.get("target_lag") }}'
warehouse = {{ config.get("snowflake_warehouse") }}
target_lag = '{{ dynamic_table.target_lag }}'
warehouse = {{ dynamic_table.snowflake_warehouse }}
{% if dynamic_table.refresh_mode %}
refresh_mode = {{ dynamic_table.refresh_mode }}
{% endif %}
{% if dynamic_table.initialize %}
initialize = {{ dynamic_table.initialize }}
{% endif %}
as (
{{ sql }}
)
;
{{ snowflake__refresh_dynamic_table(relation) }}

{% endmacro %}
{%- endmacro %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{%- macro snowflake__get_rename_intermediate_sql(relation) -%}

-- get the standard intermediate name
{% set intermediate_relation = make_intermediate_relation(relation) %}

-- use `render` to ensure that the fully qualified name is used
{{ get_rename_sql(intermediate_relation, relation.render()) }}

{%- endmacro -%}
10 changes: 10 additions & 0 deletions dbt/include/snowflake/macros/relations/table/rename.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
{%- macro snowflake__get_rename_table_sql(relation, new_name) -%}
/*
Rename or move a table to the new name.
Args:
relation: SnowflakeRelation - relation to be renamed
new_name: Union[str, SnowflakeRelation] - new name for `relation`
if providing a string, the default database/schema will be used if that string is just an identifier
if providing a SnowflakeRelation, `render` will be used to produce a fully qualified name
Returns: templated string
*/
alter table {{ relation }} rename to {{ new_name }}
{%- endmacro -%}
10 changes: 10 additions & 0 deletions dbt/include/snowflake/macros/relations/view/rename.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
{%- macro snowflake__get_rename_view_sql(relation, new_name) -%}
/*
Rename or move a view to the new name.
Args:
relation: SnowflakeRelation - relation to be renamed
new_name: Union[str, SnowflakeRelation] - new name for `relation`
if providing a string, the default database/schema will be used if that string is just an identifier
if providing a SnowflakeRelation, `render` will be used to produce a fully qualified name
Returns: templated string
*/
alter view {{ relation }} rename to {{ new_name }}
{%- endmacro -%}
1 change: 1 addition & 0 deletions tests/functional/adapter/dynamic_table_tests/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
materialized='dynamic_table',
snowflake_warehouse='DBT_TESTING',
target_lag='2 minutes',
refresh_mode='INCREMENTAL',
) }}
select * from {{ ref('my_seed') }}
"""
Loading

0 comments on commit 85d8900

Please sign in to comment.