Skip to content

Commit

Permalink
Merge branch 'spike_source_refresh' of https://github.com/dbt-labs/db…
Browse files Browse the repository at this point in the history
…t-snowflake into spike_run_external_materialization
  • Loading branch information
dataders committed May 22, 2024
2 parents 67e3cee + 15d1c44 commit 2b744f0
Show file tree
Hide file tree
Showing 9 changed files with 186 additions and 53 deletions.
2 changes: 0 additions & 2 deletions .bumpversion.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,4 @@ first_value = 1

[bumpversion:part:nightly]

[bumpversion:file:setup.py]

[bumpversion:file:dbt/adapters/snowflake/__version__.py]
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20240516-174337.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Update relation caching to correctly identify dynamic tables, accounting for Snowflake's `2024_03` bundle
time: 2024-05-16T17:43:37.336858-04:00
custom:
Author: mikealfare
Issue: "1016"
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20240517-143743.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Under the Hood
body: Improve memory efficiency of the process_results() override.
time: 2024-05-17T14:37:43.7414-04:00
custom:
Author: peterallenwebb
Issue: "1053"
23 changes: 13 additions & 10 deletions dbt/adapters/snowflake/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from dataclasses import dataclass
from io import StringIO
from time import sleep
from typing import Optional, Tuple, Union, Any, List
from typing import Any, List, Iterable, Optional, Tuple, Union

import agate
from dbt_common.clients.agate_helper import empty_table
Expand Down Expand Up @@ -443,25 +443,28 @@ def _split_queries(cls, sql):
split_query = snowflake.connector.util_text.split_statements(sql_buf)
return [part[0] for part in split_query]

