Skip to content

Commit

Permalink
feat(ingest/gc): Add dry run mode to gc recipe (datahub-project#11413)
Browse files Browse the repository at this point in the history
  • Loading branch information
treff7es authored Sep 18, 2024
1 parent 31edb46 commit dea2ef0
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 3 deletions.
2 changes: 2 additions & 0 deletions metadata-ingestion/docs/sources/gc/gc_recipe.dhub.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
source:
type: datahub-gc
config:
# Whether to run the recipe in dry-run mode or not
dry_run: false
# Cleanup expired tokens
cleanup_expired_tokens: true
# Whether to truncate elasticsearch indices or not which can be safely truncated
Expand Down
12 changes: 10 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@


class DataHubGcSourceConfig(ConfigModel):
dry_run: bool = Field(
default=False,
description="Whether to perform a dry run or not. This is only supported for dataprocess cleanup and soft deleted entities cleanup.",
)

cleanup_expired_tokens: bool = Field(
default=True,
description="Whether to clean up expired tokens or not",
Expand Down Expand Up @@ -95,11 +100,14 @@ def __init__(self, ctx: PipelineContext, config: DataHubGcSourceConfig):

if self.config.dataprocess_cleanup:
self.dataprocess_cleanup = DataProcessCleanup(
ctx, self.config.dataprocess_cleanup, self.report
ctx, self.config.dataprocess_cleanup, self.report, self.config.dry_run
)
if self.config.soft_deleted_entities_cleanup:
self.soft_deleted_entities_cleanup = SoftDeletedEntitiesCleanup(
ctx, self.config.soft_deleted_entities_cleanup, self.report
ctx,
self.config.soft_deleted_entities_cleanup,
self.report,
self.config.dry_run,
)

@classmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ def __init__(
ctx: PipelineContext,
config: DataProcessCleanupConfig,
report: DataProcessCleanupReport,
dry_run: bool = False,
):
if not ctx.graph:
raise ValueError("MetadataCleanupSource needs a datahub_api")
Expand All @@ -193,6 +194,7 @@ def __init__(
self.ctx = ctx
self.config = config
self.report = report
self.dry_run = dry_run

def get_report(self) -> DataProcessCleanupReport:
    """Return the report object that accumulates this cleanup task's results.

    The report is the same instance that was injected at construction time
    (see the ``report`` parameter of ``__init__``), so callers observe
    updates made during cleanup without needing to re-fetch it.
    """
    return self.report
Expand Down Expand Up @@ -263,6 +265,12 @@ def delete_entity(self, urn: str, type: str) -> None:
self.report.sample_removed_aspects_by_type[type] = LossyList()
self.report.sample_removed_aspects_by_type[type].append(urn)

if self.dry_run:
logger.info(
f"Dry run is on otherwise it would have deleted {urn} with hard deletion is{self.config.hard_delete_entities}"
)
return

self.ctx.graph.delete_entity(urn, self.config.hard_delete_entities)

def delete_dpi_from_datajobs(self, job: DataJobEntity) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def __init__(
ctx: PipelineContext,
config: SoftDeletedEntitiesCleanupConfig,
report: SoftDeletedEntitiesReport,
dry_run: bool = False,
):
if not ctx.graph:
raise ValueError(" Datahub API is required")
Expand All @@ -90,6 +91,7 @@ def __init__(
self.ctx = ctx
self.config = config
self.report = report
self.dry_run = dry_run

def delete_entity(self, urn: str) -> None:
assert self.ctx.graph
Expand All @@ -113,12 +115,18 @@ def delete_entity(self, urn: str) -> None:
entity_urn.entity_type
].append(urn)

if self.dry_run:
logger.info(
f"Dry run is on otherwise it would have deleted {urn} with hard deletion"
)
return

self.ctx.graph.delete_entity(urn=urn, hard=True)

def delete_soft_deleted_entity(self, urn: str) -> None:
assert self.ctx.graph

if not self.config.retention_days:
if self.config.retention_days is None:
logger.info("Retention days is not set, skipping soft delete cleanup")
return

Expand Down

0 comments on commit dea2ef0

Please sign in to comment.