From 48d711b19873794a423d2bd63034e959a23615d0 Mon Sep 17 00:00:00 2001 From: Aseem Bansal Date: Wed, 27 Nov 2024 16:49:01 +0530 Subject: [PATCH] fix(ingest): more error handling (#11969) --- .../datahub/ingestion/source/gc/datahub_gc.py | 27 +++++++++++++++---- .../source/gc/dataprocess_cleanup.py | 4 ++- 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py b/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py index c4b4186f45fc3..52807ca2a3f02 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py +++ b/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py @@ -144,15 +144,32 @@ def get_workunits_internal( self, ) -> Iterable[MetadataWorkUnit]: if self.config.cleanup_expired_tokens: - self.revoke_expired_tokens() + try: + self.revoke_expired_tokens() + except Exception as e: + self.report.failure("While trying to cleanup expired token ", exc=e) if self.config.truncate_indices: - self.truncate_indices() + try: + self.truncate_indices() + except Exception as e: + self.report.failure("While trying to truncate indices ", exc=e) if self.dataprocess_cleanup: - yield from self.dataprocess_cleanup.get_workunits_internal() + try: + yield from self.dataprocess_cleanup.get_workunits_internal() + except Exception as e: + self.report.failure("While trying to cleanup data process ", exc=e) if self.soft_deleted_entities_cleanup: - self.soft_deleted_entities_cleanup.cleanup_soft_deleted_entities() + try: + self.soft_deleted_entities_cleanup.cleanup_soft_deleted_entities() + except Exception as e: + self.report.failure( + "While trying to cleanup soft deleted entities ", exc=e + ) if self.execution_request_cleanup: - self.execution_request_cleanup.run() + try: + self.execution_request_cleanup.run() + except Exception as e: + self.report.failure("While trying to cleanup execution request ", exc=e) yield from [] def truncate_indices(self) -> None: diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py b/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py index 130f2c9c2e12f..0f35e1a67fede 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py +++ b/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py @@ -404,7 +404,9 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: try: self.delete_dpi_from_datajobs(datajob_entity) except Exception as e: - logger.error(f"While trying to delete {datajob_entity} got {e}") + self.report.failure( + f"While trying to delete {datajob_entity} ", exc=e + ) if ( datajob_entity.total_runs == 0 and self.config.delete_empty_data_jobs