diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 513af867d..0fa84469a 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -32,6 +32,4 @@ first_value = 1 [bumpversion:part:nightly] -[bumpversion:file:setup.py] - [bumpversion:file:dbt/adapters/snowflake/__version__.py] diff --git a/.changes/unreleased/Fixes-20240516-174337.yaml b/.changes/unreleased/Fixes-20240516-174337.yaml new file mode 100644 index 000000000..955d90ed3 --- /dev/null +++ b/.changes/unreleased/Fixes-20240516-174337.yaml @@ -0,0 +1,6 @@ +kind: Fixes +body: Update relation caching to correctly identify dynamic tables, accounting for Snowflake's `2024_03` bundle +time: 2024-05-16T17:43:37.336858-04:00 +custom: + Author: mikealfare + Issue: "1016" diff --git a/.changes/unreleased/Under the Hood-20240517-143743.yaml b/.changes/unreleased/Under the Hood-20240517-143743.yaml new file mode 100644 index 000000000..598c60ad4 --- /dev/null +++ b/.changes/unreleased/Under the Hood-20240517-143743.yaml @@ -0,0 +1,6 @@ +kind: Under the Hood +body: Improve memory efficiency of the process_results() override. +time: 2024-05-17T14:37:43.7414-04:00 +custom: + Author: peterallenwebb + Issue: "1053" diff --git a/dbt/adapters/snowflake/connections.py b/dbt/adapters/snowflake/connections.py index af26279e8..1d6e31c93 100644 --- a/dbt/adapters/snowflake/connections.py +++ b/dbt/adapters/snowflake/connections.py @@ -8,7 +8,7 @@ from dataclasses import dataclass from io import StringIO from time import sleep -from typing import Optional, Tuple, Union, Any, List +from typing import Any, List, Iterable, Optional, Tuple, Union import agate from dbt_common.clients.agate_helper import empty_table @@ -443,25 +443,28 @@ def _split_queries(cls, sql): split_query = snowflake.connector.util_text.split_statements(sql_buf) return [part[0] for part in split_query] - @classmethod - def process_results(cls, column_names, rows): - # Override for Snowflake. 
The datetime objects returned by - # snowflake-connector-python are not pickleable, so we need - # to replace them with sane timezones - fixed = [] + @staticmethod + def _fix_rows(rows: Iterable[Iterable]) -> Iterable[Iterable]: + # See note in process_results(). for row in rows: fixed_row = [] for col in row: if isinstance(col, datetime.datetime) and col.tzinfo: offset = col.utcoffset() + assert offset is not None offset_seconds = offset.total_seconds() - new_timezone = pytz.FixedOffset(offset_seconds // 60) + new_timezone = pytz.FixedOffset(int(offset_seconds // 60)) col = col.astimezone(tz=new_timezone) fixed_row.append(col) - fixed.append(fixed_row) + yield fixed_row - return super().process_results(column_names, fixed) + @classmethod + def process_results(cls, column_names, rows): + # Override for Snowflake. The datetime objects returned by + # snowflake-connector-python are not pickleable, so we need + # to replace them with sane timezones. + return super().process_results(column_names, cls._fix_rows(rows)) def execute( self, sql: str, auto_begin: bool = False, fetch: bool = False, limit: Optional[int] = None diff --git a/dbt/adapters/snowflake/impl.py b/dbt/adapters/snowflake/impl.py index ebb15f753..e3bc3ae0b 100644 --- a/dbt/adapters/snowflake/impl.py +++ b/dbt/adapters/snowflake/impl.py @@ -143,26 +143,37 @@ def list_relations_without_caching( return [] raise - relations = [] - quote_policy = {"database": True, "schema": True, "identifier": True} - + # this can be reduced to always including `is_dynamic` once bundle `2024_03` is mandatory columns = ["database_name", "schema_name", "name", "kind"] - for _database, _schema, _identifier, _type in results.select(columns): - try: - _type = self.Relation.get_relation_type(_type.lower()) - except ValueError: - _type = self.Relation.External - relations.append( - self.Relation.create( - database=_database, - schema=_schema, - identifier=_identifier, - quote_policy=quote_policy, - type=_type, - ) - ) + if 
"is_dynamic" in results.column_names: + columns.append("is_dynamic") + + return [self._parse_list_relations_result(result) for result in results.select(columns)] + + def _parse_list_relations_result(self, result: agate.Row) -> SnowflakeRelation: + # this can be reduced to always including `is_dynamic` once bundle `2024_03` is mandatory + try: + database, schema, identifier, relation_type, is_dynamic = result + except ValueError: + database, schema, identifier, relation_type = result + is_dynamic = "N" - return relations + try: + relation_type = self.Relation.get_relation_type(relation_type.lower()) + except ValueError: + relation_type = self.Relation.External + + if relation_type == self.Relation.Table and is_dynamic == "Y": + relation_type = self.Relation.DynamicTable + + quote_policy = {"database": True, "schema": True, "identifier": True} + return self.Relation.create( + database=database, + schema=schema, + identifier=identifier, + type=relation_type, + quote_policy=quote_policy, + ) def quote_seed_column(self, column: str, quote_config: Optional[bool]) -> str: quote_columns: bool = False diff --git a/dbt/include/snowflake/macros/adapters.sql b/dbt/include/snowflake/macros/adapters.sql index 157738187..0bf7b7d1b 100644 --- a/dbt/include/snowflake/macros/adapters.sql +++ b/dbt/include/snowflake/macros/adapters.sql @@ -73,7 +73,7 @@ {% for _ in range(0, max_iter) %} {%- set paginated_sql -%} - show terse objects in {{ schema_relation.database }}.{{ schema_relation.schema }} limit {{ max_results_per_iter }} from '{{ watermark.table_name }}' + show objects in {{ schema_relation.database }}.{{ schema_relation.schema }} limit {{ max_results_per_iter }} from '{{ watermark.table_name }}' {%- endset -%} {%- set paginated_result = run_query(paginated_sql) %} @@ -124,7 +124,7 @@ {%- set max_total_results = max_results_per_iter * max_iter -%} {%- set sql -%} - show terse objects in {{ schema_relation.database }}.{{ schema_relation.schema }} limit {{ max_results_per_iter }} 
+ show objects in {{ schema_relation.database }}.{{ schema_relation.schema }} limit {{ max_results_per_iter }} {%- endset -%} {%- set result = run_query(sql) -%} diff --git a/setup.py b/setup.py index 728dcc8c6..f8ff363ed 100644 --- a/setup.py +++ b/setup.py @@ -1,7 +1,8 @@ #!/usr/bin/env python import os +from pathlib import Path + import sys -import re # require python 3.8 or newer if sys.version_info < (3, 8): @@ -28,28 +29,25 @@ long_description = f.read() -# get this package's version from dbt/adapters/<adapter name>/__version__.py -def _get_plugin_version_dict(): -    _version_path = os.path.join(this_directory, "dbt", "adapters", "snowflake", "__version__.py") -    _semver = r"""(?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)""" -    _pre = r"""((?P<prekind>a|b|rc)(?P<pre>
\d+))?"""
-    _nightly = r"""(\.(?P<nightly>[a-z0-9]+)?)?"""
-    _build = r"""(\+build[0-9]+)?"""
-    _version_pattern = rf"""version\s*=\s*["']{_semver}{_pre}{_nightly}{_build}["']"""
-    with open(_version_path) as f:
-        match = re.search(_version_pattern, f.read().strip())
-        if match is None:
-            raise ValueError(f"invalid version at {_version_path}")
-        return match.groupdict()
+# used for this adapter's version
+VERSION = Path(__file__).parent / "dbt/adapters/snowflake/__version__.py"
+
+
+def _plugin_version() -> str:
+    """
+    Pull the package version from the main package version file
+    """
+    attributes = {}
+    exec(VERSION.read_text(), attributes)
+    return attributes["version"]
 
 
 package_name = "dbt-snowflake"
-package_version = "1.9.0a1"
 description = """The Snowflake adapter plugin for dbt"""
 
 setup(
     name=package_name,
-    version=package_version,
+    version=_plugin_version(),
     description=description,
     long_description=long_description,
     long_description_content_type="text/markdown",
diff --git a/tests/functional/adapter/test_list_relations_without_caching.py b/tests/functional/adapter/list_relations_tests/test_pagination.py
similarity index 86%
rename from tests/functional/adapter/test_list_relations_without_caching.py
rename to tests/functional/adapter/list_relations_tests/test_pagination.py
index b126984a3..8f14a0012 100644
--- a/tests/functional/adapter/test_list_relations_without_caching.py
+++ b/tests/functional/adapter/list_relations_tests/test_pagination.py
@@ -1,3 +1,5 @@
+import os
+
 import pytest
 
 import json
@@ -5,15 +7,19 @@
 
 # Testing rationale:
 # - snowflake SHOW TERSE OBJECTS command returns at max 10K objects in a single call
-# - when dbt attempts to write into a scehma with more than 10K objects, compilation will fail
+# - when dbt attempts to write into a schema with more than 10K objects, compilation will fail
 #   unless we paginate the result
 # - however, testing this process is difficult at a full scale of 10K actual objects populated
 #   into a fresh testing schema
 # - accordingly, we create a smaller set of views and test the looping iteration logic in
 #   smaller chunks
 
-NUM_VIEWS = 100
-NUM_EXPECTED_RELATIONS = 1 + NUM_VIEWS
+NUM_VIEWS = 90
+NUM_DYNAMIC_TABLES = 10
+# the total number should be between the numbers referenced in the "passing" and "failing" macros below
+# - MACROS__VALIDATE__SNOWFLAKE__LIST_RELATIONS_WITHOUT_CACHING (11 iter * 10 results per iter -> 110 objects)
+# - MACROS__VALIDATE__SNOWFLAKE__LIST_RELATIONS_WITHOUT_CACHING_RAISE_ERROR (33 iter * 3 results per iter -> 99 objects)
+NUM_EXPECTED_RELATIONS = 1 + NUM_VIEWS + NUM_DYNAMIC_TABLES
 
 TABLE_BASE_SQL = """
 {{ config(materialized='table') }}
@@ -25,6 +31,20 @@
 select id from {{ ref('my_model_base') }}
 """.lstrip()
 
+DYNAMIC_TABLE = (
+    """
+{{ config(
+    materialized='dynamic_table',
+    target_lag='1 hour',
+    snowflake_warehouse='"""
+    + os.getenv("SNOWFLAKE_TEST_WAREHOUSE")
+    + """',
+) }}
+
+select id from {{ ref('my_model_base') }}
+"""
+)
+
 MACROS__VALIDATE__SNOWFLAKE__LIST_RELATIONS_WITHOUT_CACHING = """
 {% macro validate_list_relations_without_caching(schema_relation) %}
     {% set relation_list_result = snowflake__list_relations_without_caching(schema_relation, max_iter=11, max_results_per_iter=10) %}
@@ -81,7 +101,8 @@ def models(self):
         my_models = {"my_model_base.sql": TABLE_BASE_SQL}
         for view in range(0, NUM_VIEWS):
             my_models.update({f"my_model_{view}.sql": VIEW_X_SQL})
-
+        for dynamic_table in range(0, NUM_DYNAMIC_TABLES):
+            my_models.update({f"my_dynamic_table_{dynamic_table}.sql": DYNAMIC_TABLE})
         return my_models
 
     @pytest.fixture(scope="class")
@@ -126,7 +147,8 @@ def models(self):
         my_models = {"my_model_base.sql": TABLE_BASE_SQL}
         for view in range(0, NUM_VIEWS):
             my_models.update({f"my_model_{view}.sql": VIEW_X_SQL})
-
+        for dynamic_table in range(0, NUM_DYNAMIC_TABLES):
+            my_models.update({f"my_dynamic_table_{dynamic_table}.sql": DYNAMIC_TABLE})
         return my_models
 
     @pytest.fixture(scope="class")
diff --git a/tests/functional/adapter/list_relations_tests/test_show_objects.py b/tests/functional/adapter/list_relations_tests/test_show_objects.py
new file mode 100644
index 000000000..e5eee39d9
--- /dev/null
+++ b/tests/functional/adapter/list_relations_tests/test_show_objects.py
@@ -0,0 +1,89 @@
+import os
+from typing import List
+
+import pytest
+
+from dbt.adapters.factory import get_adapter_by_type
+from dbt.adapters.snowflake import SnowflakeRelation
+
+from dbt.tests.util import run_dbt, get_connection
+
+
+SEED = """
+id,value
+0,red
+1,yellow
+2,blue
+""".strip()
+
+
+VIEW = """
+select * from {{ ref('my_seed') }}
+"""
+
+
+TABLE = """
+{{ config(materialized='table') }}
+select * from {{ ref('my_seed') }}
+"""
+
+
+DYNAMIC_TABLE = (
+    """
+{{ config(
+    materialized='dynamic_table',
+    target_lag='1 day',
+    snowflake_warehouse='"""
+    + os.getenv("SNOWFLAKE_TEST_WAREHOUSE")
+    + """',
+) }}
+select * from {{ ref('my_seed') }}
+"""
+)
+
+
+class TestShowObjects:
+    views: int = 10
+    tables: int = 10
+    dynamic_tables: int = 10
+
+    @pytest.fixture(scope="class")
+    def seeds(self):
+        yield {"my_seed.csv": SEED}
+
+    @pytest.fixture(scope="class")
+    def models(self):
+        models = {}
+        models.update({f"my_view_{i}.sql": VIEW for i in range(self.views)})
+        models.update({f"my_table_{i}.sql": TABLE for i in range(self.tables)})
+        models.update(
+            {f"my_dynamic_table_{i}.sql": DYNAMIC_TABLE for i in range(self.dynamic_tables)}
+        )
+        yield models
+
+    @pytest.fixture(scope="class", autouse=True)
+    def setup(self, project):
+        run_dbt(["seed"])
+        run_dbt(["run"])
+
+    @staticmethod
+    def list_relations_without_caching(project) -> List[SnowflakeRelation]:
+        my_adapter = get_adapter_by_type("snowflake")
+        schema = my_adapter.Relation.create(
+            database=project.database, schema=project.test_schema, identifier=""
+        )
+        with get_connection(my_adapter):
+            relations = my_adapter.list_relations_without_caching(schema)
+        return relations
+
+    def test_list_relations_without_caching(self, project):
+        relations = self.list_relations_without_caching(project)
+        assert len([relation for relation in relations if relation.is_view]) == self.views
+        assert (
+            len([relation for relation in relations if relation.is_table])
+            == self.tables + 1  # add the seed
+        )
+        assert (
+            len([relation for relation in relations if relation.is_dynamic_table])
+            == self.dynamic_tables
+        )