From 8ca8fd98ce874691fcd3d51d3f30509656ad7210 Mon Sep 17 00:00:00 2001 From: Mayuri Nehate <33225191+mayurinehate@users.noreply.github.com> Date: Fri, 8 Nov 2024 12:12:03 +0530 Subject: [PATCH] fix(ingest/sql-parsing): ignore processed query_id from temp upstream (#11798) Co-authored-by: Harshal Sheth --- .../source/bigquery_v2/queries_extractor.py | 15 ++++++++++----- .../datahub/sql_parsing/sql_parsing_aggregator.py | 5 ++++- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py index afaaaf51964f8e..497947abe4ef9a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py @@ -276,18 +276,23 @@ def get_workunits_internal( logger.info(f"Found {self.report.num_unique_queries} unique queries") with self.report.audit_log_load_timer, queries_deduped: - i = 0 - for _, query_instances in queries_deduped.items(): + last_log_time = datetime.now() + last_report_time = datetime.now() + for i, (_, query_instances) in enumerate(queries_deduped.items()): for query in query_instances.values(): - if i > 0 and i % 10000 == 0: + now = datetime.now() + if (now - last_log_time).total_seconds() >= 60: logger.info( - f"Added {i} query log equeries_dedupedntries to SQL aggregator" + f"Added {i} deduplicated query log entries to SQL aggregator" ) + last_log_time = now + + if (now - last_report_time).total_seconds() >= 300: if self.report.sql_aggregator: logger.info(self.report.sql_aggregator.as_string()) + last_report_time = now self.aggregator.add(query) - i += 1 yield from auto_workunit(self.aggregator.gen_metadata()) diff --git a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py index f0496379c45b8b..e8a0369597d53a 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py +++ b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py @@ -1571,7 +1571,10 @@ def _recurse_into_query( if upstream_query_ids: for upstream_query_id in upstream_query_ids: upstream_query = self._query_map.get(upstream_query_id) - if upstream_query: + if ( + upstream_query + and upstream_query.query_id not in composed_of_queries + ): temp_query_lineage_info = _recurse_into_query( upstream_query, recursion_path )