diff --git a/.changes/unreleased/Under the Hood-20231018-145751.yaml b/.changes/unreleased/Under the Hood-20231018-145751.yaml new file mode 100644 index 000000000..49cf80e8c --- /dev/null +++ b/.changes/unreleased/Under the Hood-20231018-145751.yaml @@ -0,0 +1,6 @@ +kind: Under the Hood +body: Implements pagination on `list_schemas` process +time: 2023-10-18T14:57:51.629173-07:00 +custom: + Author: matt-winkler + Issue: "810" diff --git a/dbt/include/snowflake/macros/adapters.sql b/dbt/include/snowflake/macros/adapters.sql index b2b496356..9e6fcb808 100644 --- a/dbt/include/snowflake/macros/adapters.sql +++ b/dbt/include/snowflake/macros/adapters.sql @@ -47,23 +47,108 @@ {% do return(columns) %} {% endmacro %} -{% macro snowflake__list_schemas(database) -%} +{% macro snowflake__get_paginated_schemas_array(max_iter, max_results_per_iter, max_total_results, database, watermark) %} + + {% set paginated_schemas = [] %} + + {% for _ in range(0, max_iter) %} + {%- set paginated_sql -%} + show terse schemas in database {{ database }} limit {{ max_results_per_iter }} from '{{ watermark.schema_name }}'; + {%- endset -%} + + {%- set paginated_result = run_query(paginated_sql) %} + {%- set paginated_n = (paginated_result | length) -%} + + {# + terminating condition: if there are 0 records in the result we reached + the end exactly on the previous iteration + #} + {%- if paginated_n == 0 -%} + {%- break -%} + {%- endif -%} + + {# + terminating condition: At some point the user needs to be reasonable with how + many schemas are contained in their databases. Since there was already + one iteration before attempting pagination, loop.index == max_iter means + the limit has been surpassed. + #} + + {%- if loop.index == max_iter -%} + {%- set msg -%} + dbt will list a maximum of {{ max_total_results }} schemas in database {{ database }}. + Your database exceeds this limit. Please contact support@getdbt.com for troubleshooting tips, + or review and reduce the number of objects contained. + {%- endset -%} + + {% do exceptions.raise_compiler_error(msg) %} + {%- endif -%} + + {%- do paginated_schemas.append(paginated_result) -%} + {% set watermark.schema_name = paginated_result.columns[1].values()[-1] %} + + {# + terminating condition: paginated_n < max_results_per_iter means we reached the end + #} + {%- if paginated_n < max_results_per_iter -%} + {%- break -%} + {%- endif -%} + {%- endfor -%} + + {{ return(paginated_schemas) }} + +{% endmacro %} + +{% macro snowflake__list_schemas(database, max_iter=10, max_results_per_iter=1000) %} + + {%- set max_total_results = max_results_per_iter * max_iter -%} + + {%- set sql -%} + show terse schemas in database {{ database }} limit {{ max_results_per_iter }}; + {%- endset -%} + + {%- set result = run_query(sql) -%} + + {%- set n = (result | length) -%} + {%- set watermark = namespace(schema_name=result.columns[1].values()[-1]) -%} + {%- set paginated = namespace(result=[]) -%} + + {% if n >= max_results_per_iter %} + + {% set paginated.result = snowflake__get_paginated_schemas_array( + max_iter, + max_results_per_iter, + max_total_results, + database, + watermark + ) + %} + + {% endif %} + + {%- set all_results_array = [result] + paginated.result -%} + {%- set result = result.merge(all_results_array) -%} + {%- do return(result) -%} + +{% endmacro %} + +{# macro snowflake__list_schemas(database) -#} {# 10k limit from here: https://docs.snowflake.net/manuals/sql-reference/sql/show-schemas.html#usage-notes #} - {% set maximum = 10000 %} - {% set sql -%} + {# set maximum = 10000 #} + {# set sql -#} show terse schemas in database {{ database }} limit {{ maximum }} - {%- endset %} - {% set result = run_query(sql) %} - {% if (result | length) >= maximum %} - {% set msg %} + {#- endset %} + {# set result = run_query(sql) #} + {# if (result | length) >= maximum #} + {# set msg %} Too many schemas in database {{ database }}! dbt can only get information about databases with fewer than {{ maximum }} schemas. - {% endset %} - {% do exceptions.raise_compiler_error(msg) %} - {% endif %} + {# endset %} + {# do exceptions.raise_compiler_error(msg) #} + {# endif #} {{ return(result) }} -{% endmacro %} +{# endmacro #} {% macro snowflake__get_paginated_relations_array(max_iter, max_results_per_iter, max_total_results, schema_relation, watermark) %} diff --git a/tests/functional/adapter/test_list_schemas.py b/tests/functional/adapter/test_list_schemas.py new file mode 100644 index 000000000..a3a49a1a9 --- /dev/null +++ b/tests/functional/adapter/test_list_schemas.py @@ -0,0 +1,224 @@ +import pytest + +import json +from dbt.tests.util import run_dbt, run_dbt_and_capture + +# Testing rationale: +# - snowflake SHOW TERSE SCHEMAS command returns at max 10K objects in a single call +# - when dbt attempts to write into a database with more than 10K schemas, compilation will fail +# unless we paginate the result +# - however, testing this process is difficult at a full scale of 10K actual objects populated +# into a fresh testing schema +# - accordingly, we create a smaller set of views and test the looping iteration logic in +# smaller chunks + +NUM_SCHEMAS = 100 + +TABLE_BASE_SQL = """ +{{ config(materialized='table') }} + +select 1 as id +""".lstrip() + +MACROS__CREATE__TEST_SCHEMAS = """ +{% macro create_test_schemas(database, schemas) %} + + {% for schema in schemas %} + {% set sql %} + use database {{database}}; + create schema if not exists {{schema}}; + {% endset %} + + {% do run_query(sql) %} + {% endfor %} + +{% endmacro %} +""" + +MACROS__DROP__TEST_SCHEMAS = """ +{% macro drop_test_schemas(database, schemas) %} + + {% for schema in schemas %} + {% set sql %} + drop schema {{database}}.{{schema}}; + {% endset %} + + {% do run_query(sql) %} + {% endfor %} + +{% endmacro %} +""" + +MACROS__VALIDATE__SNOWFLAKE__LIST_SCHEMAS = """ +{% macro validate_list_schemas(database, max_iter=11, max_results_per_iter=10) %} + {% set schema_list_result = snowflake__list_schemas(database, max_iter=max_iter, max_results_per_iter=max_results_per_iter) %} + {% set n_schemas = schema_list_result | length %} + {{ log("n_schemas: " ~ n_schemas) }} +{% endmacro %} +""" + +MACROS__VALIDATE__SNOWFLAKE__LIST_SCHEMAS_RAISE_ERROR = """ +{% macro validate_list_schemas_raise_error(database) %} + {{ snowflake__list_schemas(database, max_iter=33, max_results_per_iter=3) }} +{% endmacro %} +""" + + +def parse_json_logs(json_log_output): + parsed_logs = [] + for line in json_log_output.split("\n"): + try: + log = json.loads(line) + except ValueError: + continue + + parsed_logs.append(log) + + return parsed_logs + + +def find_result_in_parsed_logs(parsed_logs, result_name): + return next( + ( + item["data"]["msg"] + for item in parsed_logs + if result_name in item["data"].get("msg", "msg") + ), + False, + ) + + +def find_exc_info_in_parsed_logs(parsed_logs, exc_info_name): + return next( + ( + item["data"]["exc_info"] + for item in parsed_logs + if exc_info_name in item["data"].get("exc_info", "exc_info") + ), + False, + ) + + +class TestListSchemasSingle: + @pytest.fixture(scope="class") + def macros(self): + return { + "validate_list_schemas.sql": MACROS__VALIDATE__SNOWFLAKE__LIST_SCHEMAS, + "create_test_schemas.sql": MACROS__CREATE__TEST_SCHEMAS, + "drop_test_schemas.sql": MACROS__DROP__TEST_SCHEMAS, + } + + def test__snowflake__list_schemas_termination(self, project): + """ + validates that we do NOT trigger pagination logic snowflake__list_relations_without_caching + macro when there are fewer than max_results_per_iter relations in the target schema + """ + + database = project.database + schemas = [f"test_schema_{i}" for i in range(0, NUM_SCHEMAS)] + + create_kwargs = { + "database": database, + "schemas": schemas, + } + + run_dbt(["run-operation", "create_test_schemas", "--args", str(create_kwargs)]) + + validate_kwargs = {"database": database, "max_iter": 1, "max_results_per_iter": 200} + _, log_output = run_dbt_and_capture( + [ + "--debug", + "--log-format=json", + "run-operation", + "validate_list_schemas", + "--args", + str(validate_kwargs), + ] + ) + + parsed_logs = parse_json_logs(log_output) + n_schemas = find_result_in_parsed_logs(parsed_logs, "n_schemas") + + run_dbt(["run-operation", "drop_test_schemas", "--args", str(create_kwargs)]) + + assert ( + n_schemas == f"n_schemas: {(NUM_SCHEMAS + 2)}" + ) # include information schema and base test schema in the count + + +class TestListRelationsWithoutCachingFull: + @pytest.fixture(scope="class") + def macros(self): + return { + "validate_list_schemas.sql": MACROS__VALIDATE__SNOWFLAKE__LIST_SCHEMAS, + "create_test_schemas.sql": MACROS__CREATE__TEST_SCHEMAS, + "validate_list_schemas_raise_error.sql": MACROS__VALIDATE__SNOWFLAKE__LIST_SCHEMAS_RAISE_ERROR, + "drop_test_schemas.sql": MACROS__DROP__TEST_SCHEMAS, + } + + def test__snowflake__list_schemas(self, project): + """ + validates pagination logic in snowflake__list_schemas macro counts + the correct number of schemas in the target database when having to make multiple looped + calls of SHOW TERSE SCHEMAS. + """ + database = project.database + schemas = [f"test_schema_{i}" for i in range(0, NUM_SCHEMAS)] + + create_kwargs = {"database": database, "schemas": schemas} + + run_dbt(["run-operation", "create_test_schemas", "--args", str(create_kwargs)]) + + validate_kwargs = {"database": database} + _, log_output = run_dbt_and_capture( + [ + "--debug", + "--log-format=json", + "run-operation", + "validate_list_schemas", + "--args", + str(validate_kwargs), + ] + ) + + parsed_logs = parse_json_logs(log_output) + n_schemas = find_result_in_parsed_logs(parsed_logs, "n_schemas") + + run_dbt(["run-operation", "drop_test_schemas", "--args", str(create_kwargs)]) + + assert ( + n_schemas == f"n_schemas: {(NUM_SCHEMAS + 2)}" + ) # include information schema and base test schema in the count + + def test__snowflake__list_schemas_raise_error(self, project): + """ + validates pagination logic terminates and raises a compilation error + when exceeding the limit of how many results to return. + """ + run_dbt(["run"]) + + database = project.database + schemas = [f"test_schema_{i}" for i in range(0, NUM_SCHEMAS)] + + create_kwargs = {"database": database, "schemas": schemas} + + run_dbt(["run-operation", "create_test_schemas", "--args", str(create_kwargs)]) + + validate_kwargs = {"database": database} + _, log_output = run_dbt_and_capture( + [ + "--debug", + "--log-format=json", + "run-operation", + "validate_list_schemas_raise_error", + "--args", + str(validate_kwargs), + ], + expect_pass=False, + ) + + run_dbt(["run-operation", "drop_test_schemas", "--args", str(create_kwargs)]) + + parsed_logs = parse_json_logs(log_output) + traceback = find_exc_info_in_parsed_logs(parsed_logs, "Traceback") + assert "dbt will list a maximum of 99 schemas in database" in traceback