diff --git a/.changes/unreleased/Features-20241107-170307.yaml b/.changes/unreleased/Features-20241107-170307.yaml
new file mode 100644
index 000000000..1479c5805
--- /dev/null
+++ b/.changes/unreleased/Features-20241107-170307.yaml
@@ -0,0 +1,7 @@
+kind: Features
+body: 'Allow configurable pagination on list_relations_without_caching to support
+  users with a large number of objects per schema'
+time: 2024-11-07T17:03:07.826352-05:00
+custom:
+  Author: mikealfare
+  Issue: "1234"
diff --git a/.changes/unreleased/Fixes-20241104-104610.yaml b/.changes/unreleased/Fixes-20241104-104610.yaml
new file mode 100644
index 000000000..c512d0bdd
--- /dev/null
+++ b/.changes/unreleased/Fixes-20241104-104610.yaml
@@ -0,0 +1,7 @@
+kind: Fixes
+body: 'Performance fixes for snowflake microbatch strategy: use temp view instead
+  of table, remove unnecessary ''using'' clause'
+time: 2024-11-04T10:46:10.005317-05:00
+custom:
+  Author: michelleark
+  Issue: "1228"
diff --git a/.changes/unreleased/Fixes-20241104-172349.yaml b/.changes/unreleased/Fixes-20241104-172349.yaml
new file mode 100644
index 000000000..07c90d93c
--- /dev/null
+++ b/.changes/unreleased/Fixes-20241104-172349.yaml
@@ -0,0 +1,6 @@
+kind: Fixes
+body: Iceberg quoting ignore fix.
+time: 2024-11-04T17:23:49.706297-08:00
+custom:
+  Author: versusfacit
+  Issue: "1227"
diff --git a/.changes/unreleased/Under the Hood-20241106-113249.yaml b/.changes/unreleased/Under the Hood-20241106-113249.yaml
new file mode 100644
index 000000000..0437a8c88
--- /dev/null
+++ b/.changes/unreleased/Under the Hood-20241106-113249.yaml
@@ -0,0 +1,6 @@
+kind: Under the Hood
+body: remove SnowflakeAdapterResponse in favor of updated AdapterResponse in base
+time: 2024-11-06T11:32:49.503467-08:00
+custom:
+  Author: colin-rogers-dbt
+  Issue: "1233"
diff --git a/dbt/adapters/snowflake/connections.py b/dbt/adapters/snowflake/connections.py
index 10bee30f0..fc2c09c19 100644
--- a/dbt/adapters/snowflake/connections.py
+++ b/dbt/adapters/snowflake/connections.py
@@ -84,11 +84,6 @@ def snowflake_private_key(private_key: RSAPrivateKey) -> bytes:
     )


-@dataclass
-class SnowflakeAdapterResponse(AdapterResponse):
-    query_id: str = ""
-
-
 @dataclass
 class SnowflakeCredentials(Credentials):
     account: str
@@ -447,17 +442,17 @@ def cancel(self, connection):
         logger.debug("Cancel query '{}': {}".format(connection_name, res))

     @classmethod
-    def get_response(cls, cursor) -> SnowflakeAdapterResponse:
+    def get_response(cls, cursor) -> AdapterResponse:
         code = cursor.sqlstate

         if code is None:
             code = "SUCCESS"
-
-        return SnowflakeAdapterResponse(
+        query_id = str(cursor.sfqid) if cursor.sfqid is not None else None
+        return AdapterResponse(
             _message="{} {}".format(code, cursor.rowcount),
             rows_affected=cursor.rowcount,
             code=code,
-            query_id=cursor.sfqid,
+            query_id=query_id,
         )

     # disable transactional logic by default on Snowflake
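With SnowflakeAdapterResponse removed, get_response returns the base AdapterResponse, which now carries query_id itself. Below is a minimal sketch of the new contract, assuming the import paths used by recent dbt-adapters releases and a snowflake-connector-python-style cursor exposing sqlstate, rowcount, and sfqid:

```python
from unittest import mock

from dbt.adapters.contracts.connection import AdapterResponse
from dbt.adapters.snowflake.connections import SnowflakeConnectionManager

# Stand-in cursor: a sqlstate of None signals success; sfqid is Snowflake's query id.
cursor = mock.MagicMock(sqlstate=None, rowcount=3, sfqid="01b2c3d4-0000")

response = SnowflakeConnectionManager.get_response(cursor)
assert isinstance(response, AdapterResponse)  # no Snowflake-specific subclass anymore
assert response.code == "SUCCESS"             # a None sqlstate is normalized to "SUCCESS"
assert response.rows_affected == 3
assert response.query_id == "01b2c3d4-0000"   # sfqid now rides on the base response
```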
diff --git a/dbt/adapters/snowflake/impl.py b/dbt/adapters/snowflake/impl.py
index 3375ba169..dc256c1cb 100644
--- a/dbt/adapters/snowflake/impl.py
+++ b/dbt/adapters/snowflake/impl.py
@@ -264,6 +264,11 @@ def list_relations_without_caching(
         # this can be collapsed once Snowflake adds is_iceberg to show objects
         columns = ["database_name", "schema_name", "name", "kind", "is_dynamic"]
         if self.behavior.enable_iceberg_materializations.no_warn:
+            # The QUOTED_IDENTIFIERS_IGNORE_CASE setting impacts column names like
+            # is_iceberg, which is created by dbt, but it does not affect the case
+            # of column values in Snowflake's SHOW OBJECTS query. This
+            # normalization step ensures metadata queries are handled consistently.
+            schema_objects = schema_objects.rename(column_names={"IS_ICEBERG": "is_iceberg"})
             columns.append("is_iceberg")

         return [self._parse_list_relations_result(obj) for obj in schema_objects.select(columns)]
diff --git a/dbt/include/snowflake/macros/adapters.sql b/dbt/include/snowflake/macros/adapters.sql
index b60cea0b0..3c93d41ad 100644
--- a/dbt/include/snowflake/macros/adapters.sql
+++ b/dbt/include/snowflake/macros/adapters.sql
@@ -111,9 +111,10 @@

       {%- if loop.index == max_iter -%}

         {%- set msg -%}
-          dbt will list a maximum of {{ max_total_results }} objects in schema {{ schema_relation }}.
-          Your schema exceeds this limit. Please contact support@getdbt.com for troubleshooting tips,
-          or review and reduce the number of objects contained.
+          dbt is currently configured to list a maximum of {{ max_total_results }} objects per schema.
+          {{ schema_relation }} exceeds this limit. If this is expected, you may configure this limit
+          by setting list_relations_per_page and list_relations_page_limit in your project flags.
+          We recommend starting by increasing list_relations_page_limit above its default of 10.
         {%- endset -%}

         {% do exceptions.raise_compiler_error(msg) %}
@@ -136,6 +137,8 @@

 {% macro snowflake__list_relations_without_caching(schema_relation, max_iter=10, max_results_per_iter=10000) %}
+  {%- set max_results_per_iter = adapter.config.flags.get('list_relations_per_page', max_results_per_iter) -%}
+  {%- set max_iter = adapter.config.flags.get('list_relations_page_limit', max_iter) -%}
   {%- set max_total_results = max_results_per_iter * max_iter -%}
   {%- set sql -%}
     {% if schema_relation is string %}
@@ -147,7 +150,7 @@
     {# -- Gated for performance reason. If you don't want Iceberg, you shouldn't pay the
     -- latency penalty. #}
     {% if adapter.behavior.enable_iceberg_materializations.no_warn %}
-      select all_objects.*, is_iceberg as "is_iceberg"
+      select all_objects.*, is_iceberg
       from table(result_scan(last_query_id(-1))) all_objects
      left join INFORMATION_SCHEMA.tables as all_tables
      on all_tables.table_name = all_objects."name"
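The adapters.sql change threads two new project flags, list_relations_per_page and list_relations_page_limit, into the existing SHOW TERSE OBJECTS pagination, so the effective ceiling is their product (by default 10 pages × 10,000 rows, or 100,000 objects). Below is an illustrative Python reduction of the macro's control flow; fetch_page is a hypothetical stand-in for one paginated SHOW TERSE OBJECTS call, and the real macro pages with a name watermark rather than a page index:

```python
from typing import Callable, Dict, List


def list_relations(
    fetch_page: Callable[[int], List[Dict]],  # hypothetical: returns one page of objects
    max_results_per_iter: int = 10_000,       # flag: list_relations_per_page
    max_iter: int = 10,                       # flag: list_relations_page_limit
) -> List[Dict]:
    results: List[Dict] = []
    for _ in range(max_iter):
        page = fetch_page(max_results_per_iter)
        results.extend(page)
        if len(page) < max_results_per_iter:
            return results  # a short page means the schema is exhausted
    # every page came back full: the schema may hold more objects than
    # max_iter * max_results_per_iter, which the macro reports as a compiler error
    raise RuntimeError(
        f"more than {max_iter * max_results_per_iter} objects may exist; raise "
        "list_relations_page_limit and/or list_relations_per_page"
    )
```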
diff --git a/dbt/include/snowflake/macros/materializations/incremental.sql b/dbt/include/snowflake/macros/materializations/incremental.sql
index d73525d6d..dbb79de02 100644
--- a/dbt/include/snowflake/macros/materializations/incremental.sql
+++ b/dbt/include/snowflake/macros/materializations/incremental.sql
@@ -20,7 +20,7 @@

     The append strategy can use a view because it will run a single INSERT statement.

-    When unique_key is none, the delete+insert strategy can use a view beacuse a
+    When unique_key is none, the delete+insert and microbatch strategies can use a view because a
     single INSERT statement is run with no DELETES as part of the statement.
     Otherwise, play it safe by using a temporary table.
 #} */
@@ -32,10 +32,10 @@
     ) %}
   {% endif %}

-  {% if strategy == "delete+insert" and tmp_relation_type is not none and tmp_relation_type != "table" and unique_key is not none %}
+  {% if strategy in ["delete+insert", "microbatch"] and tmp_relation_type is not none and tmp_relation_type != "table" and unique_key is not none %}
     {% do exceptions.raise_compiler_error(
       "In order to maintain consistent results when `unique_key` is not none,
-      the `delete+insert` strategy only supports `table` for `tmp_relation_type` but "
+      the `" ~ strategy ~ "` strategy only supports `table` for `tmp_relation_type` but "
       ~ tmp_relation_type ~ " was specified."
     ) %}
@@ -49,7 +49,7 @@
     {{ return("view") }}
   {% elif strategy in ("default", "merge", "append") %}
     {{ return("view") }}
-  {% elif strategy == "delete+insert" and unique_key is none %}
+  {% elif strategy in ["delete+insert", "microbatch"] and unique_key is none %}
     {{ return("view") }}
   {% else %}
     {{ return("table") }}
diff --git a/dbt/include/snowflake/macros/materializations/merge.sql b/dbt/include/snowflake/macros/materializations/merge.sql
index 57c58afdd..c8ac8d6fd 100644
--- a/dbt/include/snowflake/macros/materializations/merge.sql
+++ b/dbt/include/snowflake/macros/materializations/merge.sql
@@ -66,7 +66,6 @@
         {% do arg_dict.update({'incremental_predicates': incremental_predicates}) %}

         delete from {{ target }} DBT_INTERNAL_TARGET
-        using {{ source }}
         where (
         {% for predicate in incremental_predicates %}
             {%- if not loop.first %}and {% endif -%}
             {{ predicate }}
diff --git a/tests/functional/adapter/list_relations_tests/test_pagination.py b/tests/functional/adapter/list_relations_tests/test_pagination.py
index 407f9c501..7dd382af5 100644
--- a/tests/functional/adapter/list_relations_tests/test_pagination.py
+++ b/tests/functional/adapter/list_relations_tests/test_pagination.py
@@ -1,34 +1,31 @@
 import os
+
 import pytest
-import json
-from dbt.tests.util import run_dbt, run_dbt_and_capture
-from dbt.adapters.snowflake import SnowflakeRelation  # Ensure this is the correct import path
-
-# Testing rationale:
-# - snowflake SHOW TERSE OBJECTS command returns at max 10K objects in a single call
-# - when dbt attempts to write into a schema with more than 10K objects, compilation will fail
-#   unless we paginate the result
-# - however, testing this process is difficult at a full scale of 10K actual objects populated
-#   into a fresh testing schema
-# - accordingly, we create a smaller set of views and test the looping iteration logic in
-#   smaller chunks
-
-NUM_VIEWS = 90
-NUM_DYNAMIC_TABLES = 10
-# the total number should be between the numbers referenced in the "passing" and "failing" macros below
-# - MACROS__VALIDATE__SNOWFLAKE__LIST_RELATIONS_WITHOUT_CACHING (11 iter * 10 results per iter -> 110 objects)
-# - MACROS__VALIDATE__SNOWFLAKE__LIST_RELATIONS_WITHOUT_CACHING_RAISE_ERROR (33 iter * 3 results per iter -> 99 objects)
-NUM_EXPECTED_RELATIONS = 1 + NUM_VIEWS + NUM_DYNAMIC_TABLES
-
-TABLE_BASE_SQL = """
-{{ config(materialized='table') }}
+from dbt_common.exceptions import CompilationError
+from dbt.tests.util import run_dbt
+
+"""
+Testing rationale:
+- snowflake SHOW TERSE OBJECTS command returns at max 10K objects in a single call
+- when dbt attempts to write into a schema with more than 10K objects, compilation will fail
+  unless we paginate the result
+- pagination defaults to 10 pages, but users may need to configure this
+  - we use that configurability here to force failures with much smaller page limits
+"""
+
+
+TABLE = """
+{{ config(materialized='table') }}
 select 1 as id
-""".lstrip()
+"""
+

-VIEW_X_SQL = """
+VIEW = """
+{{ config(materialized='view') }}
 select id from {{ ref('my_model_base') }}
-""".lstrip()
+"""
+

 DYNAMIC_TABLE = (
     """
@@ -44,173 +41,71 @@
 """
 )

-MACROS__VALIDATE__SNOWFLAKE__LIST_RELATIONS_WITHOUT_CACHING = """
-{% macro validate_list_relations_without_caching(schema_relation) %}
-    {% set relation_list_result = snowflake__list_relations_without_caching(schema_relation, max_iter=11, max_results_per_iter=10) %}
-    {% set n_relations = relation_list_result | length %}
-    {{ log("n_relations: " ~ n_relations) }}
-{% endmacro %}
-"""
-
-MACROS__VALIDATE__SNOWFLAKE__LIST_RELATIONS_WITHOUT_CACHING_RAISE_ERROR = """ -{% macro validate_list_relations_without_caching_raise_error(schema_relation) %} - {{ snowflake__list_relations_without_caching(schema_relation, max_iter=33, max_results_per_iter=3) }} -{% endmacro %} -""" - - -def parse_json_logs(json_log_output): - parsed_logs = [] - for line in json_log_output.split("\n"): - try: - log = json.loads(line) - except ValueError: - continue - - parsed_logs.append(log) - - return parsed_logs +class BaseConfig: + VIEWS = 90 + DYNAMIC_TABLES = 10 -def find_result_in_parsed_logs(parsed_logs, result_name): - return next( - ( - item["data"]["msg"] - for item in parsed_logs - if result_name in item["data"].get("msg", "msg") - ), - False, - ) - - -def find_exc_info_in_parsed_logs(parsed_logs, exc_info_name): - return next( - ( - item["data"]["exc_info"] - for item in parsed_logs - if exc_info_name in item["data"].get("exc_info", "exc_info") - ), - False, - ) - - -class TestListRelationsWithoutCachingSingle: @pytest.fixture(scope="class") def models(self): - my_models = {"my_model_base.sql": TABLE_BASE_SQL} - for view in range(0, NUM_VIEWS): - my_models.update({f"my_model_{view}.sql": VIEW_X_SQL}) - for dynamic_table in range(0, NUM_DYNAMIC_TABLES): - my_models.update({f"my_dynamic_table_{dynamic_table}.sql": DYNAMIC_TABLE}) + my_models = {"my_model_base.sql": TABLE} + for view in range(0, self.VIEWS): + my_models[f"my_model_{view}.sql"] = VIEW + for dynamic_table in range(0, self.DYNAMIC_TABLES): + my_models[f"my_dynamic_table_{dynamic_table}.sql"] = DYNAMIC_TABLE return my_models - @pytest.fixture(scope="class") - def macros(self): - return { - "validate_list_relations_without_caching.sql": MACROS__VALIDATE__SNOWFLAKE__LIST_RELATIONS_WITHOUT_CACHING, - } + @pytest.fixture(scope="class", autouse=True) + def setup(self, project): + run_dbt(["run"]) - def test__snowflake__list_relations_without_caching_termination(self, project): - """ - validates that we do NOT trigger pagination logic snowflake__list_relations_without_caching - macro when there are fewer than max_results_per_iter relations in the target schema - """ - run_dbt(["run", "-s", "my_model_base"]) - - database = project.database - schemas = project.created_schemas - - for schema in schemas: - schema_relation = SnowflakeRelation.create(database=database, schema=schema) - kwargs = {"schema_relation": schema_relation.render()} - _, log_output = run_dbt_and_capture( - [ - "--debug", - "--log-format=json", - "run-operation", - "validate_list_relations_without_caching", - "--args", - str(kwargs), - ] + def test_list_relations(self, project): + kwargs = {"schema_relation": project.test_schema} + with project.adapter.connection_named("__test"): + relations = project.adapter.execute_macro( + "snowflake__list_relations_without_caching", kwargs=kwargs ) + assert len(relations) == self.VIEWS + self.DYNAMIC_TABLES + 1 - parsed_logs = parse_json_logs(log_output) - n_relations = find_result_in_parsed_logs(parsed_logs, "n_relations") - assert n_relations == "n_relations: 1" +class TestListRelationsWithoutCachingSmall(BaseConfig): + pass -class TestListRelationsWithoutCachingFull: - @pytest.fixture(scope="class") - def models(self): - my_models = {"my_model_base.sql": TABLE_BASE_SQL} - for view in range(0, NUM_VIEWS): - my_models.update({f"my_model_{view}.sql": VIEW_X_SQL}) - for dynamic_table in range(0, NUM_DYNAMIC_TABLES): - my_models.update({f"my_dynamic_table_{dynamic_table}.sql": DYNAMIC_TABLE}) - return my_models +class 
TestListRelationsWithoutCachingLarge(BaseConfig):
+    @pytest.fixture(scope="class")
+    def project_config_update(self):
+        return {
+            "flags": {
+                "list_relations_per_page": 10,
+                "list_relations_page_limit": 20,
+            }
+        }
+
+
+class TestListRelationsWithoutCachingTooLarge(BaseConfig):
+
+    @pytest.fixture(scope="class")
+    def project_config_update(self):
+        return {
+            "flags": {
+                "list_relations_per_page": 10,
+                "list_relations_page_limit": 5,
+            }
+        }
+
+    def test_list_relations(self, project):
+        kwargs = {"schema_relation": project.test_schema}
+        with project.adapter.connection_named("__test"):
+            with pytest.raises(CompilationError) as error:
+                project.adapter.execute_macro(
+                    "snowflake__list_relations_without_caching", kwargs=kwargs
+                )
+        assert "list_relations_per_page" in error.value.msg
+        assert "list_relations_page_limit" in error.value.msg
+
+    def test_on_run(self, project):
+        with pytest.raises(CompilationError) as error:
+            run_dbt(["run"])
+        assert "list_relations_per_page" in error.value.msg
+        assert "list_relations_page_limit" in error.value.msg
diff --git a/tests/functional/adapter/list_relations_tests/test_show_objects.py b/tests/functional/adapter/list_relations_tests/test_show_objects.py
index e5eee39d9..91fb94f79 100644
--- a/tests/functional/adapter/list_relations_tests/test_show_objects.py
+++ b/tests/functional/adapter/list_relations_tests/test_show_objects.py
@@ -3,6 +3,8 @@

 import pytest

+from pathlib import 
Path + from dbt.adapters.factory import get_adapter_by_type from dbt.adapters.snowflake import SnowflakeRelation @@ -41,8 +43,32 @@ """ ) +_MODEL_ICEBERG = """ +{{ + config( + materialized = "table", + table_format="iceberg", + external_volume="s3_iceberg_snow", + ) +}} + +select 1 +""" + + +class ShowObjectsBase: + @staticmethod + def list_relations_without_caching(project) -> List[SnowflakeRelation]: + my_adapter = get_adapter_by_type("snowflake") + schema = my_adapter.Relation.create( + database=project.database, schema=project.test_schema, identifier="" + ) + with get_connection(my_adapter): + relations = my_adapter.list_relations_without_caching(schema) + return relations + -class TestShowObjects: +class TestShowObjects(ShowObjectsBase): views: int = 10 tables: int = 10 dynamic_tables: int = 10 @@ -66,16 +92,6 @@ def setup(self, project): run_dbt(["seed"]) run_dbt(["run"]) - @staticmethod - def list_relations_without_caching(project) -> List[SnowflakeRelation]: - my_adapter = get_adapter_by_type("snowflake") - schema = my_adapter.Relation.create( - database=project.database, schema=project.test_schema, identifier="" - ) - with get_connection(my_adapter): - relations = my_adapter.list_relations_without_caching(schema) - return relations - def test_list_relations_without_caching(self, project): relations = self.list_relations_without_caching(project) assert len([relation for relation in relations if relation.is_view]) == self.views @@ -87,3 +103,25 @@ def test_list_relations_without_caching(self, project): len([relation for relation in relations if relation.is_dynamic_table]) == self.dynamic_tables ) + + +class TestShowIcebergObjects(ShowObjectsBase): + @pytest.fixture(scope="class") + def project_config_update(self): + return {"flags": {"enable_iceberg_materializations": True}} + + @pytest.fixture(scope="class") + def models(self): + return {"my_model.sql": _MODEL_ICEBERG} + + def test_quoting_ignore_flag_doesnt_break_iceberg_metadata(self, project): + """https://github.com/dbt-labs/dbt-snowflake/issues/1227 + + The list relations function involves a metadata sub-query. Regardless of + QUOTED_IDENTIFIERS_IGNORE_CASE, this function will fail without proper + normalization within the encapsulating python function after the macro invocation + returns. This test verifies that normalization is working. + """ + run_dbt(["run"]) + + self.list_relations_without_caching(project) diff --git a/tests/unit/test_snowflake_adapter.py b/tests/unit/test_snowflake_adapter.py index 32e73eb45..aa580aad2 100644 --- a/tests/unit/test_snowflake_adapter.py +++ b/tests/unit/test_snowflake_adapter.py @@ -60,8 +60,10 @@ def setUp(self): self.handle = mock.MagicMock(spec=snowflake_connector.SnowflakeConnection) self.cursor = self.handle.cursor.return_value self.mock_execute = self.cursor.execute + self.mock_execute.return_value = mock.MagicMock(sfqid="42") self.patcher = mock.patch("dbt.adapters.snowflake.connections.snowflake.connector.connect") self.snowflake = self.patcher.start() + self.snowflake.connect.cursor.return_value = mock.MagicMock(sfqid="42") # Create the Manifest.state_check patcher @mock.patch("dbt.parser.manifest.ManifestLoader.build_manifest_state_check") @@ -90,7 +92,6 @@ def _mock_state_check(self): self.qh_patch = mock.patch.object(self.adapter.connections.query_header, "add") self.mock_query_header_add = self.qh_patch.start() self.mock_query_header_add.side_effect = lambda q: "/* dbt */\n{}".format(q) - self.adapter.acquire_connection() inject_adapter(self.adapter, SnowflakePlugin)
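Returning to the Iceberg quoting fix (impl.py and test_show_objects.py above): when an account sets QUOTED_IDENTIFIERS_IGNORE_CASE = TRUE, the unquoted is_iceberg column in the metadata sub-query resolves to upper case, so the agate table handed to list_relations_without_caching is keyed IS_ICEBERG and the later select by lower-case name fails. A minimal sketch of the normalization, with illustrative table values:

```python
import agate

# Under QUOTED_IDENTIFIERS_IGNORE_CASE = TRUE, the dbt-added column comes back
# upper-cased, while the SHOW OBJECTS columns are unaffected.
schema_objects = agate.Table(
    rows=[("analytics", "staging", "my_table", "TABLE", "N", "Y")],
    column_names=["database_name", "schema_name", "name", "kind", "is_dynamic", "IS_ICEBERG"],
)

# The impl.py fix: rename via a mapping. agate passes through any column name
# not in the mapping, so this is a safe no-op when the setting is FALSE and the
# column already arrives lower-cased.
normalized = schema_objects.rename(column_names={"IS_ICEBERG": "is_iceberg"})
assert "is_iceberg" in normalized.column_names
```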