@classmethod
def process_results(cls, column_names, rows):
# Override for Snowflake. The datetime objects returned by
# snowflake-connector-python are not pickleable, so we need
# to replace them with sane timezones
fixed = []
@staticmethod
def _fix_rows(rows: Iterable[Iterable]) -> Iterable[Iterable]:
# See note in process_results().
for row in rows:
fixed_row = []
for col in row:
if isinstance(col, datetime.datetime) and col.tzinfo:
offset = col.utcoffset()
assert offset is not None
offset_seconds = offset.total_seconds()
new_timezone = pytz.FixedOffset(offset_seconds // 60)
new_timezone = pytz.FixedOffset(int(offset_seconds // 60))
col = col.astimezone(tz=new_timezone)
fixed_row.append(col)

fixed.append(fixed_row)
yield fixed_row

return super().process_results(column_names, fixed)
@classmethod
def process_results(cls, column_names, rows):
# Override for Snowflake. The datetime objects returned by
# snowflake-connector-python are not pickleable, so we need
# to replace them with sane timezones.
return super().process_results(column_names, cls._fix_rows(rows))

def execute(
self, sql: str, auto_begin: bool = False, fetch: bool = False, limit: Optional[int] = None
Expand Down
47 changes: 29 additions & 18 deletions dbt/adapters/snowflake/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,26 +143,37 @@ def list_relations_without_caching(
return []
raise

relations = []
quote_policy = {"database": True, "schema": True, "identifier": True}

# this can be reduced to always including `is_dynamic` once bundle `2024_03` is mandatory
columns = ["database_name", "schema_name", "name", "kind"]
for _database, _schema, _identifier, _type in results.select(columns):
try:
_type = self.Relation.get_relation_type(_type.lower())
except ValueError:
_type = self.Relation.External
relations.append(
self.Relation.create(
database=_database,
schema=_schema,
identifier=_identifier,
quote_policy=quote_policy,
type=_type,
)
)
if "is_dynamic" in results.column_names:
columns.append("is_dynamic")

return [self._parse_list_relations_result(result) for result in results.select(columns)]

def _parse_list_relations_result(self, result: agate.Row) -> SnowflakeRelation:
# this can be reduced to always including `is_dynamic` once bundle `2024_03` is mandatory
try:
database, schema, identifier, relation_type, is_dynamic = result
except ValueError:
database, schema, identifier, relation_type = result
is_dynamic = "N"

return relations
try:
relation_type = self.Relation.get_relation_type(relation_type.lower())
except ValueError:
relation_type = self.Relation.External

if relation_type == self.Relation.Table and is_dynamic == "Y":
relation_type = self.Relation.DynamicTable

quote_policy = {"database": True, "schema": True, "identifier": True}
return self.Relation.create(
database=database,
schema=schema,
identifier=identifier,
type=relation_type,
quote_policy=quote_policy,
)

def quote_seed_column(self, column: str, quote_config: Optional[bool]) -> str:
quote_columns: bool = False
Expand Down
4 changes: 2 additions & 2 deletions dbt/include/snowflake/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
{% for _ in range(0, max_iter) %}

{%- set paginated_sql -%}
show terse objects in {{ schema_relation.database }}.{{ schema_relation.schema }} limit {{ max_results_per_iter }} from '{{ watermark.table_name }}'
show objects in {{ schema_relation.database }}.{{ schema_relation.schema }} limit {{ max_results_per_iter }} from '{{ watermark.table_name }}'
{%- endset -%}

{%- set paginated_result = run_query(paginated_sql) %}
Expand Down Expand Up @@ -124,7 +124,7 @@
{%- set max_total_results = max_results_per_iter * max_iter -%}

{%- set sql -%}
show terse objects in {{ schema_relation.database }}.{{ schema_relation.schema }} limit {{ max_results_per_iter }}
show objects in {{ schema_relation.database }}.{{ schema_relation.schema }} limit {{ max_results_per_iter }}
{%- endset -%}

{%- set result = run_query(sql) -%}
Expand Down
30 changes: 14 additions & 16 deletions setup.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#!/usr/bin/env python
import os
from pathlib import Path

import sys
import re

# require python 3.8 or newer
if sys.version_info < (3, 8):
Expand All @@ -28,28 +29,25 @@
long_description = f.read()


# get this package's version from dbt/adapters/<adapter name>/__version__.py
def _get_plugin_version_dict():
_version_path = os.path.join(this_directory, "dbt", "adapters", "snowflake", "__version__.py")
_semver = r"""(?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)"""
_pre = r"""((?P<prekind>a|b|rc)(?P<pre>\d+))?"""
_nightly = r"""(\.(?P<nightly>[a-z0-9]+)?)?"""
_build = r"""(\+build[0-9]+)?"""
_version_pattern = rf"""version\s*=\s*["']{_semver}{_pre}{_nightly}{_build}["']"""
with open(_version_path) as f:
match = re.search(_version_pattern, f.read().strip())
if match is None:
raise ValueError(f"invalid version at {_version_path}")
return match.groupdict()
# used for this adapter's version
VERSION = Path(__file__).parent / "dbt/adapters/snowflake/__version__.py"


def _plugin_version() -> str:
"""
Pull the package version from the main package version file
"""
attributes = {}
exec(VERSION.read_text(), attributes)
return attributes["version"]


package_name = "dbt-snowflake"
package_version = "1.9.0a1"
description = """The Snowflake adapter plugin for dbt"""

setup(
name=package_name,
version=package_version,
version=_plugin_version(),
description=description,
long_description=long_description,
long_description_content_type="text/markdown",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
import os

import pytest

import json
from dbt.tests.util import run_dbt, run_dbt_and_capture

# Testing rationale:
# - snowflake SHOW TERSE OBJECTS command returns at max 10K objects in a single call
# - when dbt attempts to write into a scehma with more than 10K objects, compilation will fail
# - when dbt attempts to write into a schema with more than 10K objects, compilation will fail
# unless we paginate the result
# - however, testing this process is difficult at a full scale of 10K actual objects populated
# into a fresh testing schema
# - accordingly, we create a smaller set of views and test the looping iteration logic in
# smaller chunks

NUM_VIEWS = 100
NUM_EXPECTED_RELATIONS = 1 + NUM_VIEWS
NUM_VIEWS = 90
NUM_DYNAMIC_TABLES = 10
# the total number should be between the numbers referenced in the "passing" and "failing" macros below
# - MACROS__VALIDATE__SNOWFLAKE__LIST_RELATIONS_WITHOUT_CACHING (11 iter * 10 results per iter -> 110 objects)
# - MACROS__VALIDATE__SNOWFLAKE__LIST_RELATIONS_WITHOUT_CACHING_RAISE_ERROR (33 iter * 3 results per iter -> 99 objects)
NUM_EXPECTED_RELATIONS = 1 + NUM_VIEWS + NUM_DYNAMIC_TABLES

TABLE_BASE_SQL = """
{{ config(materialized='table') }}
Expand All @@ -25,6 +31,20 @@
select id from {{ ref('my_model_base') }}
""".lstrip()

DYNAMIC_TABLE = (
"""
{{ config(
materialized='dynamic_table',
target_lag='1 hour',
snowflake_warehouse='"""
+ os.getenv("SNOWFLAKE_TEST_WAREHOUSE")
+ """',
) }}
select id from {{ ref('my_model_base') }}
"""
)

MACROS__VALIDATE__SNOWFLAKE__LIST_RELATIONS_WITHOUT_CACHING = """
{% macro validate_list_relations_without_caching(schema_relation) %}
{% set relation_list_result = snowflake__list_relations_without_caching(schema_relation, max_iter=11, max_results_per_iter=10) %}
Expand Down Expand Up @@ -81,7 +101,8 @@ def models(self):
my_models = {"my_model_base.sql": TABLE_BASE_SQL}
for view in range(0, NUM_VIEWS):
my_models.update({f"my_model_{view}.sql": VIEW_X_SQL})

for dynamic_table in range(0, NUM_DYNAMIC_TABLES):
my_models.update({f"my_dynamic_table_{dynamic_table}.sql": DYNAMIC_TABLE})
return my_models

@pytest.fixture(scope="class")
Expand Down Expand Up @@ -126,7 +147,8 @@ def models(self):
my_models = {"my_model_base.sql": TABLE_BASE_SQL}
for view in range(0, NUM_VIEWS):
my_models.update({f"my_model_{view}.sql": VIEW_X_SQL})

for dynamic_table in range(0, NUM_DYNAMIC_TABLES):
my_models.update({f"my_dynamic_table_{dynamic_table}.sql": DYNAMIC_TABLE})
return my_models

@pytest.fixture(scope="class")
Expand Down
89 changes: 89 additions & 0 deletions tests/functional/adapter/list_relations_tests/test_show_objects.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import os
from typing import List

import pytest

from dbt.adapters.factory import get_adapter_by_type
from dbt.adapters.snowflake import SnowflakeRelation

from dbt.tests.util import run_dbt, get_connection


SEED = """
id,value
0,red
1,yellow
2,blue
""".strip()


VIEW = """
select * from {{ ref('my_seed') }}
"""


TABLE = """
{{ config(materialized='table') }}
select * from {{ ref('my_seed') }}
"""


DYNAMIC_TABLE = (
"""
{{ config(
materialized='dynamic_table',
target_lag='1 day',
snowflake_warehouse='"""
+ os.getenv("SNOWFLAKE_TEST_WAREHOUSE")
+ """',
) }}
select * from {{ ref('my_seed') }}
"""
)


class TestShowObjects:
views: int = 10
tables: int = 10
dynamic_tables: int = 10

@pytest.fixture(scope="class")
def seeds(self):
yield {"my_seed.csv": SEED}

@pytest.fixture(scope="class")
def models(self):
models = {}
models.update({f"my_view_{i}.sql": VIEW for i in range(self.views)})
models.update({f"my_table_{i}.sql": TABLE for i in range(self.tables)})
models.update(
{f"my_dynamic_table_{i}.sql": DYNAMIC_TABLE for i in range(self.dynamic_tables)}
)
yield models

@pytest.fixture(scope="class", autouse=True)
def setup(self, project):
run_dbt(["seed"])
run_dbt(["run"])

@staticmethod
def list_relations_without_caching(project) -> List[SnowflakeRelation]:
my_adapter = get_adapter_by_type("snowflake")
schema = my_adapter.Relation.create(
database=project.database, schema=project.test_schema, identifier=""
)
with get_connection(my_adapter):
relations = my_adapter.list_relations_without_caching(schema)
return relations

def test_list_relations_without_caching(self, project):
relations = self.list_relations_without_caching(project)
assert len([relation for relation in relations if relation.is_view]) == self.views
assert (
len([relation for relation in relations if relation.is_table])
== self.tables + 1 # add the seed
)
assert (
len([relation for relation in relations if relation.is_dynamic_table])
== self.dynamic_tables
)

0 comments on commit 2b744f0

Please sign in to comment.