Skip to content

Commit

Permalink
feat(ingest/gc): Add dataflow and soft deleted entities cleanup (data…
Browse files Browse the repository at this point in the history
  • Loading branch information
treff7es authored Sep 18, 2024
1 parent ddcf2dc commit e7a3890
Show file tree
Hide file tree
Showing 4 changed files with 683 additions and 3 deletions.
22 changes: 22 additions & 0 deletions metadata-ingestion/docs/sources/gc/gc_recipe.dhub.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
source:
type: datahub-gc
config:
# Cleanup expired tokens
cleanup_expired_tokens: true
# Whether to truncate the Elasticsearch indices that can be safely truncated
truncate_indices: true

# Cleanup DataProcess Instances
dataprocess_cleanup:
retention_days: 10
# Delete empty Data Jobs (if no DataProcessInstance associated with the DataJob)
delete_empty_data_jobs: true
# Delete empty Data Flow (if no DataJob associated with the DataFlow)
delete_empty_data_flows: true
# Whether to hard delete entities or soft delete them
hard_delete_entities: false
# Keep the last n dataprocess instances
keep_last_n: 5
soft_deleted_entities_cleanup:
# Delete soft deleted entities which were deleted 10 days ago
retention_days: 10
57 changes: 54 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import re
import time
from dataclasses import dataclass
from typing import Dict, Iterable
from functools import partial
from typing import Dict, Iterable, List, Optional

from pydantic import Field

Expand All @@ -15,8 +16,19 @@
platform_name,
support_status,
)
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.source import MetadataWorkUnitProcessor, Source, SourceReport
from datahub.ingestion.api.source_helpers import auto_workunit_reporter
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.gc.dataprocess_cleanup import (
DataProcessCleanup,
DataProcessCleanupConfig,
DataProcessCleanupReport,
)
from datahub.ingestion.source.gc.soft_deleted_entity_cleanup import (
SoftDeletedEntitiesCleanup,
SoftDeletedEntitiesCleanupConfig,
SoftDeletedEntitiesReport,
)

logger = logging.getLogger(__name__)

Expand All @@ -43,34 +55,73 @@ class DataHubGcSourceConfig(ConfigModel):
description="Sleep between truncation monitoring.",
)

dataprocess_cleanup: Optional[DataProcessCleanupConfig] = Field(
default=None,
description="Configuration for data process cleanup",
)

soft_deleted_entities_cleanup: Optional[SoftDeletedEntitiesCleanupConfig] = Field(
default=None,
description="Configuration for soft deleted entities cleanup",
)


@dataclass
class DataHubGcSourceReport(SourceReport):
# Report for the GC source. It derives from both cleanup report types so that
# the single report instance created in DataHubGcSource.__init__ can be handed
# to DataProcessCleanup and to SoftDeletedEntitiesCleanup alike.
class DataHubGcSourceReport(DataProcessCleanupReport, SoftDeletedEntitiesReport):
# Counter starting at 0; it is updated outside this view — presumably by
# revoke_expired_tokens (TODO confirm).
expired_tokens_revoked: int = 0


@platform_name("DataHubGc")
@config_class(DataHubGcSourceConfig)
@support_status(SupportStatus.TESTING)
class DataHubGcSource(Source):
"""
DataHubGcSource is responsible for performing garbage collection tasks on DataHub.
This source performs the following tasks:
1. Cleans up expired tokens.
2. Truncates Elasticsearch indices based on configuration.
3. Cleans up data processes and soft-deleted entities if configured.
"""

def __init__(self, ctx: PipelineContext, config: DataHubGcSourceConfig):
    """Wire up the GC source.

    Stores the pipeline context and config, creates a fresh
    DataHubGcSourceReport, obtains the graph handle through
    ``ctx.require_graph`` and builds the optional cleanup helpers.

    A helper is only instantiated when its config section is set;
    otherwise the corresponding attribute stays ``None``. Both helpers
    receive the same report object.
    """
    self.ctx = ctx
    self.config = config
    self.report = DataHubGcSourceReport()
    self.graph = ctx.require_graph("The DataHubGc source")

    # Data process cleanup is opt-in via `dataprocess_cleanup`.
    self.dataprocess_cleanup: Optional[DataProcessCleanup] = (
        DataProcessCleanup(ctx, self.config.dataprocess_cleanup, self.report)
        if self.config.dataprocess_cleanup
        else None
    )

    # Soft-deleted entity cleanup is opt-in via `soft_deleted_entities_cleanup`.
    self.soft_deleted_entities_cleanup: Optional[SoftDeletedEntitiesCleanup] = (
        SoftDeletedEntitiesCleanup(
            ctx, self.config.soft_deleted_entities_cleanup, self.report
        )
        if self.config.soft_deleted_entities_cleanup
        else None
    )

@classmethod
def create(cls, config_dict, ctx):
    """Build the source from a raw config dict.

    The dict is parsed into a DataHubGcSourceConfig via ``parse_obj`` and
    passed, together with ``ctx``, to the class constructor.
    """
    return cls(ctx, DataHubGcSourceConfig.parse_obj(config_dict))

# get_workunit_processors is overridden to keep only the workunit reporter, disabling other automation (auto status aspect, etc.) that is not needed here.
def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
    """Return the workunit processors used by this source.

    The list holds a single entry: ``auto_workunit_reporter`` with this
    source's report (from ``self.get_report()``) bound as its first argument.
    """
    reporter = partial(auto_workunit_reporter, self.get_report())
    return [reporter]

def get_workunits_internal(
    self,
) -> Iterable[MetadataWorkUnit]:
    """Run the enabled GC tasks in order and yield any workunits produced.

    Order of execution:
      1. revoke expired tokens (if ``config.cleanup_expired_tokens``),
      2. truncate indices (if ``config.truncate_indices``),
      3. data process cleanup (if configured) -- its workunits are yielded,
      4. soft-deleted entities cleanup (if configured) -- yields nothing.

    This is a generator, so nothing runs until the result is iterated.
    """
    settings = self.config

    if settings.cleanup_expired_tokens:
        self.revoke_expired_tokens()

    if settings.truncate_indices:
        self.truncate_indices()

    if self.dataprocess_cleanup:
        # The only step that emits workunits.
        yield from self.dataprocess_cleanup.get_workunits_internal()

    if self.soft_deleted_entities_cleanup:
        self.soft_deleted_entities_cleanup.cleanup_soft_deleted_entities()

def truncate_indices(self) -> None:
Expand Down
Loading

0 comments on commit e7a3890

Please sign in to comment.