From 326afc6308e46e6ed635bad9cdcf0a97aaff2d3d Mon Sep 17 00:00:00 2001 From: Julien Jehannet <80408664+aviv-julienjehannet@users.noreply.github.com> Date: Wed, 23 Oct 2024 08:09:23 +0200 Subject: [PATCH 1/2] fix(ingestion/glue): manage table names from resource_links from nearest catalog correctly (#11578) --- .../src/datahub/ingestion/source/aws/glue.py | 55 ++++++++++--------- .../tests/unit/glue/test_glue_source.py | 6 +- .../tests/unit/glue/test_glue_source_stubs.py | 8 +-- 3 files changed, 34 insertions(+), 35 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py b/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py index 3b9b5dbf63e18..37c146218e263 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py +++ b/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py @@ -678,12 +678,19 @@ def get_all_databases(self) -> Iterable[Mapping[str, Any]]: else: paginator_response = paginator.paginate() - for page in paginator_response: - yield from page["DatabaseList"] + pattern = "DatabaseList" + if self.source_config.ignore_resource_links: + # exclude resource links by using a JMESPath conditional query against the TargetDatabase struct key + pattern += "[?!TargetDatabase]" + + for database in paginator_response.search(pattern): + if self.source_config.database_pattern.allowed(database["Name"]): + yield database - def get_tables_from_database(self, database_name: str) -> Iterable[Dict]: + def get_tables_from_database(self, database: Mapping[str, Any]) -> Iterable[Dict]: # see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue/paginator/GetTables.html paginator = self.glue_client.get_paginator("get_tables") + database_name = database["Name"] if self.source_config.catalog_id: paginator_response = paginator.paginate( @@ -692,34 +699,28 @@ def get_tables_from_database(self, database_name: str) -> Iterable[Dict]: else: paginator_response = 
paginator.paginate(DatabaseName=database_name) - for page in paginator_response: - yield from page["TableList"] + for table in paginator_response.search("TableList"): + # if resource links are detected, re-use database names from the current catalog + # otherwise, the external (target) names would be picked up instead of the aliased ones + # which would produce incoherent full table names later on + # Note: the explicit source_config check is redundant here (filtering has already been done above) + if ( + not self.source_config.ignore_resource_links + and "TargetDatabase" in database + ): + table["DatabaseName"] = database["Name"] + yield table def get_all_databases_and_tables( self, - ) -> Tuple[Dict, List[Dict]]: - all_databases = self.get_all_databases() - - if self.source_config.ignore_resource_links: - all_databases = [ - database - for database in all_databases - if "TargetDatabase" not in database - ] - - allowed_databases = { - database["Name"]: database - for database in all_databases - if self.source_config.database_pattern.allowed(database["Name"]) - } - + ) -> Tuple[List[Mapping[str, Any]], List[Dict]]: + all_databases = [*self.get_all_databases()] all_tables = [ - table - for database_name in allowed_databases - for table in self.get_tables_from_database(database_name) + tables + for database in all_databases + for tables in self.get_tables_from_database(database) ] - - return allowed_databases, all_tables + return all_databases, all_tables def get_lineage_if_enabled( self, mce: MetadataChangeEventClass @@ -1039,7 +1040,7 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: databases, tables = self.get_all_databases_and_tables() - for database in databases.values(): + for database in databases: yield from self.gen_database_containers(database) for table in tables: diff --git 
a/metadata-ingestion/tests/unit/glue/test_glue_source.py b/metadata-ingestion/tests/unit/glue/test_glue_source.py index 57f48db1129c4..4df0c6d17b06c 100644 --- a/metadata-ingestion/tests/unit/glue/test_glue_source.py +++ b/metadata-ingestion/tests/unit/glue/test_glue_source.py @@ -267,8 +267,8 @@ def test_platform_config(): @pytest.mark.parametrize( "ignore_resource_links, all_databases_and_tables_result", [ - (True, ({}, [])), - (False, ({"test-database": resource_link_database}, target_database_tables)), + (True, ([], [])), + (False, ([resource_link_database], target_database_tables)), ], ) def test_ignore_resource_links(ignore_resource_links, all_databases_and_tables_result): @@ -289,7 +289,7 @@ def test_ignore_resource_links(ignore_resource_links, all_databases_and_tables_r glue_stubber.add_response( "get_tables", get_tables_response_for_target_database, - {"DatabaseName": "test-database"}, + {"DatabaseName": "resource-link-test-database"}, ) assert source.get_all_databases_and_tables() == all_databases_and_tables_result diff --git a/metadata-ingestion/tests/unit/glue/test_glue_source_stubs.py b/metadata-ingestion/tests/unit/glue/test_glue_source_stubs.py index 46ab65234c22d..dba1eea3010c2 100644 --- a/metadata-ingestion/tests/unit/glue/test_glue_source_stubs.py +++ b/metadata-ingestion/tests/unit/glue/test_glue_source_stubs.py @@ -5,7 +5,7 @@ from botocore.response import StreamingBody resource_link_database = { - "Name": "test-database", + "Name": "resource-link-test-database", "CreateTime": datetime.datetime(2021, 6, 9, 14, 14, 19), "CreateTableDefaultPermissions": [], "TargetDatabase": {"CatalogId": "432143214321", "DatabaseName": "test-database"}, @@ -92,10 +92,8 @@ }, ] } -databases_1 = { - "flights-database": {"Name": "flights-database", "CatalogId": "123412341234"} -} -databases_2 = {"test-database": {"Name": "test-database", "CatalogId": "123412341234"}} +databases_1 = [{"Name": "flights-database", "CatalogId": "123412341234"}] +databases_2 = [{"Name": 
"test-database", "CatalogId": "123412341234"}] tables_1 = [ { "Name": "avro", From e96323a2a2ff91bef33dd2d13e625320e656a575 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Wed, 23 Oct 2024 01:19:49 -0700 Subject: [PATCH 2/2] feat(ingest/fivetran): show connector filter reason (#11695) --- .../docs/sources/fivetran/fivetran_recipe.yml | 4 +- .../ingestion/source/fivetran/config.py | 3 +- .../source/fivetran/fivetran_log_api.py | 37 ++++++++++++------- .../source/fivetran/fivetran_query.py | 12 +++++- .../integration/fivetran/test_fivetran.py | 8 +++- 5 files changed, 43 insertions(+), 21 deletions(-) diff --git a/metadata-ingestion/docs/sources/fivetran/fivetran_recipe.yml b/metadata-ingestion/docs/sources/fivetran/fivetran_recipe.yml index 3495b89a7e193..af4d5c5792f41 100644 --- a/metadata-ingestion/docs/sources/fivetran/fivetran_recipe.yml +++ b/metadata-ingestion/docs/sources/fivetran/fivetran_recipe.yml @@ -25,7 +25,7 @@ source: client_email: "client_email" client_id: "client_id" private_key: "private_key" - dataset: "fivetran_log_dataset" + dataset: "fivetran_log_dataset" # Optional - filter for certain connector names instead of ingesting everything. # connector_patterns: @@ -35,7 +35,7 @@ source: # Optional -- A mapping of the connector's all sources to its database. 
# sources_to_database: # connector_id: source_db - + # Optional -- This mapping is optional and only required to configure platform-instance for source # A mapping of Fivetran connector id to data platform instance # sources_to_platform_instance: diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py index b60a6b96c74e9..394015500d1c5 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py @@ -160,8 +160,7 @@ class FivetranSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin ) connector_patterns: AllowDenyPattern = Field( default=AllowDenyPattern.allow_all(), - description="Filtering regex patterns for connector ids. " - "They're visible in the Fivetran UI under Connectors -> Setup -> Fivetran Connector ID.", + description="Filtering regex patterns for connector names.", ) destination_patterns: AllowDenyPattern = Field( default=AllowDenyPattern.allow_all(), diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py index 5c92e0197abe9..79f9d513bfb7c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py @@ -84,17 +84,21 @@ def _query(self, query: str) -> List[Dict]: query = sqlglot.parse_one(query, dialect="snowflake").sql( dialect=self.fivetran_log_config.destination_platform, pretty=True ) - logger.debug(f"Query : {query}") + logger.info(f"Executing query: {query}") resp = self.engine.execute(query) return [row for row in resp] - def _get_column_lineage_metadata(self) -> Dict[Tuple[str, str], List]: + def _get_column_lineage_metadata( + self, connector_ids: List[str] + ) -> Dict[Tuple[str, str], List]: """ Returns dict of column lineage 
metadata with key as (, ) """ all_column_lineage = defaultdict(list) column_lineage_result = self._query( - self.fivetran_log_query.get_column_lineage_query() + self.fivetran_log_query.get_column_lineage_query( + connector_ids=connector_ids + ) ) for column_lineage in column_lineage_result: key = ( @@ -104,13 +108,13 @@ def _get_column_lineage_metadata(self) -> Dict[Tuple[str, str], List]: all_column_lineage[key].append(column_lineage) return dict(all_column_lineage) - def _get_table_lineage_metadata(self) -> Dict[str, List]: + def _get_table_lineage_metadata(self, connector_ids: List[str]) -> Dict[str, List]: """ Returns dict of table lineage metadata with key as 'CONNECTOR_ID' """ connectors_table_lineage_metadata = defaultdict(list) table_lineage_result = self._query( - self.fivetran_log_query.get_table_lineage_query() + self.fivetran_log_query.get_table_lineage_query(connector_ids=connector_ids) ) for table_lineage in table_lineage_result: connectors_table_lineage_metadata[ @@ -224,8 +228,9 @@ def get_user_email(self, user_id: str) -> Optional[str]: return self._get_users().get(user_id) def _fill_connectors_lineage(self, connectors: List[Connector]) -> None: - table_lineage_metadata = self._get_table_lineage_metadata() - column_lineage_metadata = self._get_column_lineage_metadata() + connector_ids = [connector.connector_id for connector in connectors] + table_lineage_metadata = self._get_table_lineage_metadata(connector_ids) + column_lineage_metadata = self._get_column_lineage_metadata(connector_ids) for connector in connectors: connector.lineage = self._extract_connector_lineage( table_lineage_result=table_lineage_metadata.get(connector.connector_id), @@ -254,20 +259,25 @@ def get_allowed_connectors_list( logger.info("Fetching connector list") connector_list = self._query(self.fivetran_log_query.get_connectors_query()) for connector in connector_list: - if not connector_patterns.allowed(connector[Constant.CONNECTOR_NAME]): - 
report.report_connectors_dropped(connector[Constant.CONNECTOR_NAME]) + connector_name = connector[Constant.CONNECTOR_NAME] + if not connector_patterns.allowed(connector_name): + report.report_connectors_dropped(connector_name) continue - if not destination_patterns.allowed(connector[Constant.DESTINATION_ID]): - report.report_connectors_dropped(connector[Constant.CONNECTOR_NAME]) + if not destination_patterns.allowed( + destination_id := connector[Constant.DESTINATION_ID] + ): + report.report_connectors_dropped( + f"{connector_name} (destination_id: {destination_id})" + ) continue connectors.append( Connector( connector_id=connector[Constant.CONNECTOR_ID], - connector_name=connector[Constant.CONNECTOR_NAME], + connector_name=connector_name, connector_type=connector[Constant.CONNECTOR_TYPE_ID], paused=connector[Constant.PAUSED], sync_frequency=connector[Constant.SYNC_FREQUENCY], - destination_id=connector[Constant.DESTINATION_ID], + destination_id=destination_id, user_id=connector[Constant.CONNECTING_USER_ID], lineage=[], # filled later jobs=[], # filled later @@ -279,6 +289,7 @@ def get_allowed_connectors_list( # we push down connector id filters. 
logger.info("No allowed connectors found") return [] + logger.info(f"Found {len(connectors)} allowed connectors") with report.metadata_extraction_perf.connectors_lineage_extraction_sec: logger.info("Fetching connector lineage") diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py index c9e329b706768..34dd252ec72b7 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py @@ -80,7 +80,10 @@ def get_sync_logs_query( ORDER BY connector_id, end_time DESC """ - def get_table_lineage_query(self) -> str: + def get_table_lineage_query(self, connector_ids: List[str]) -> str: + # Format connector_ids as a comma-separated string of quoted IDs + formatted_connector_ids = ", ".join(f"'{id}'" for id in connector_ids) + return f"""\ SELECT stm.connector_id as connector_id, @@ -95,11 +98,15 @@ def get_table_lineage_query(self) -> str: JOIN {self.db_clause}destination_table_metadata as dtm on tl.destination_table_id = dtm.id JOIN {self.db_clause}source_schema_metadata as ssm on stm.schema_id = ssm.id JOIN {self.db_clause}destination_schema_metadata as dsm on dtm.schema_id = dsm.id +WHERE stm.connector_id IN ({formatted_connector_ids}) QUALIFY ROW_NUMBER() OVER (PARTITION BY stm.connector_id ORDER BY tl.created_at DESC) <= {MAX_TABLE_LINEAGE_PER_CONNECTOR} ORDER BY stm.connector_id, tl.created_at DESC """ - def get_column_lineage_query(self) -> str: + def get_column_lineage_query(self, connector_ids: List[str]) -> str: + # Format connector_ids as a comma-separated string of quoted IDs + formatted_connector_ids = ", ".join(f"'{id}'" for id in connector_ids) + return f"""\ SELECT scm.table_id as source_table_id, @@ -114,6 +121,7 @@ def get_column_lineage_query(self) -> str: -- Only joining source_table_metadata to get the connector_id. 
JOIN {self.db_clause}source_table_metadata as stm ON scm.table_id = stm.id +WHERE stm.connector_id IN ({formatted_connector_ids}) QUALIFY ROW_NUMBER() OVER (PARTITION BY stm.connector_id ORDER BY cl.created_at DESC) <= {MAX_COLUMN_LINEAGE_PER_CONNECTOR} ORDER BY stm.connector_id, cl.created_at DESC """ diff --git a/metadata-ingestion/tests/integration/fivetran/test_fivetran.py b/metadata-ingestion/tests/integration/fivetran/test_fivetran.py index e72162b12e48f..f49f499fe43b4 100644 --- a/metadata-ingestion/tests/integration/fivetran/test_fivetran.py +++ b/metadata-ingestion/tests/integration/fivetran/test_fivetran.py @@ -43,7 +43,9 @@ def default_query_results( return [] elif query == fivetran_log_query.get_connectors_query(): return connector_query_results - elif query == fivetran_log_query.get_table_lineage_query(): + elif query == fivetran_log_query.get_table_lineage_query( + connector_ids=["calendar_elected"] + ): return [ { "connector_id": "calendar_elected", @@ -64,7 +66,9 @@ def default_query_results( "destination_schema_name": "postgres_public", }, ] - elif query == fivetran_log_query.get_column_lineage_query(): + elif query == fivetran_log_query.get_column_lineage_query( + connector_ids=["calendar_elected"] + ): return [ { "source_table_id": "10040",