Skip to content

Commit

Permalink
feat(ingest/snowflake): support more than 10k views in a db (datahub-…
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Jun 18, 2024
1 parent 333799c commit 2d727a9
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from datahub.ingestion.source.snowflake.constants import SnowflakeObjectDomain
from datahub.ingestion.source.snowflake.snowflake_config import DEFAULT_TABLES_DENY_LIST

SHOW_VIEWS_MAX_PAGE_SIZE = 10000


def create_deny_regex_sql_filter(
deny_pattern: List[str], filter_cols: List[str]
Expand Down Expand Up @@ -202,48 +204,28 @@ def get_tags_on_columns_with_propagation(
FROM table("{db_name}".information_schema.tag_references_all_columns('{quoted_table_identifier}', '{SnowflakeObjectDomain.TABLE}'));
"""

# View definition is retrived in information_schema query only if role is owner of view. Hence this query is not used.
# https://community.snowflake.com/s/article/Is-it-possible-to-see-the-view-definition-in-information-schema-views-from-a-non-owner-role
@staticmethod
def views_for_database(db_name: Optional[str]) -> str:
db_clause = f'"{db_name}".' if db_name is not None else ""
return f"""
SELECT table_catalog AS "TABLE_CATALOG",
table_schema AS "TABLE_SCHEMA",
table_name AS "TABLE_NAME",
created AS "CREATED",
last_altered AS "LAST_ALTERED",
comment AS "COMMENT",
view_definition AS "VIEW_DEFINITION"
FROM {db_clause}information_schema.views t
WHERE table_schema != 'INFORMATION_SCHEMA'
order by table_schema, table_name"""

# View definition is retrived in information_schema query only if role is owner of view. Hence this query is not used.
# https://community.snowflake.com/s/article/Is-it-possible-to-see-the-view-definition-in-information-schema-views-from-a-non-owner-role
@staticmethod
def views_for_schema(schema_name: str, db_name: Optional[str]) -> str:
db_clause = f'"{db_name}".' if db_name is not None else ""
return f"""
SELECT table_catalog AS "TABLE_CATALOG",
table_schema AS "TABLE_SCHEMA",
table_name AS "TABLE_NAME",
created AS "CREATED",
last_altered AS "LAST_ALTERED",
comment AS "COMMENT",
view_definition AS "VIEW_DEFINITION"
FROM {db_clause}information_schema.views t
where table_schema='{schema_name}'
order by table_schema, table_name"""
def show_views_for_database(
db_name: str,
limit: int = SHOW_VIEWS_MAX_PAGE_SIZE,
view_pagination_marker: Optional[str] = None,
) -> str:
# While there is an information_schema.views view, that only shows the view definition if the role
# is an owner of the view. That doesn't work for us.
# https://community.snowflake.com/s/article/Is-it-possible-to-see-the-view-definition-in-information-schema-views-from-a-non-owner-role

@staticmethod
def show_views_for_database(db_name: str) -> str:
return f"""show views in database "{db_name}";"""
# SHOW VIEWS can return a maximum of 10000 rows.
# https://docs.snowflake.com/en/sql-reference/sql/show-views#usage-notes
assert limit <= SHOW_VIEWS_MAX_PAGE_SIZE

@staticmethod
def show_views_for_schema(schema_name: str, db_name: Optional[str]) -> str:
db_clause = f'"{db_name}".' if db_name is not None else ""
return f"""show views in schema {db_clause}"{schema_name}";"""
# To work around this, we paginate through the results using the FROM clause.
from_clause = (
f"""FROM '{view_pagination_marker}'""" if view_pagination_marker else ""
)
return f"""\
SHOW VIEWS IN DATABASE "{db_name}"
LIMIT {limit} {from_clause};
"""

@staticmethod
def columns_for_schema(schema_name: str, db_name: Optional[str]) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ class SnowflakeV2Report(
# "Information schema query returned too much data. Please repeat query with more selective predicates.""
# This will result in overall increase in time complexity
num_get_tables_for_schema_queries: int = 0
num_get_views_for_schema_queries: int = 0
num_get_columns_for_table_queries: int = 0

# these will be non-zero if the user choses to enable the extract_tags = "with_lineage" option, which requires
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@

from datahub.ingestion.api.report import SupportsAsObj
from datahub.ingestion.source.snowflake.constants import SnowflakeObjectDomain
from datahub.ingestion.source.snowflake.snowflake_query import SnowflakeQuery
from datahub.ingestion.source.snowflake.snowflake_query import (
SHOW_VIEWS_MAX_PAGE_SIZE,
SnowflakeQuery,
)
from datahub.ingestion.source.snowflake.snowflake_utils import SnowflakeQueryMixin
from datahub.ingestion.source.sql.sql_generic import BaseColumn, BaseTable, BaseView
from datahub.utilities.serialized_lru_cache import serialized_lru_cache
Expand Down Expand Up @@ -324,53 +327,54 @@ def get_tables_for_schema(
return tables

@serialized_lru_cache(maxsize=1)
def get_views_for_database(
self, db_name: str
) -> Optional[Dict[str, List[SnowflakeView]]]:
def get_views_for_database(self, db_name: str) -> Dict[str, List[SnowflakeView]]:
page_limit = SHOW_VIEWS_MAX_PAGE_SIZE

views: Dict[str, List[SnowflakeView]] = {}
try:
cur = self.query(SnowflakeQuery.show_views_for_database(db_name))
except Exception as e:
logger.debug(
f"Failed to get all views for database - {db_name}", exc_info=e
)
# Error - Information schema query returned too much data. Please repeat query with more selective predicates.
return None

for table in cur:
if table["schema_name"] not in views:
views[table["schema_name"]] = []
views[table["schema_name"]].append(
SnowflakeView(
name=table["name"],
created=table["created_on"],
# last_altered=table["last_altered"],
comment=table["comment"],
view_definition=table["text"],
last_altered=table["created_on"],
materialized=table.get("is_materialized", "false").lower()
== "true",
first_iteration = True
view_pagination_marker: Optional[str] = None
while first_iteration or view_pagination_marker is not None:
cur = self.query(
SnowflakeQuery.show_views_for_database(
db_name,
limit=page_limit,
view_pagination_marker=view_pagination_marker,
)
)
return views

def get_views_for_schema(
self, schema_name: str, db_name: str
) -> List[SnowflakeView]:
views: List[SnowflakeView] = []
first_iteration = False
view_pagination_marker = None

result_set_size = 0
for view in cur:
result_set_size += 1

view_name = view["name"]
schema_name = view["schema_name"]
if schema_name not in views:
views[schema_name] = []
views[schema_name].append(
SnowflakeView(
name=view_name,
created=view["created_on"],
# last_altered=table["last_altered"],
comment=view["comment"],
view_definition=view["text"],
last_altered=view["created_on"],
materialized=(
view.get("is_materialized", "false").lower() == "true"
),
)
)

cur = self.query(SnowflakeQuery.show_views_for_schema(schema_name, db_name))
for table in cur:
views.append(
SnowflakeView(
name=table["name"],
created=table["created_on"],
# last_altered=table["last_altered"],
comment=table["comment"],
view_definition=table["text"],
last_altered=table["created_on"],
if result_set_size >= page_limit:
# If we hit the limit, we need to send another request to get the next page.
logger.info(
f"Fetching next page of views for {db_name} - after {view_name}"
)
)
view_pagination_marker = view_name

return views

@serialized_lru_cache(maxsize=SCHEMA_PARALLELISM)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1011,12 +1011,6 @@ def get_views_for_schema(
) -> List[SnowflakeView]:
views = self.data_dictionary.get_views_for_database(db_name)

# get all views for database failed,
# falling back to get views for schema
if views is None:
self.report.num_get_views_for_schema_queries += 1
return self.data_dictionary.get_views_for_schema(schema_name, db_name)

# Some schema may not have any table
return views.get(schema_name, [])

Expand Down
5 changes: 2 additions & 3 deletions metadata-ingestion/tests/integration/snowflake/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,6 @@ def default_query_results( # noqa: C901
]
elif query == SnowflakeQuery.tables_for_database("TEST_DB"):
raise Exception("Information schema query returned too much data")
elif query == SnowflakeQuery.show_views_for_database("TEST_DB"):
raise Exception("Information schema query returned too much data")
elif query == SnowflakeQuery.tables_for_schema("TEST_SCHEMA", "TEST_DB"):
return [
{
Expand All @@ -241,7 +239,8 @@ def default_query_results( # noqa: C901
}
for tbl_idx in range(1, num_tables + 1)
]
elif query == SnowflakeQuery.show_views_for_schema("TEST_SCHEMA", "TEST_DB"):
elif query == SnowflakeQuery.show_views_for_database("TEST_DB"):
# TODO: Add tests for view pagination.
return [
{
"schema_name": "TEST_SCHEMA",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def test_snowflake_no_tables_causes_pipeline_failure(
)
sf_cursor.execute.side_effect = query_permission_response_override(
no_tables_fn,
[SnowflakeQuery.show_views_for_schema("TEST_SCHEMA", "TEST_DB")],
[SnowflakeQuery.show_views_for_database("TEST_DB")],
[],
)

Expand Down

0 comments on commit 2d727a9

Please sign in to comment.