diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py index 8187fce78e5e47..b3eb23b25e0a37 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py @@ -5,6 +5,8 @@ from datahub.ingestion.source.snowflake.constants import SnowflakeObjectDomain from datahub.ingestion.source.snowflake.snowflake_config import DEFAULT_TABLES_DENY_LIST +SHOW_VIEWS_MAX_PAGE_SIZE = 10000 + def create_deny_regex_sql_filter( deny_pattern: List[str], filter_cols: List[str] @@ -202,48 +204,28 @@ def get_tags_on_columns_with_propagation( FROM table("{db_name}".information_schema.tag_references_all_columns('{quoted_table_identifier}', '{SnowflakeObjectDomain.TABLE}')); """ - # View definition is retrived in information_schema query only if role is owner of view. Hence this query is not used. - # https://community.snowflake.com/s/article/Is-it-possible-to-see-the-view-definition-in-information-schema-views-from-a-non-owner-role @staticmethod - def views_for_database(db_name: Optional[str]) -> str: - db_clause = f'"{db_name}".' if db_name is not None else "" - return f""" - SELECT table_catalog AS "TABLE_CATALOG", - table_schema AS "TABLE_SCHEMA", - table_name AS "TABLE_NAME", - created AS "CREATED", - last_altered AS "LAST_ALTERED", - comment AS "COMMENT", - view_definition AS "VIEW_DEFINITION" - FROM {db_clause}information_schema.views t - WHERE table_schema != 'INFORMATION_SCHEMA' - order by table_schema, table_name""" - - # View definition is retrived in information_schema query only if role is owner of view. Hence this query is not used. - # https://community.snowflake.com/s/article/Is-it-possible-to-see-the-view-definition-in-information-schema-views-from-a-non-owner-role - @staticmethod - def views_for_schema(schema_name: str, db_name: Optional[str]) -> str: - db_clause = f'"{db_name}".' if db_name is not None else "" - return f""" - SELECT table_catalog AS "TABLE_CATALOG", - table_schema AS "TABLE_SCHEMA", - table_name AS "TABLE_NAME", - created AS "CREATED", - last_altered AS "LAST_ALTERED", - comment AS "COMMENT", - view_definition AS "VIEW_DEFINITION" - FROM {db_clause}information_schema.views t - where table_schema='{schema_name}' - order by table_schema, table_name""" + def show_views_for_database( + db_name: str, + limit: int = SHOW_VIEWS_MAX_PAGE_SIZE, + view_pagination_marker: Optional[str] = None, + ) -> str: + # While there is an information_schema.views view, that only shows the view definition if the role + # is an owner of the view. That doesn't work for us. + # https://community.snowflake.com/s/article/Is-it-possible-to-see-the-view-definition-in-information-schema-views-from-a-non-owner-role - @staticmethod - def show_views_for_database(db_name: str) -> str: - return f"""show views in database "{db_name}";""" + # SHOW VIEWS can return a maximum of 10000 rows. + # https://docs.snowflake.com/en/sql-reference/sql/show-views#usage-notes + assert limit <= SHOW_VIEWS_MAX_PAGE_SIZE - @staticmethod - def show_views_for_schema(schema_name: str, db_name: Optional[str]) -> str: - db_clause = f'"{db_name}".' if db_name is not None else "" - return f"""show views in schema {db_clause}"{schema_name}";""" + # To work around this, we paginate through the results using the FROM clause. + from_clause = ( + f"""FROM '{view_pagination_marker}'""" if view_pagination_marker else "" + ) + return f"""\ +SHOW VIEWS IN DATABASE "{db_name}" +LIMIT {limit} {from_clause}; +""" @staticmethod def columns_for_schema(schema_name: str, db_name: Optional[str]) -> str: diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_report.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_report.py index db2095da01134d..d84580a94ab4e4 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_report.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_report.py @@ -117,7 +117,6 @@ class SnowflakeV2Report( # "Information schema query returned too much data. Please repeat query with more selective predicates."" # This will result in overall increase in time complexity num_get_tables_for_schema_queries: int = 0 - num_get_views_for_schema_queries: int = 0 num_get_columns_for_table_queries: int = 0 # these will be non-zero if the user choses to enable the extract_tags = "with_lineage" option, which requires diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py index 3e26d2acd78e1c..3254224e437a6e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py @@ -9,7 +9,10 @@ from datahub.ingestion.api.report import SupportsAsObj from datahub.ingestion.source.snowflake.constants import SnowflakeObjectDomain -from datahub.ingestion.source.snowflake.snowflake_query import SnowflakeQuery +from datahub.ingestion.source.snowflake.snowflake_query import ( + SHOW_VIEWS_MAX_PAGE_SIZE, + SnowflakeQuery, +) from datahub.ingestion.source.snowflake.snowflake_utils import SnowflakeQueryMixin from datahub.ingestion.source.sql.sql_generic import BaseColumn, BaseTable, BaseView from datahub.utilities.serialized_lru_cache import serialized_lru_cache @@ -324,53 +327,54 @@ def get_tables_for_schema( return tables @serialized_lru_cache(maxsize=1) - def get_views_for_database( - self, db_name: str - ) -> Optional[Dict[str, List[SnowflakeView]]]: + def get_views_for_database(self, db_name: str) -> Dict[str, List[SnowflakeView]]: + page_limit = SHOW_VIEWS_MAX_PAGE_SIZE + views: Dict[str, List[SnowflakeView]] = {} - try: - cur = self.query(SnowflakeQuery.show_views_for_database(db_name)) - except Exception as e: - logger.debug( - f"Failed to get all views for database - {db_name}", exc_info=e - ) - # Error - Information schema query returned too much data. Please repeat query with more selective predicates. - return None - for table in cur: - if table["schema_name"] not in views: - views[table["schema_name"]] = [] - views[table["schema_name"]].append( - SnowflakeView( - name=table["name"], - created=table["created_on"], - # last_altered=table["last_altered"], - comment=table["comment"], - view_definition=table["text"], - last_altered=table["created_on"], - materialized=table.get("is_materialized", "false").lower() - == "true", + first_iteration = True + view_pagination_marker: Optional[str] = None + while first_iteration or view_pagination_marker is not None: + cur = self.query( + SnowflakeQuery.show_views_for_database( + db_name, + limit=page_limit, + view_pagination_marker=view_pagination_marker, ) ) - return views - def get_views_for_schema( - self, schema_name: str, db_name: str - ) -> List[SnowflakeView]: - views: List[SnowflakeView] = [] + first_iteration = False + view_pagination_marker = None + + result_set_size = 0 + for view in cur: + result_set_size += 1 + + view_name = view["name"] + schema_name = view["schema_name"] + if schema_name not in views: + views[schema_name] = [] + views[schema_name].append( + SnowflakeView( + name=view_name, + created=view["created_on"], + # last_altered=table["last_altered"], + comment=view["comment"], + view_definition=view["text"], + last_altered=view["created_on"], + materialized=( + view.get("is_materialized", "false").lower() == "true" + ), + ) + ) - cur = self.query(SnowflakeQuery.show_views_for_schema(schema_name, db_name)) - for table in cur: - views.append( - SnowflakeView( - name=table["name"], - created=table["created_on"], - # last_altered=table["last_altered"], - comment=table["comment"], - view_definition=table["text"], - last_altered=table["created_on"], + if result_set_size >= page_limit: + # If we hit the limit, we need to send another request to get the next page. + logger.info( + f"Fetching next page of views for {db_name} - after {view_name}" ) - ) + view_pagination_marker = view_name + return views @serialized_lru_cache(maxsize=SCHEMA_PARALLELISM) diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py index 5a4e37078dd75f..920cf741770c39 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py @@ -1011,12 +1011,6 @@ def get_views_for_schema( ) -> List[SnowflakeView]: views = self.data_dictionary.get_views_for_database(db_name) - # get all views for database failed, - # falling back to get views for schema - if views is None: - self.report.num_get_views_for_schema_queries += 1 - return self.data_dictionary.get_views_for_schema(schema_name, db_name) - # Some schema may not have any table return views.get(schema_name, []) diff --git a/metadata-ingestion/tests/integration/snowflake/common.py b/metadata-ingestion/tests/integration/snowflake/common.py index 30c4b2bec3a049..ea08a942674808 100644 --- a/metadata-ingestion/tests/integration/snowflake/common.py +++ b/metadata-ingestion/tests/integration/snowflake/common.py @@ -224,8 +224,6 @@ def default_query_results( # noqa: C901 ] elif query == SnowflakeQuery.tables_for_database("TEST_DB"): raise Exception("Information schema query returned too much data") - elif query == SnowflakeQuery.show_views_for_database("TEST_DB"): - raise Exception("Information schema query returned too much data") elif query == SnowflakeQuery.tables_for_schema("TEST_SCHEMA", "TEST_DB"): return [ { @@ -241,7 +239,8 @@ def default_query_results( # noqa: C901 } for tbl_idx in range(1, num_tables + 1) ] - elif query == SnowflakeQuery.show_views_for_schema("TEST_SCHEMA", "TEST_DB"): + elif query == SnowflakeQuery.show_views_for_database("TEST_DB"): + # TODO: Add tests for view pagination. return [ { "schema_name": "TEST_SCHEMA", diff --git a/metadata-ingestion/tests/integration/snowflake/test_snowflake_failures.py b/metadata-ingestion/tests/integration/snowflake/test_snowflake_failures.py index 9760ea1a9c72ba..3a37382de65b46 100644 --- a/metadata-ingestion/tests/integration/snowflake/test_snowflake_failures.py +++ b/metadata-ingestion/tests/integration/snowflake/test_snowflake_failures.py @@ -144,7 +144,7 @@ def test_snowflake_no_tables_causes_pipeline_failure( ) sf_cursor.execute.side_effect = query_permission_response_override( no_tables_fn, - [SnowflakeQuery.show_views_for_schema("TEST_SCHEMA", "TEST_DB")], + [SnowflakeQuery.show_views_for_database("TEST_DB")], [], )