Skip to content

Commit

Permalink
implements pagination
Browse files Browse the repository at this point in the history
  • Loading branch information
matt-winkler committed Oct 18, 2023
1 parent a1ae40d commit 4c72e52
Show file tree
Hide file tree
Showing 3 changed files with 326 additions and 11 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20231018-145751.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Under the Hood
body: Implements pagination on `list_schemas` process
time: 2023-10-18T14:57:51.629173-07:00
custom:
Author: matt-winkler
Issue: "810"
107 changes: 96 additions & 11 deletions dbt/include/snowflake/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -47,23 +47,108 @@
{% do return(columns) %}
{% endmacro %}

{% macro snowflake__list_schemas(database) -%}
{% macro snowflake__get_paginated_schemas_array(max_iter, max_results_per_iter, max_total_results, database, watermark) %}
{#
    Pages through `show terse schemas in database <database>` results.

    args:
        max_iter: maximum number of paginated queries before raising a compiler error.
        max_results_per_iter: row limit passed to each paginated query.
        max_total_results: max_iter * max_results_per_iter; only used in the error message.
        database: database whose schemas are being listed.
        watermark: jinja namespace whose `schema_name` attribute holds the last schema
            name already fetched; it is advanced in place after every page so the
            next query resumes `from` that name.

    returns: a list of run_query result tables, one per page (possibly empty).

    NOTE(review): `{%- break -%}` requires the jinja2 loopcontrols extension —
    dbt enables it, but this macro is not portable to plain jinja without it.
#}

{% set paginated_schemas = [] %}

{% for _ in range(0, max_iter) %}
{%- set paginated_sql -%}
show terse schemas in database {{ database }} limit {{ max_results_per_iter }} from '{{ watermark.schema_name }}';
{%- endset -%}

{%- set paginated_result = run_query(paginated_sql) %}
{%- set paginated_n = (paginated_result | length) -%}

{#
terminating condition: if there are 0 records in the result we reached
the end exactly on the previous iteration
#}
{%- if paginated_n == 0 -%}
{%- break -%}
{%- endif -%}

{#
terminating condition: At some point the user needs to be reasonable with how
many schemas are contained in their databases. Since there was already
one iteration before attempting pagination, loop.index == max_iter means
the limit has been surpassed.
#}

{%- if loop.index == max_iter -%}
{%- set msg -%}
dbt will list a maximum of {{ max_total_results }} schemas in database {{ database }}.
Your database exceeds this limit. Please contact support@getdbt.com for troubleshooting tips,
or review and reduce the number of objects contained.
{%- endset -%}

{% do exceptions.raise_compiler_error(msg) %}
{%- endif -%}

{%- do paginated_schemas.append(paginated_result) -%}
{# columns[1] is presumably the "name" column of the SHOW TERSE SCHEMAS output — confirm #}
{% set watermark.schema_name = paginated_result.columns[1].values()[-1] %}

{#
terminating condition: paginated_n < max_results_per_iter means we reached the end
#}
{%- if paginated_n < max_results_per_iter -%}
{%- break -%}
{%- endif -%}
{%- endfor -%}

{{ return(paginated_schemas) }}

{% endmacro %}

{% macro snowflake__list_schemas(database, max_iter=10, max_results_per_iter=1000) %}
{#
    Lists every schema in `database`, paginating SHOW TERSE SCHEMAS so that
    databases with more than max_results_per_iter schemas are fully enumerated.

    args:
        database: database to enumerate.
        max_iter: maximum number of *follow-up* paginated queries
            (the initial query is not counted).
        max_results_per_iter: row limit per SHOW query.

    returns: a single result table holding the merged rows of every page.

    NOTE(review): `result.columns[1].values()[-1]` below assumes the first
    query returned at least one row — presumably always true because every
    database exposes an information schema, but worth confirming.
#}

{%- set max_total_results = max_results_per_iter * max_iter -%}

{%- set sql -%}
show terse schemas in database {{ database }} limit {{ max_results_per_iter }};
{%- endset -%}

{%- set result = run_query(sql) -%}

{%- set n = (result | length) -%}
{# seed the pagination watermark with the last schema name of the first page #}
{%- set watermark = namespace(schema_name=result.columns[1].values()[-1]) -%}
{%- set paginated = namespace(result=[]) -%}

{# a full first page means there may be more schemas — fetch the remainder #}
{% if n >= max_results_per_iter %}

{% set paginated.result = snowflake__get_paginated_schemas_array(
max_iter,
max_results_per_iter,
max_total_results,
database,
watermark
)
%}

{% endif %}

{%- set all_results_array = [result] + paginated.result -%}
{# merge every page into one table #}
{%- set result = result.merge(all_results_array) -%}
{%- do return(result) -%}

{% endmacro %}

{#
    NOTE(review): this region is diff residue — the pre-pagination
    implementation of snowflake__list_schemas, shown here as a mix of
    deleted lines and their commented-out replacements. It is dead code
    in the new file; consider deleting it outright rather than keeping a
    commented-out copy.
#}
{# macro snowflake__list_schemas(database) -#}
{# 10k limit from here: https://docs.snowflake.net/manuals/sql-reference/sql/show-schemas.html#usage-notes #}
{% set maximum = 10000 %}
{% set sql -%}
{# set maximum = 10000 #}
{# set sql -#}
show terse schemas in database {{ database }}
limit {{ maximum }}
{%- endset %}
{% set result = run_query(sql) %}
{% if (result | length) >= maximum %}
{% set msg %}
{#- endset %}
{# set result = run_query(sql) #}
{# if (result | length) >= maximum #}
{# set msg %}
Too many schemas in database {{ database }}! dbt can only get
information about databases with fewer than {{ maximum }} schemas.
{% endset %}
{% do exceptions.raise_compiler_error(msg) %}
{% endif %}
{# endset %}
{# do exceptions.raise_compiler_error(msg) #}
{# endif #}
{{ return(result) }}
{% endmacro %}
{# endmacro #}


{% macro snowflake__get_paginated_relations_array(max_iter, max_results_per_iter, max_total_results, schema_relation, watermark) %}
Expand Down
224 changes: 224 additions & 0 deletions tests/functional/adapter/test_list_schemas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
import pytest

import json
from dbt.tests.util import run_dbt, run_dbt_and_capture

# Testing rationale:
# - snowflake SHOW TERSE SCHEMAS command returns at max 10K objects in a single call
# - when dbt attempts to write into a database with more than 10K schemas, compilation will fail
# unless we paginate the result
# - however, testing this process is difficult at a full scale of 10K actual objects populated
# into a fresh testing schema
# - accordingly, we create a smaller set of schemas and test the looping iteration logic in
#   smaller chunks

# Number of schemas the tests create. Kept deliberately small so the pagination
# loop can be exercised with small max_results_per_iter values instead of
# materializing 10k+ real schemas.
NUM_SCHEMAS = 100

# NOTE(review): appears unused in this module — presumably left over from the
# list-relations test it was copied from; confirm before removing.
TABLE_BASE_SQL = """
{{ config(materialized='table') }}
select 1 as id
""".lstrip()

# dbt macro: creates each schema in `schemas` (if absent) inside `database`.
MACROS__CREATE__TEST_SCHEMAS = """
{% macro create_test_schemas(database, schemas) %}
{% for schema in schemas %}
{% set sql %}
use database {{database}};
create schema if not exists {{schema}};
{% endset %}
{% do run_query(sql) %}
{% endfor %}
{% endmacro %}
"""

# dbt macro: drops each schema in `schemas` from `database`.
MACROS__DROP__TEST_SCHEMAS = """
{% macro drop_test_schemas(database, schemas) %}
{% for schema in schemas %}
{% set sql %}
drop schema {{database}}.{{schema}};
{% endset %}
{% do run_query(sql) %}
{% endfor %}
{% endmacro %}
"""

# dbt macro: runs snowflake__list_schemas with configurable pagination knobs
# and logs the schema count as "n_schemas: <N>" for the tests to scrape.
MACROS__VALIDATE__SNOWFLAKE__LIST_SCHEMAS = """
{% macro validate_list_schemas(database, max_iter=11, max_results_per_iter=10) %}
{% set schema_list_result = snowflake__list_schemas(database, max_iter=max_iter, max_results_per_iter=max_results_per_iter) %}
{% set n_schemas = schema_list_result | length %}
{{ log("n_schemas: " ~ n_schemas) }}
{% endmacro %}
"""

# dbt macro: invokes snowflake__list_schemas with limits (33 * 3 = 99) chosen
# to be exceeded by the schemas created here, exercising the error path.
MACROS__VALIDATE__SNOWFLAKE__LIST_SCHEMAS_RAISE_ERROR = """
{% macro validate_list_schemas_raise_error(database) %}
{{ snowflake__list_schemas(database, max_iter=33, max_results_per_iter=3) }}
{% endmacro %}
"""


def parse_json_logs(json_log_output):
    """Parse captured dbt log output into a list of JSON records.

    Each line of *json_log_output* is decoded independently; lines that are
    not valid JSON (blank lines, stray text) are silently skipped.
    """
    records = []
    for raw_line in json_log_output.split("\n"):
        try:
            records.append(json.loads(raw_line))
        except ValueError:
            pass
    return records


def find_result_in_parsed_logs(parsed_logs, result_name):
    """Return the first log message containing *result_name*, else False.

    Scans each parsed log record's ``data.msg`` field (records without a
    ``msg`` key are skipped via the ``"msg"`` placeholder default).
    """
    for entry in parsed_logs:
        if result_name in entry["data"].get("msg", "msg"):
            return entry["data"]["msg"]
    return False


def find_exc_info_in_parsed_logs(parsed_logs, exc_info_name):
    """Return the first log record's ``exc_info`` containing *exc_info_name*.

    Returns False when no record matches; records lacking an ``exc_info``
    key are skipped via the ``"exc_info"`` placeholder default.
    """
    for entry in parsed_logs:
        if exc_info_name in entry["data"].get("exc_info", "exc_info"):
            return entry["data"]["exc_info"]
    return False


class TestListSchemasSingle:
    # Verifies the non-paginated path: one SHOW call covers every schema.

    @pytest.fixture(scope="class")
    def macros(self):
        # Macros installed into the test project; keys are target file names.
        return {
            "validate_list_schemas.sql": MACROS__VALIDATE__SNOWFLAKE__LIST_SCHEMAS,
            "create_test_schemas.sql": MACROS__CREATE__TEST_SCHEMAS,
            "drop_test_schemas.sql": MACROS__DROP__TEST_SCHEMAS,
        }

    def test__snowflake__list_schemas_termination(self, project):
        """
        validates that we do NOT trigger pagination logic in the
        snowflake__list_schemas macro when there are fewer than
        max_results_per_iter schemas in the target database
        """

        database = project.database
        schemas = [f"test_schema_{i}" for i in range(0, NUM_SCHEMAS)]

        create_kwargs = {
            "database": database,
            "schemas": schemas,
        }

        run_dbt(["run-operation", "create_test_schemas", "--args", str(create_kwargs)])

        # max_results_per_iter (200) exceeds the schema count, so the first
        # SHOW call returns everything and the paginated branch must not run.
        validate_kwargs = {"database": database, "max_iter": 1, "max_results_per_iter": 200}
        _, log_output = run_dbt_and_capture(
            [
                "--debug",
                "--log-format=json",
                "run-operation",
                "validate_list_schemas",
                "--args",
                str(validate_kwargs),
            ]
        )

        parsed_logs = parse_json_logs(log_output)
        n_schemas = find_result_in_parsed_logs(parsed_logs, "n_schemas")

        run_dbt(["run-operation", "drop_test_schemas", "--args", str(create_kwargs)])

        assert (
            n_schemas == f"n_schemas: {(NUM_SCHEMAS + 2)}"
        )  # include information schema and base test schema in the count


class TestListSchemasFull:
    """End-to-end pagination tests for the ``snowflake__list_schemas`` macro.

    NOTE(review): renamed from ``TestListRelationsWithoutCachingFull`` — the
    old name was copy-pasted from the list-relations test suite and did not
    describe this class. Nothing references a test class by name, so the
    rename only changes pytest's reported node id.
    """

    @pytest.fixture(scope="class")
    def macros(self):
        # Macros installed into the test project; keys are target file names.
        return {
            "validate_list_schemas.sql": MACROS__VALIDATE__SNOWFLAKE__LIST_SCHEMAS,
            "create_test_schemas.sql": MACROS__CREATE__TEST_SCHEMAS,
            "validate_list_schemas_raise_error.sql": MACROS__VALIDATE__SNOWFLAKE__LIST_SCHEMAS_RAISE_ERROR,
            "drop_test_schemas.sql": MACROS__DROP__TEST_SCHEMAS,
        }

    def test__snowflake__list_schemas(self, project):
        """
        validates pagination logic in snowflake__list_schemas macro counts
        the correct number of schemas in the target database when having to make
        multiple looped calls of SHOW TERSE SCHEMAS.
        """
        database = project.database
        schemas = [f"test_schema_{i}" for i in range(0, NUM_SCHEMAS)]

        create_kwargs = {"database": database, "schemas": schemas}

        run_dbt(["run-operation", "create_test_schemas", "--args", str(create_kwargs)])

        # The macro's default max_results_per_iter=10 forces several paginated
        # SHOW calls for the schemas created above.
        validate_kwargs = {"database": database}
        _, log_output = run_dbt_and_capture(
            [
                "--debug",
                "--log-format=json",
                "run-operation",
                "validate_list_schemas",
                "--args",
                str(validate_kwargs),
            ]
        )

        parsed_logs = parse_json_logs(log_output)
        n_schemas = find_result_in_parsed_logs(parsed_logs, "n_schemas")

        run_dbt(["run-operation", "drop_test_schemas", "--args", str(create_kwargs)])

        assert (
            n_schemas == f"n_schemas: {(NUM_SCHEMAS + 2)}"
        )  # include information schema and base test schema in the count

    def test__snowflake__list_schemas_raise_error(self, project):
        """
        validates pagination logic terminates and raises a compilation error
        when exceeding the limit of how many results to return.
        """
        # NOTE(review): this class's fixtures define no models, so this
        # `dbt run` looks like a leftover from a copied test — confirm
        # before removing.
        run_dbt(["run"])

        database = project.database
        schemas = [f"test_schema_{i}" for i in range(0, NUM_SCHEMAS)]

        create_kwargs = {"database": database, "schemas": schemas}

        run_dbt(["run-operation", "create_test_schemas", "--args", str(create_kwargs)])

        validate_kwargs = {"database": database}
        _, log_output = run_dbt_and_capture(
            [
                "--debug",
                "--log-format=json",
                "run-operation",
                "validate_list_schemas_raise_error",
                "--args",
                str(validate_kwargs),
            ],
            expect_pass=False,
        )

        run_dbt(["run-operation", "drop_test_schemas", "--args", str(create_kwargs)])

        parsed_logs = parse_json_logs(log_output)
        traceback = find_exc_info_in_parsed_logs(parsed_logs, "Traceback")
        # 99 comes from the macro's max_iter=33 * max_results_per_iter=3 cap.
        assert "dbt will list a maximum of 99 schemas in database" in traceback

0 comments on commit 4c72e52

Please sign in to comment.