From 70bec480888e71ed4bbfbc62cc56fab4cc46abe8 Mon Sep 17 00:00:00 2001 From: Tamas Nemeth Date: Sat, 7 Dec 2024 13:45:14 +0100 Subject: [PATCH] fix(ingest/gc): Additional dataprocess cleanup fixes (#12049) --- .../source/gc/dataprocess_cleanup.py | 31 +++++++----- metadata-ingestion/tests/unit/test_gc.py | 48 +++++++++++++++++++ 2 files changed, 68 insertions(+), 11 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py b/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py index 3e51b7da9e8be..8aacf13cdb00f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py +++ b/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py @@ -207,6 +207,9 @@ def fetch_dpis(self, job_urn: str, batch_size: int) -> List[dict]: assert self.ctx.graph dpis = [] start = 0 + # This graphql endpoint doesn't support scrolling and therefore after 10k DPIs it causes performance issues on ES + # Therefore, we are limiting the max DPIs to 9000 + max_item = 9000 while True: try: job_query_result = self.ctx.graph.execute_graphql( @@ -226,10 +229,12 @@ def fetch_dpis(self, job_urn: str, batch_size: int) -> List[dict]: runs = runs_data.get("runs") dpis.extend(runs) start += batch_size - if len(runs) < batch_size: + if len(runs) < batch_size or start >= max_item: break except Exception as e: - logger.error(f"Exception while fetching DPIs for job {job_urn}: {e}") + self.report.failure( + f"Exception while fetching DPIs for job {job_urn}:", exc=e + ) break return dpis @@ -254,8 +259,9 @@ def keep_last_n_dpi( deleted_count_last_n += 1 futures[future]["deleted"] = True except Exception as e: - logger.error(f"Exception while deleting DPI: {e}") - + self.report.report_failure( + f"Exception while deleting DPI: {e}", exc=e + ) if deleted_count_last_n % self.config.batch_size == 0: logger.info(f"Deleted {deleted_count_last_n} DPIs from {job.urn}") if self.config.delay: @@ -289,7 +295,7 @@ def delete_dpi_from_datajobs(self, job: DataJobEntity) -> None: dpis = self.fetch_dpis(job.urn, self.config.batch_size) dpis.sort( key=lambda x: x["created"]["time"] - if "created" in x and "time" in x["created"] + if x.get("created") and x["created"].get("time") else 0, reverse=True, ) @@ -325,8 +331,8 @@ def remove_old_dpis( continue if ( - "created" not in dpi - or "time" not in dpi["created"] + not dpi.get("created") + or not dpi["created"].get("time") or dpi["created"]["time"] < retention_time * 1000 ): future = executor.submit( @@ -340,7 +346,7 @@ def remove_old_dpis( deleted_count_retention += 1 futures[future]["deleted"] = True except Exception as e: - logger.error(f"Exception while deleting DPI: {e}") + self.report.report_failure(f"Exception while deleting DPI: {e}", exc=e) if deleted_count_retention % self.config.batch_size == 0: logger.info( @@ -351,9 +357,12 @@ def remove_old_dpis( logger.info(f"Sleeping for {self.config.delay} seconds") time.sleep(self.config.delay) - logger.info( - f"Deleted {deleted_count_retention} DPIs from {job.urn} due to retention" - ) + if deleted_count_retention > 0: + logger.info( + f"Deleted {deleted_count_retention} DPIs from {job.urn} due to retention" + ) + else: + logger.debug(f"No DPIs to delete from {job.urn} due to retention") def get_data_flows(self) -> Iterable[DataFlowEntity]: assert self.ctx.graph diff --git a/metadata-ingestion/tests/unit/test_gc.py b/metadata-ingestion/tests/unit/test_gc.py index 5429c85dd608d..8f00d5e064db8 100644 --- a/metadata-ingestion/tests/unit/test_gc.py +++ b/metadata-ingestion/tests/unit/test_gc.py @@ -84,6 +84,54 @@ def test_delete_dpi_from_datajobs_without_dpi_created_time(self, mock_fetch_dpis self.cleanup.delete_dpi_from_datajobs(job) self.assertEqual(10, self.report.num_aspects_removed) + @patch( + "datahub.ingestion.source.gc.dataprocess_cleanup.DataProcessCleanup.fetch_dpis" + ) + def test_delete_dpi_from_datajobs_without_dpi_null_created_time( + self, mock_fetch_dpis + ): + job = DataJobEntity( + urn="urn:li:dataJob:1", + flow_urn="urn:li:dataFlow:1", + lastIngested=int(datetime.now(timezone.utc).timestamp()), + jobId="job1", + dataPlatformInstance="urn:li:dataPlatformInstance:1", + total_runs=10, + ) + mock_fetch_dpis.return_value = [ + {"urn": f"urn:li:dataprocessInstance:{i}"} for i in range(10) + ] + [ + { + "urn": "urn:li:dataprocessInstance:11", + "created": {"time": None}, + } + ] + self.cleanup.delete_dpi_from_datajobs(job) + self.assertEqual(11, self.report.num_aspects_removed) + + @patch( + "datahub.ingestion.source.gc.dataprocess_cleanup.DataProcessCleanup.fetch_dpis" + ) + def test_delete_dpi_from_datajobs_without_dpi_without_time(self, mock_fetch_dpis): + job = DataJobEntity( + urn="urn:li:dataJob:1", + flow_urn="urn:li:dataFlow:1", + lastIngested=int(datetime.now(timezone.utc).timestamp()), + jobId="job1", + dataPlatformInstance="urn:li:dataPlatformInstance:1", + total_runs=10, + ) + mock_fetch_dpis.return_value = [ + {"urn": f"urn:li:dataprocessInstance:{i}"} for i in range(10) + ] + [ + { + "urn": "urn:li:dataprocessInstance:11", + "created": None, + } + ] + self.cleanup.delete_dpi_from_datajobs(job) + self.assertEqual(11, self.report.num_aspects_removed) + def test_fetch_dpis(self): assert self.cleanup.ctx.graph self.cleanup.ctx.graph = MagicMock()