diff --git a/metadata-ingestion/docs/sources/gc/gc_recipe.dhub.yml b/metadata-ingestion/docs/sources/gc/gc_recipe.dhub.yml index 3c1d4ef7572659..21734cd4e03fa4 100644 --- a/metadata-ingestion/docs/sources/gc/gc_recipe.dhub.yml +++ b/metadata-ingestion/docs/sources/gc/gc_recipe.dhub.yml @@ -1,6 +1,8 @@ source: type: datahub-gc config: + # Whether to run the recipe in dry-run mode or not + dry_run: false # Cleanup expired tokens cleanup_expired_tokens: true # Whether to truncate elasticsearch indices or not which can be safely truncated diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py b/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py index cddc156165624b..1897f3f288ec0b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py +++ b/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py @@ -34,6 +34,11 @@ class DataHubGcSourceConfig(ConfigModel): + dry_run: bool = Field( + default=False, + description="Whether to perform a dry run or not. This is only supported for dataprocess cleanup and soft deleted entities cleanup.", + ) + cleanup_expired_tokens: bool = Field( default=True, description="Whether to clean up expired tokens or not", @@ -95,11 +100,14 @@ def __init__(self, ctx: PipelineContext, config: DataHubGcSourceConfig): if self.config.dataprocess_cleanup: self.dataprocess_cleanup = DataProcessCleanup( - ctx, self.config.dataprocess_cleanup, self.report + ctx, self.config.dataprocess_cleanup, self.report, self.config.dry_run ) if self.config.soft_deleted_entities_cleanup: self.soft_deleted_entities_cleanup = SoftDeletedEntitiesCleanup( - ctx, self.config.soft_deleted_entities_cleanup, self.report + ctx, + self.config.soft_deleted_entities_cleanup, + self.report, + self.config.dry_run, ) @classmethod diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py b/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py index 989458e5b1eed1..80f7b7a9f4480c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py +++ b/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py @@ -185,6 +185,7 @@ def __init__( ctx: PipelineContext, config: DataProcessCleanupConfig, report: DataProcessCleanupReport, + dry_run: bool = False, ): if not ctx.graph: raise ValueError("MetadataCleanupSource needs a datahub_api") @@ -193,6 +194,7 @@ def __init__( self.ctx = ctx self.config = config self.report = report + self.dry_run = dry_run def get_report(self) -> DataProcessCleanupReport: return self.report @@ -263,6 +265,12 @@ def delete_entity(self, urn: str, type: str) -> None: self.report.sample_removed_aspects_by_type[type] = LossyList() self.report.sample_removed_aspects_by_type[type].append(urn) + if self.dry_run: + logger.info( + f"Dry run is on otherwise it would have deleted {urn} with hard deletion is{self.config.hard_delete_entities}" + ) + return + self.ctx.graph.delete_entity(urn, self.config.hard_delete_entities) def delete_dpi_from_datajobs(self, job: DataJobEntity) -> None: diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py b/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py index 6ae1dbd7b46b9e..4da23c13659a74 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py +++ b/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py @@ -82,6 +82,7 @@ def __init__( ctx: PipelineContext, config: SoftDeletedEntitiesCleanupConfig, report: SoftDeletedEntitiesReport, + dry_run: bool = False, ): if not ctx.graph: raise ValueError(" Datahub API is required") @@ -90,6 +91,7 @@ def __init__( self.ctx = ctx self.config = config self.report = report + self.dry_run = dry_run def delete_entity(self, urn: str) -> None: assert self.ctx.graph @@ -113,12 +115,18 @@ def delete_entity(self, urn: str) -> None: entity_urn.entity_type ].append(urn) + if self.dry_run: + logger.info( + f"Dry run is on otherwise it would have deleted {urn} with hard deletion" + ) + return + self.ctx.graph.delete_entity(urn=urn, hard=True) def delete_soft_deleted_entity(self, urn: str) -> None: assert self.ctx.graph - if not self.config.retention_days: + if self.config.retention_days is None: logger.info("Retention days is not set, skipping soft delete cleanup") return