
Commit

Merge branch 'datahub-project:master' into master
anshbansal authored Dec 26, 2024
2 parents 6d347af + 16698da commit 199bfcd
Showing 9 changed files with 127 additions and 29 deletions.
14 changes: 12 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
@@ -52,6 +52,7 @@
platform_name,
support_status,
)
from datahub.ingestion.api.report import EntityFilterReport
from datahub.ingestion.api.source import MetadataWorkUnitProcessor
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.aws import s3_util
@@ -115,7 +116,6 @@

logger = logging.getLogger(__name__)


DEFAULT_PLATFORM = "glue"
VALID_PLATFORMS = [DEFAULT_PLATFORM, "athena"]

@@ -220,6 +220,7 @@ def platform_validator(cls, v: str) -> str:
class GlueSourceReport(StaleEntityRemovalSourceReport):
tables_scanned = 0
filtered: List[str] = dataclass_field(default_factory=list)
databases: EntityFilterReport = EntityFilterReport.field(type="database")

num_job_script_location_missing: int = 0
num_job_script_location_invalid: int = 0
@@ -668,6 +669,7 @@ def get_datajob_wu(self, node: Dict[str, Any], job_name: str) -> MetadataWorkUni
return MetadataWorkUnit(id=f'{job_name}-{node["Id"]}', mce=mce)

def get_all_databases(self) -> Iterable[Mapping[str, Any]]:
logger.debug("Getting all databases")
# see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue/paginator/GetDatabases.html
paginator = self.glue_client.get_paginator("get_databases")

@@ -684,10 +686,18 @@ def get_all_databases(self) -> Iterable[Mapping[str, Any]]:
pattern += "[?!TargetDatabase]"

for database in paginator_response.search(pattern):
if self.source_config.database_pattern.allowed(database["Name"]):
if (not self.source_config.database_pattern.allowed(database["Name"])) or (
self.source_config.catalog_id
and database.get("CatalogId")
and database.get("CatalogId") != self.source_config.catalog_id
):
self.report.databases.dropped(database["Name"])
else:
self.report.databases.processed(database["Name"])
yield database

def get_tables_from_database(self, database: Mapping[str, Any]) -> Iterable[Dict]:
logger.debug(f"Getting tables from database {database['Name']}")
# see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue/paginator/GetTables.html
paginator = self.glue_client.get_paginator("get_tables")
database_name = database["Name"]
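The change above tightens get_all_databases: a database is now dropped when its name fails the configured allow/deny pattern or when a catalog_id is set and the database reports a different CatalogId, and either outcome is recorded on the new databases filter report. A minimal standalone sketch of that rule, assuming AllowDenyPattern from datahub.configuration.common (which is what GlueSourceConfig.database_pattern uses); the helper name is illustrative, not part of the source:

from typing import Any, Mapping, Optional

from datahub.configuration.common import AllowDenyPattern


def database_is_dropped(
    database: Mapping[str, Any],
    database_pattern: AllowDenyPattern,
    catalog_id: Optional[str] = None,
) -> bool:
    # Dropped when the name fails the allow/deny pattern...
    if not database_pattern.allowed(database["Name"]):
        return True
    # ...or when a catalog is pinned and the database reports a different CatalogId.
    if catalog_id and database.get("CatalogId") and database["CatalogId"] != catalog_id:
        return True
    return False


pattern = AllowDenyPattern.allow_all()
print(database_is_dropped({"Name": "empty-database", "CatalogId": "000000000000"}, pattern, "123412341234"))   # True
print(database_is_dropped({"Name": "flights-database", "CatalogId": "123412341234"}, pattern, "123412341234"))  # False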
23 changes: 18 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py
@@ -34,6 +34,7 @@
SoftDeletedEntitiesCleanupConfig,
SoftDeletedEntitiesReport,
)
from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport

logger = logging.getLogger(__name__)

@@ -86,6 +87,7 @@ class DataHubGcSourceReport(
DataProcessCleanupReport,
SoftDeletedEntitiesReport,
DatahubExecutionRequestCleanupReport,
IngestionStageReport,
):
expired_tokens_revoked: int = 0

@@ -139,31 +141,40 @@ def get_workunits_internal(
) -> Iterable[MetadataWorkUnit]:
if self.config.cleanup_expired_tokens:
try:
self.report.report_ingestion_stage_start("Expired Token Cleanup")
self.revoke_expired_tokens()
except Exception as e:
self.report.failure("While trying to cleanup expired token ", exc=e)
if self.config.truncate_indices:
try:
self.report.report_ingestion_stage_start("Truncate Indices")
self.truncate_indices()
except Exception as e:
self.report.failure("While trying to truncate indices ", exc=e)
if self.config.soft_deleted_entities_cleanup.enabled:
try:
self.report.report_ingestion_stage_start(
"Soft Deleted Entities Cleanup"
)
self.soft_deleted_entities_cleanup.cleanup_soft_deleted_entities()
except Exception as e:
self.report.failure(
"While trying to cleanup soft deleted entities ", exc=e
)
if self.config.execution_request_cleanup.enabled:
try:
self.execution_request_cleanup.run()
except Exception as e:
self.report.failure("While trying to cleanup execution request ", exc=e)
if self.config.dataprocess_cleanup.enabled:
try:
self.report.report_ingestion_stage_start("Data Process Cleanup")
yield from self.dataprocess_cleanup.get_workunits_internal()
except Exception as e:
self.report.failure("While trying to cleanup data process ", exc=e)
if self.config.execution_request_cleanup.enabled:
try:
self.report.report_ingestion_stage_start("Execution request Cleanup")
self.execution_request_cleanup.run()
except Exception as e:
self.report.failure("While trying to cleanup execution request ", exc=e)
# Otherwise last stage's duration does not get calculated.
self.report.report_ingestion_stage_start("End")
yield from []

def truncate_indices(self) -> None:
@@ -281,6 +292,8 @@ def revoke_expired_tokens(self) -> None:
list_access_tokens = expired_tokens_res.get("listAccessTokens", {})
tokens = list_access_tokens.get("tokens", [])
total = list_access_tokens.get("total", 0)
if tokens == []:
break
for token in tokens:
self.report.expired_tokens_revoked += 1
token_id = token["id"]
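The `if tokens == []: break` guard added to revoke_expired_tokens above stops the pagination loop as soon as listAccessTokens returns an empty page, even if the reported total still suggests more results. A generic sketch of that pattern; fetch_page is a hypothetical stand-in for the GraphQL call, not a DataHub API:

from typing import Any, Dict, List


def fetch_page(start: int, count: int) -> Dict[str, Any]:
    # Hypothetical stand-in for the listAccessTokens GraphQL query.
    return {"tokens": [], "total": 10}


def collect_expired_tokens(page_size: int = 10) -> List[Dict[str, Any]]:
    collected: List[Dict[str, Any]] = []
    start = 0
    while True:
        page = fetch_page(start, page_size)
        tokens = page.get("tokens", [])
        # Stop on an empty page even if "total" claims more results remain;
        # otherwise the loop would spin without making progress.
        if tokens == []:
            break
        collected.extend(tokens)
        start += page_size
    return collected


print(collect_expired_tokens())  # [] with the stubbed fetch_page above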
metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py
@@ -1,3 +1,4 @@
import datetime
import logging
import time
from typing import Any, Dict, Iterator, Optional
@@ -42,16 +43,28 @@ class DatahubExecutionRequestCleanupConfig(ConfigModel):
description="Global switch for this cleanup task",
)

runtime_limit_seconds: int = Field(
default=3600,
description="Maximum runtime in seconds for the cleanup task",
)

max_read_errors: int = Field(
default=10,
description="Maximum number of read errors before aborting",
)

def keep_history_max_milliseconds(self):
return self.keep_history_max_days * 24 * 3600 * 1000


class DatahubExecutionRequestCleanupReport(SourceReport):
execution_request_cleanup_records_read: int = 0
execution_request_cleanup_records_preserved: int = 0
execution_request_cleanup_records_deleted: int = 0
execution_request_cleanup_read_errors: int = 0
execution_request_cleanup_delete_errors: int = 0
ergc_records_read: int = 0
ergc_records_preserved: int = 0
ergc_records_deleted: int = 0
ergc_read_errors: int = 0
ergc_delete_errors: int = 0
ergc_start_time: Optional[datetime.datetime] = None
ergc_end_time: Optional[datetime.datetime] = None


class CleanupRecord(BaseModel):
@@ -124,6 +137,13 @@ def _scroll_execution_requests(
params.update(overrides)

while True:
if self._reached_runtime_limit():
break
if self.report.ergc_read_errors >= self.config.max_read_errors:
self.report.failure(
f"ergc({self.instance_id}): too many read errors, aborting."
)
break
try:
url = f"{self.graph.config.server}/openapi/v2/entity/{DATAHUB_EXECUTION_REQUEST_ENTITY_NAME}"
response = self.graph._session.get(url, headers=headers, params=params)
@@ -141,7 +161,7 @@ def _scroll_garbage_records(self):
logger.error(
f"ergc({self.instance_id}): failed to fetch next batch of execution requests: {e}"
)
self.report.execution_request_cleanup_read_errors += 1
self.report.ergc_read_errors += 1

def _scroll_garbage_records(self):
state: Dict[str, Dict] = {}
@@ -150,7 +170,7 @@ def _scroll_garbage_records(self):
running_guard_timeout = now_ms - 30 * 24 * 3600 * 1000

for entry in self._scroll_execution_requests():
self.report.execution_request_cleanup_records_read += 1
self.report.ergc_records_read += 1
key = entry.ingestion_source

# Always delete corrupted records
@@ -171,15 +191,15 @@

# Do not delete if number of requests is below minimum
if state[key]["count"] < self.config.keep_history_min_count:
self.report.execution_request_cleanup_records_preserved += 1
self.report.ergc_records_preserved += 1
continue

# Do not delete if number of requests do not exceed allowed maximum,
# or the cutoff date.
if (state[key]["count"] < self.config.keep_history_max_count) and (
entry.requested_at > state[key]["cutoffTimestamp"]
):
self.report.execution_request_cleanup_records_preserved += 1
self.report.ergc_records_preserved += 1
continue

# Do not delete if status is RUNNING or PENDING and created within last month. If the record is >month old and it did not
@@ -188,7 +208,7 @@
"RUNNING",
"PENDING",
]:
self.report.execution_request_cleanup_records_preserved += 1
self.report.ergc_records_preserved += 1
continue

# Otherwise delete current record
@@ -200,7 +220,7 @@
f"record timestamp: {entry.requested_at}."
)
)
self.report.execution_request_cleanup_records_deleted += 1
self.report.ergc_records_deleted += 1
yield entry

def _delete_entry(self, entry: CleanupRecord) -> None:
@@ -210,17 +230,31 @@ def _delete_entry(self, entry: CleanupRecord) -> None:
)
self.graph.delete_entity(entry.urn, True)
except Exception as e:
self.report.execution_request_cleanup_delete_errors += 1
self.report.ergc_delete_errors += 1
logger.error(
f"ergc({self.instance_id}): failed to delete ExecutionRequest {entry.request_id}: {e}"
)

def _reached_runtime_limit(self) -> bool:
if (
self.config.runtime_limit_seconds
and self.report.ergc_start_time
and (
datetime.datetime.now() - self.report.ergc_start_time
>= datetime.timedelta(seconds=self.config.runtime_limit_seconds)
)
):
logger.info(f"ergc({self.instance_id}): max runtime reached.")
return True
return False

def run(self) -> None:
if not self.config.enabled:
logger.info(
f"ergc({self.instance_id}): ExecutionRequest cleaner is disabled."
)
return
self.report.ergc_start_time = datetime.datetime.now()

logger.info(
(
@@ -232,8 +266,11 @@ def run(self) -> None:
)

for entry in self._scroll_garbage_records():
if self._reached_runtime_limit():
break
self._delete_entry(entry)

self.report.ergc_end_time = datetime.datetime.now()
logger.info(
f"ergc({self.instance_id}): Finished cleanup of ExecutionRequest records."
)
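The runtime_limit_seconds config and the ergc_start_time/ergc_end_time report fields added above let the cleaner stop once a wall-clock budget is spent. A small standard-library-only sketch of the check that _reached_runtime_limit performs; CleanerState and reached_runtime_limit are illustrative names, not from the source:

import datetime
from dataclasses import dataclass
from typing import Optional


@dataclass
class CleanerState:
    runtime_limit_seconds: int = 3600
    start_time: Optional[datetime.datetime] = None


def reached_runtime_limit(state: CleanerState) -> bool:
    # Trips only once a start time was recorded and the elapsed time
    # meets or exceeds the configured limit, mirroring _reached_runtime_limit().
    return bool(
        state.runtime_limit_seconds
        and state.start_time
        and datetime.datetime.now() - state.start_time
        >= datetime.timedelta(seconds=state.runtime_limit_seconds)
    )


state = CleanerState(runtime_limit_seconds=1, start_time=datetime.datetime.now())
print(reached_runtime_limit(state))  # False immediately after starting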
metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py
@@ -42,4 +42,5 @@ def report_ingestion_stage_start(self, stage: str) -> None:
self._timer = PerfTimer()

self.ingestion_stage = f"{stage} at {datetime.now(timezone.utc)}"
logger.info(f"Stage started: {self.ingestion_stage}")
self._timer.start()
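IngestionStageReport, shown above, stamps the stage name with a UTC timestamp, logs it, and starts a PerfTimer, so each stage's duration runs until the next report_ingestion_stage_start call; that is why datahub_gc.py ends with a sentinel "End" stage. A minimal usage sketch, assuming the class can be instantiated on its own (in the source it is mixed into DataHubGcSourceReport):

from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport

report = IngestionStageReport()
report.report_ingestion_stage_start("Expired Token Cleanup")
# ... do the work for this stage ...
report.report_ingestion_stage_start("Soft Deleted Entities Cleanup")
# ... do the work for this stage ...
# A final sentinel stage closes the timer of the last real stage.
report.report_ingestion_stage_start("End")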
2 changes: 1 addition & 1 deletion metadata-ingestion/tests/unit/glue/glue_mces_golden.json
@@ -124,7 +124,7 @@
"CreateTime": "June 01, 2021 at 14:55:13"
},
"name": "empty-database",
"qualifiedName": "arn:aws:glue:us-west-2:123412341234:database/empty-database",
"qualifiedName": "arn:aws:glue:us-west-2:000000000000:database/empty-database",
"env": "PROD"
}
}
Original file line number Diff line number Diff line change
@@ -124,7 +124,7 @@
"CreateTime": "June 01, 2021 at 14:55:13"
},
"name": "empty-database",
"qualifiedName": "arn:aws:glue:us-west-2:123412341234:database/empty-database",
"qualifiedName": "arn:aws:glue:us-west-2:000000000000:database/empty-database",
"env": "PROD"
}
}
Original file line number Diff line number Diff line change
@@ -129,7 +129,7 @@
"CreateTime": "June 01, 2021 at 14:55:13"
},
"name": "empty-database",
"qualifiedName": "arn:aws:glue:us-west-2:123412341234:database/empty-database",
"qualifiedName": "arn:aws:glue:us-west-2:000000000000:database/empty-database",
"env": "PROD"
}
}
43 changes: 39 additions & 4 deletions metadata-ingestion/tests/unit/glue/test_glue_source.py
@@ -35,8 +35,8 @@
validate_all_providers_have_committed_successfully,
)
from tests.unit.glue.test_glue_source_stubs import (
databases_1,
databases_2,
empty_database,
flights_database,
get_bucket_tagging,
get_databases_delta_response,
get_databases_response,
@@ -64,6 +64,7 @@
tables_2,
tables_profiling_1,
target_database_tables,
test_database,
)

FROZEN_TIME = "2020-04-14 07:00:00"
@@ -310,6 +311,40 @@ def test_config_without_platform():
assert source.platform == "glue"


def test_get_databases_filters_by_catalog():
def format_databases(databases):
return set(d["Name"] for d in databases)

all_catalogs_source: GlueSource = GlueSource(
config=GlueSourceConfig(aws_region="us-west-2"),
ctx=PipelineContext(run_id="glue-source-test"),
)
with Stubber(all_catalogs_source.glue_client) as glue_stubber:
glue_stubber.add_response("get_databases", get_databases_response, {})

expected = [flights_database, test_database, empty_database]
actual = all_catalogs_source.get_all_databases()
assert format_databases(actual) == format_databases(expected)
assert all_catalogs_source.report.databases.dropped_entities.as_obj() == []

catalog_id = "123412341234"
single_catalog_source: GlueSource = GlueSource(
config=GlueSourceConfig(catalog_id=catalog_id, aws_region="us-west-2"),
ctx=PipelineContext(run_id="glue-source-test"),
)
with Stubber(single_catalog_source.glue_client) as glue_stubber:
glue_stubber.add_response(
"get_databases", get_databases_response, {"CatalogId": catalog_id}
)

expected = [flights_database, test_database]
actual = single_catalog_source.get_all_databases()
assert format_databases(actual) == format_databases(expected)
assert single_catalog_source.report.databases.dropped_entities.as_obj() == [
"empty-database"
]


@freeze_time(FROZEN_TIME)
def test_glue_stateful(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
deleted_actor_golden_mcs = "{}/glue_deleted_actor_mces_golden.json".format(
@@ -357,8 +392,8 @@ def test_glue_stateful(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
tables_on_first_call = tables_1
tables_on_second_call = tables_2
mock_get_all_databases_and_tables.side_effect = [
(databases_1, tables_on_first_call),
(databases_2, tables_on_second_call),
([flights_database], tables_on_first_call),
([test_database], tables_on_second_call),
]

pipeline_run1 = run_and_get_pipeline(pipeline_config_dict)
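The new test asserts against report.databases, the EntityFilterReport added to GlueSourceReport, which records each database name as processed or dropped and exposes the dropped names via dropped_entities.as_obj(). A small sketch of that reporting API as used in this diff; DemoReport is an illustrative container, not a DataHub class:

from dataclasses import dataclass

from datahub.ingestion.api.report import EntityFilterReport


@dataclass
class DemoReport:
    # Declared the same way as GlueSourceReport.databases above.
    databases: EntityFilterReport = EntityFilterReport.field(type="database")


report = DemoReport()
report.databases.processed("flights-database")
report.databases.dropped("empty-database")
print(report.databases.dropped_entities.as_obj())  # expected: ["empty-database"]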
8 changes: 5 additions & 3 deletions metadata-ingestion/tests/unit/glue/test_glue_source_stubs.py
@@ -88,12 +88,14 @@
"Permissions": ["ALL"],
}
],
"CatalogId": "123412341234",
"CatalogId": "000000000000",
},
]
}
databases_1 = [{"Name": "flights-database", "CatalogId": "123412341234"}]
databases_2 = [{"Name": "test-database", "CatalogId": "123412341234"}]
flights_database = {"Name": "flights-database", "CatalogId": "123412341234"}
test_database = {"Name": "test-database", "CatalogId": "123412341234"}
empty_database = {"Name": "empty-database", "CatalogId": "000000000000"}

tables_1 = [
{
"Name": "avro",
