From 756b199506d57449c60a3a28901f7d22fe89f9f1 Mon Sep 17 00:00:00 2001
From: Andrew Sikowitz
Date: Tue, 24 Dec 2024 14:56:35 -0800
Subject: [PATCH 1/2] fix(ingest/glue): Add additional checks and logging when
 specifying catalog_id (#12168)

---
 .../src/datahub/ingestion/source/aws/glue.py  | 14 +++++-
 .../tests/unit/glue/glue_mces_golden.json     |  2 +-
 .../glue/glue_mces_golden_table_lineage.json  |  2 +-
 .../glue_mces_platform_instance_golden.json   |  2 +-
 .../tests/unit/glue/test_glue_source.py       | 43 +++++++++++++++++--
 .../tests/unit/glue/test_glue_source_stubs.py |  8 ++--
 6 files changed, 59 insertions(+), 12 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py b/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
index 37c146218e2633..7a5ed154d40bc7 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
@@ -52,6 +52,7 @@
     platform_name,
     support_status,
 )
+from datahub.ingestion.api.report import EntityFilterReport
 from datahub.ingestion.api.source import MetadataWorkUnitProcessor
 from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.ingestion.source.aws import s3_util
@@ -115,7 +116,6 @@
 logger = logging.getLogger(__name__)
 
-
 DEFAULT_PLATFORM = "glue"
 VALID_PLATFORMS = [DEFAULT_PLATFORM, "athena"]
 
@@ -220,6 +220,7 @@ def platform_validator(cls, v: str) -> str:
 class GlueSourceReport(StaleEntityRemovalSourceReport):
     tables_scanned = 0
     filtered: List[str] = dataclass_field(default_factory=list)
+    databases: EntityFilterReport = EntityFilterReport.field(type="database")
 
     num_job_script_location_missing: int = 0
     num_job_script_location_invalid: int = 0
@@ -668,6 +669,7 @@ def get_datajob_wu(self, node: Dict[str, Any], job_name: str) -> MetadataWorkUni
         return MetadataWorkUnit(id=f'{job_name}-{node["Id"]}', mce=mce)
 
     def get_all_databases(self) -> Iterable[Mapping[str, Any]]:
+        logger.debug("Getting all databases")
         # see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue/paginator/GetDatabases.html
         paginator = self.glue_client.get_paginator("get_databases")
 
@@ -684,10 +686,18 @@ def get_all_databases(self) -> Iterable[Mapping[str, Any]]:
             pattern += "[?!TargetDatabase]"
 
         for database in paginator_response.search(pattern):
-            if self.source_config.database_pattern.allowed(database["Name"]):
+            if (not self.source_config.database_pattern.allowed(database["Name"])) or (
+                self.source_config.catalog_id
+                and database.get("CatalogId")
+                and database.get("CatalogId") != self.source_config.catalog_id
+            ):
+                self.report.databases.dropped(database["Name"])
+            else:
+                self.report.databases.processed(database["Name"])
                 yield database
 
     def get_tables_from_database(self, database: Mapping[str, Any]) -> Iterable[Dict]:
+        logger.debug(f"Getting tables from database {database['Name']}")
         # see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue/paginator/GetTables.html
         paginator = self.glue_client.get_paginator("get_tables")
         database_name = database["Name"]
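Note: the filtering rule added to get_all_databases() above boils down to the standalone sketch below. It is illustrative only: `AllowDenyPattern` is the pattern type used by the Glue config (assuming the acryl-datahub package is installed), while `should_ingest_database` is a hypothetical helper that is not part of this patch.

```python
from typing import Any, Mapping, Optional

from datahub.configuration.common import AllowDenyPattern


def should_ingest_database(
    database: Mapping[str, Any],
    database_pattern: AllowDenyPattern,
    catalog_id: Optional[str],
) -> bool:
    # Drop when the database name is denied by the configured pattern.
    if not database_pattern.allowed(database["Name"]):
        return False
    # Drop when a catalog_id is configured and the database reports a different one.
    if (
        catalog_id
        and database.get("CatalogId")
        and database["CatalogId"] != catalog_id
    ):
        return False
    return True


if __name__ == "__main__":
    pattern = AllowDenyPattern.allow_all()
    # A database with no CatalogId field is still kept when catalog_id is set;
    # only an explicit mismatch drops it.
    assert should_ingest_database({"Name": "db"}, pattern, "123412341234")
    assert not should_ingest_database(
        {"Name": "db", "CatalogId": "000000000000"}, pattern, "123412341234"
    )
```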
"qualifiedName": "arn:aws:glue:us-west-2:000000000000:database/empty-database", "env": "PROD" } } diff --git a/metadata-ingestion/tests/unit/glue/glue_mces_golden_table_lineage.json b/metadata-ingestion/tests/unit/glue/glue_mces_golden_table_lineage.json index e2dd4cec97c2ec..22bb4b53b91efd 100644 --- a/metadata-ingestion/tests/unit/glue/glue_mces_golden_table_lineage.json +++ b/metadata-ingestion/tests/unit/glue/glue_mces_golden_table_lineage.json @@ -124,7 +124,7 @@ "CreateTime": "June 01, 2021 at 14:55:13" }, "name": "empty-database", - "qualifiedName": "arn:aws:glue:us-west-2:123412341234:database/empty-database", + "qualifiedName": "arn:aws:glue:us-west-2:000000000000:database/empty-database", "env": "PROD" } } diff --git a/metadata-ingestion/tests/unit/glue/glue_mces_platform_instance_golden.json b/metadata-ingestion/tests/unit/glue/glue_mces_platform_instance_golden.json index 0b883062763f41..b700335c26e5aa 100644 --- a/metadata-ingestion/tests/unit/glue/glue_mces_platform_instance_golden.json +++ b/metadata-ingestion/tests/unit/glue/glue_mces_platform_instance_golden.json @@ -129,7 +129,7 @@ "CreateTime": "June 01, 2021 at 14:55:13" }, "name": "empty-database", - "qualifiedName": "arn:aws:glue:us-west-2:123412341234:database/empty-database", + "qualifiedName": "arn:aws:glue:us-west-2:000000000000:database/empty-database", "env": "PROD" } } diff --git a/metadata-ingestion/tests/unit/glue/test_glue_source.py b/metadata-ingestion/tests/unit/glue/test_glue_source.py index 693fd6bc336fd3..9e3f260a23f1c8 100644 --- a/metadata-ingestion/tests/unit/glue/test_glue_source.py +++ b/metadata-ingestion/tests/unit/glue/test_glue_source.py @@ -35,8 +35,8 @@ validate_all_providers_have_committed_successfully, ) from tests.unit.glue.test_glue_source_stubs import ( - databases_1, - databases_2, + empty_database, + flights_database, get_bucket_tagging, get_databases_delta_response, get_databases_response, @@ -64,6 +64,7 @@ tables_2, tables_profiling_1, target_database_tables, + test_database, ) FROZEN_TIME = "2020-04-14 07:00:00" @@ -310,6 +311,40 @@ def test_config_without_platform(): assert source.platform == "glue" +def test_get_databases_filters_by_catalog(): + def format_databases(databases): + return set(d["Name"] for d in databases) + + all_catalogs_source: GlueSource = GlueSource( + config=GlueSourceConfig(aws_region="us-west-2"), + ctx=PipelineContext(run_id="glue-source-test"), + ) + with Stubber(all_catalogs_source.glue_client) as glue_stubber: + glue_stubber.add_response("get_databases", get_databases_response, {}) + + expected = [flights_database, test_database, empty_database] + actual = all_catalogs_source.get_all_databases() + assert format_databases(actual) == format_databases(expected) + assert all_catalogs_source.report.databases.dropped_entities.as_obj() == [] + + catalog_id = "123412341234" + single_catalog_source: GlueSource = GlueSource( + config=GlueSourceConfig(catalog_id=catalog_id, aws_region="us-west-2"), + ctx=PipelineContext(run_id="glue-source-test"), + ) + with Stubber(single_catalog_source.glue_client) as glue_stubber: + glue_stubber.add_response( + "get_databases", get_databases_response, {"CatalogId": catalog_id} + ) + + expected = [flights_database, test_database] + actual = single_catalog_source.get_all_databases() + assert format_databases(actual) == format_databases(expected) + assert single_catalog_source.report.databases.dropped_entities.as_obj() == [ + "empty-database" + ] + + @freeze_time(FROZEN_TIME) def test_glue_stateful(pytestconfig, tmp_path, mock_time, 
From 16698da509ec5c9f86188db7a16c38cea19d61bf Mon Sep 17 00:00:00 2001
From: Aseem Bansal
Date: Thu, 26 Dec 2024 18:10:09 +0530
Subject: [PATCH 2/2] fix(ingest/gc): misc fixes in gc source (#12226)

---
 .../datahub/ingestion/source/gc/datahub_gc.py | 23 +++++--
 .../source/gc/execution_request_cleanup.py    | 61 +++++++++++++++----
 .../source_report/ingestion_stage.py          |  1 +
 3 files changed, 68 insertions(+), 17 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py b/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py
index 4eecbb4d9d7177..168b787b85e8be 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py
@@ -34,6 +34,7 @@
     SoftDeletedEntitiesCleanupConfig,
     SoftDeletedEntitiesReport,
 )
+from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport
 
 logger = logging.getLogger(__name__)
 
@@ -86,6 +87,7 @@ class DataHubGcSourceReport(
     DataProcessCleanupReport,
     SoftDeletedEntitiesReport,
     DatahubExecutionRequestCleanupReport,
+    IngestionStageReport,
 ):
     expired_tokens_revoked: int = 0
 
@@ -139,31 +141,40 @@ def get_workunits_internal(
     ) -> Iterable[MetadataWorkUnit]:
         if self.config.cleanup_expired_tokens:
             try:
+                self.report.report_ingestion_stage_start("Expired Token Cleanup")
                 self.revoke_expired_tokens()
             except Exception as e:
                 self.report.failure("While trying to cleanup expired token ", exc=e)
         if self.config.truncate_indices:
             try:
+                self.report.report_ingestion_stage_start("Truncate Indices")
                 self.truncate_indices()
             except Exception as e:
                 self.report.failure("While trying to truncate indices ", exc=e)
         if self.config.soft_deleted_entities_cleanup.enabled:
             try:
+                self.report.report_ingestion_stage_start(
+                    "Soft Deleted Entities Cleanup"
+                )
                 self.soft_deleted_entities_cleanup.cleanup_soft_deleted_entities()
             except Exception as e:
                 self.report.failure(
                     "While trying to cleanup soft deleted entities ", exc=e
                 )
-        if self.config.execution_request_cleanup.enabled:
-            try:
-                self.execution_request_cleanup.run()
-            except Exception as e:
-                self.report.failure("While trying to cleanup execution request ", exc=e)
         if self.config.dataprocess_cleanup.enabled:
             try:
+                self.report.report_ingestion_stage_start("Data Process Cleanup")
                 yield from self.dataprocess_cleanup.get_workunits_internal()
             except Exception as e:
                 self.report.failure("While trying to cleanup data process ", exc=e)
+        if self.config.execution_request_cleanup.enabled:
+            try:
+                self.report.report_ingestion_stage_start("Execution request Cleanup")
+                self.execution_request_cleanup.run()
+            except Exception as e:
+                self.report.failure("While trying to cleanup execution request ", exc=e)
+        # Otherwise last stage's duration does not get calculated.
+        self.report.report_ingestion_stage_start("End")
         yield from []
 
     def truncate_indices(self) -> None:
@@ -281,6 +292,8 @@ def revoke_expired_tokens(self) -> None:
             list_access_tokens = expired_tokens_res.get("listAccessTokens", {})
             tokens = list_access_tokens.get("tokens", [])
             total = list_access_tokens.get("total", 0)
+            if tokens == []:
+                break
            for token in tokens:
                 self.report.expired_tokens_revoked += 1
                 token_id = token["id"]
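Note: the `if tokens == []: break` guard added to revoke_expired_tokens() above is the usual defensive exit for a paginated fetch-and-revoke loop: stop as soon as a page comes back empty rather than relying only on the reported total. A minimal sketch of that shape follows; the function and callback names are hypothetical, not the GraphQL call the source actually makes.

```python
from typing import Callable, Dict, List, Tuple


def revoke_expired(
    fetch_page: Callable[[], Tuple[List[Dict], int]],
    revoke: Callable[[str], None],
) -> int:
    # fetch_page is assumed to re-query only still-expired tokens,
    # so the result set shrinks as tokens are revoked.
    revoked = 0
    while True:
        tokens, total = fetch_page()
        if tokens == []:
            # Empty page: nothing left to revoke; do not keep looping on `total`.
            break
        for token in tokens:
            revoke(token["id"])
            revoked += 1
    return revoked
```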
", exc=e) if self.config.dataprocess_cleanup.enabled: try: + self.report.report_ingestion_stage_start("Data Process Cleanup") yield from self.dataprocess_cleanup.get_workunits_internal() except Exception as e: self.report.failure("While trying to cleanup data process ", exc=e) + if self.config.execution_request_cleanup.enabled: + try: + self.report.report_ingestion_stage_start("Execution request Cleanup") + self.execution_request_cleanup.run() + except Exception as e: + self.report.failure("While trying to cleanup execution request ", exc=e) + # Otherwise last stage's duration does not get calculated. + self.report.report_ingestion_stage_start("End") yield from [] def truncate_indices(self) -> None: @@ -281,6 +292,8 @@ def revoke_expired_tokens(self) -> None: list_access_tokens = expired_tokens_res.get("listAccessTokens", {}) tokens = list_access_tokens.get("tokens", []) total = list_access_tokens.get("total", 0) + if tokens == []: + break for token in tokens: self.report.expired_tokens_revoked += 1 token_id = token["id"] diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py b/metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py index 3baf858e44cdc8..170a6ada3e336f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py +++ b/metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py @@ -1,3 +1,4 @@ +import datetime import logging import time from typing import Any, Dict, Iterator, Optional @@ -42,16 +43,28 @@ class DatahubExecutionRequestCleanupConfig(ConfigModel): description="Global switch for this cleanup task", ) + runtime_limit_seconds: int = Field( + default=3600, + description="Maximum runtime in seconds for the cleanup task", + ) + + max_read_errors: int = Field( + default=10, + description="Maximum number of read errors before aborting", + ) + def keep_history_max_milliseconds(self): return self.keep_history_max_days * 24 * 3600 * 1000 class DatahubExecutionRequestCleanupReport(SourceReport): - execution_request_cleanup_records_read: int = 0 - execution_request_cleanup_records_preserved: int = 0 - execution_request_cleanup_records_deleted: int = 0 - execution_request_cleanup_read_errors: int = 0 - execution_request_cleanup_delete_errors: int = 0 + ergc_records_read: int = 0 + ergc_records_preserved: int = 0 + ergc_records_deleted: int = 0 + ergc_read_errors: int = 0 + ergc_delete_errors: int = 0 + ergc_start_time: Optional[datetime.datetime] = None + ergc_end_time: Optional[datetime.datetime] = None class CleanupRecord(BaseModel): @@ -124,6 +137,13 @@ def _scroll_execution_requests( params.update(overrides) while True: + if self._reached_runtime_limit(): + break + if self.report.ergc_read_errors >= self.config.max_read_errors: + self.report.failure( + f"ergc({self.instance_id}): too many read errors, aborting." 
@@ -150,7 +170,7 @@ def _scroll_garbage_records(self):
         running_guard_timeout = now_ms - 30 * 24 * 3600 * 1000
 
         for entry in self._scroll_execution_requests():
-            self.report.execution_request_cleanup_records_read += 1
+            self.report.ergc_records_read += 1
             key = entry.ingestion_source
 
             # Always delete corrupted records
@@ -171,7 +191,7 @@ def _scroll_garbage_records(self):
 
             # Do not delete if number of requests is below minimum
             if state[key]["count"] < self.config.keep_history_min_count:
-                self.report.execution_request_cleanup_records_preserved += 1
+                self.report.ergc_records_preserved += 1
                 continue
 
             # Do not delete if number of requests do not exceed allowed maximum,
@@ -179,7 +199,7 @@ def _scroll_garbage_records(self):
             if (state[key]["count"] < self.config.keep_history_max_count) and (
                 entry.requested_at > state[key]["cutoffTimestamp"]
             ):
-                self.report.execution_request_cleanup_records_preserved += 1
+                self.report.ergc_records_preserved += 1
                 continue
 
             # Do not delete if status is RUNNING or PENDING and created within last month. If the record is >month old and it did not
@@ -188,7 +208,7 @@ def _scroll_garbage_records(self):
                 "RUNNING",
                 "PENDING",
             ]:
-                self.report.execution_request_cleanup_records_preserved += 1
+                self.report.ergc_records_preserved += 1
                 continue
 
             # Otherwise delete current record
@@ -200,7 +220,7 @@ def _scroll_garbage_records(self):
                     f"record timestamp: {entry.requested_at}."
                 )
             )
-            self.report.execution_request_cleanup_records_deleted += 1
+            self.report.ergc_records_deleted += 1
             yield entry
 
     def _delete_entry(self, entry: CleanupRecord) -> None:
@@ -210,17 +230,31 @@ def _delete_entry(self, entry: CleanupRecord) -> None:
             )
             self.graph.delete_entity(entry.urn, True)
         except Exception as e:
-            self.report.execution_request_cleanup_delete_errors += 1
+            self.report.ergc_delete_errors += 1
             logger.error(
                 f"ergc({self.instance_id}): failed to delete ExecutionRequest {entry.request_id}: {e}"
             )
 
+    def _reached_runtime_limit(self) -> bool:
+        if (
+            self.config.runtime_limit_seconds
+            and self.report.ergc_start_time
+            and (
+                datetime.datetime.now() - self.report.ergc_start_time
+                >= datetime.timedelta(seconds=self.config.runtime_limit_seconds)
+            )
+        ):
+            logger.info(f"ergc({self.instance_id}): max runtime reached.")
+            return True
+        return False
+
     def run(self) -> None:
         if not self.config.enabled:
             logger.info(
                 f"ergc({self.instance_id}): ExecutionRequest cleaner is disabled."
             )
             return
+        self.report.ergc_start_time = datetime.datetime.now()
 
         logger.info(
             (
@@ -232,8 +266,11 @@ def run(self) -> None:
         )
 
         for entry in self._scroll_garbage_records():
+            if self._reached_runtime_limit():
+                break
             self._delete_entry(entry)
 
+        self.report.ergc_end_time = datetime.datetime.now()
         logger.info(
             f"ergc({self.instance_id}): Finished cleanup of ExecutionRequest records."
         )
diff --git a/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py b/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py
index 42b3b648bd298d..ce683e64b3f468 100644
--- a/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py
+++ b/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py
@@ -42,4 +42,5 @@ def report_ingestion_stage_start(self, stage: str) -> None:
             self._timer = PerfTimer()
 
         self.ingestion_stage = f"{stage} at {datetime.now(timezone.utc)}"
+        logger.info(f"Stage started: {self.ingestion_stage}")
         self._timer.start()
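Note: for context on the trailing `report_ingestion_stage_start("End")` call in datahub_gc.py and its comment about the last stage's duration: each stage-start call is what closes out the previous stage's timer, so a final no-op stage is needed to record the last real stage. The sketch below is a simplified stand-in for the IngestionStageReport/PerfTimer pair, not the actual implementation.

```python
import time
from typing import Dict, Optional


class StageTimerSketch:
    """Simplified stand-in: starting a stage closes out the previous one."""

    def __init__(self) -> None:
        self.durations: Dict[str, float] = {}
        self._stage: Optional[str] = None
        self._started = 0.0

    def report_ingestion_stage_start(self, stage: str) -> None:
        if self._stage is not None:
            # Close the previous stage before opening the new one.
            self.durations[self._stage] = time.perf_counter() - self._started
        self._stage = stage
        self._started = time.perf_counter()


timer = StageTimerSketch()
timer.report_ingestion_stage_start("Expired Token Cleanup")
timer.report_ingestion_stage_start("Execution request Cleanup")
# Without this final call, the previous stage would never record a duration.
timer.report_ingestion_stage_start("End")
assert "Execution request Cleanup" in timer.durations
```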