Skip to content

Commit

Permalink
fix(ingest/gc): add limit, add actual loop for iterating over batches (datahub-project#11809)
Browse files: browse the repository at this point in the history

Co-authored-by: treff7es <[email protected]>
  • Loading branch information
anshbansal and treff7es authored Nov 6, 2024
1 parent ac94274 commit 32878ab
Showing 1 changed file with 32 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ class SoftDeletedEntitiesCleanupConfig(ConfigModel):
default=None,
description="Query to filter entities",
)
limit_entities_delete: Optional[int] = Field(
10000, description="Max number of entities to delete."
)

runtime_limit_seconds: Optional[int] = Field(
None,
description="Runtime limit in seconds",
)


@dataclass
Expand Down Expand Up @@ -122,6 +130,10 @@ def delete_entity(self, urn: str) -> None:
return

self.ctx.graph.delete_entity(urn=urn, hard=True)
self.ctx.graph.delete_references_to_urn(
urn=urn,
dry_run=False,
)

def delete_soft_deleted_entity(self, urn: str) -> None:
assert self.ctx.graph
Expand All @@ -145,6 +157,7 @@ def delete_soft_deleted_entity(self, urn: str) -> None:

def cleanup_soft_deleted_entities(self) -> None:
assert self.ctx.graph
start_time = time.time()

deleted_count_retention = 0
urns = self.ctx.graph.get_urns_by_filter(
Expand All @@ -158,7 +171,26 @@ def cleanup_soft_deleted_entities(self) -> None:

futures = {}
with ThreadPoolExecutor(max_workers=self.config.max_workers) as executor:
num_urns_submitted = 0
for urn in urns:
num_urns_submitted += 1
if (
self.config.limit_entities_delete
and num_urns_submitted > self.config.limit_entities_delete
):
logger.info(
f"Limit of {self.config.limit_entities_delete} entities reached. Stopping"
)
break
if (
self.config.runtime_limit_seconds
and time.time() - start_time > self.config.runtime_limit_seconds
):
logger.info(
f"Runtime limit of {self.config.runtime_limit_seconds} seconds reached. Stopping"
)
break

future = executor.submit(self.delete_soft_deleted_entity, urn)
futures[future] = urn

Expand Down

0 comments on commit 32878ab

Please sign in to comment.