Commit a7afedb

Merge branch 'datahub-project:master' into master
treff7es authored Dec 7, 2024
2 parents 215ca01 + 70bec48 commit a7afedb
Showing 2 changed files with 68 additions and 11 deletions.
31 changes: 20 additions & 11 deletions metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py
@@ -207,6 +207,9 @@ def fetch_dpis(self, job_urn: str, batch_size: int) -> List[dict]:
         assert self.ctx.graph
         dpis = []
         start = 0
+        # This GraphQL endpoint doesn't support scrolling, so deep pagination past 10k DPIs causes performance issues on ES.
+        # We therefore cap the number of fetched DPIs at 9000.
+        max_item = 9000
         while True:
             try:
                 job_query_result = self.ctx.graph.execute_graphql(
@@ -226,10 +229,12 @@ def fetch_dpis(self, job_urn: str, batch_size: int) -> List[dict]:
                 runs = runs_data.get("runs")
                 dpis.extend(runs)
                 start += batch_size
-                if len(runs) < batch_size:
+                if len(runs) < batch_size or start >= max_item:
                     break
             except Exception as e:
-                logger.error(f"Exception while fetching DPIs for job {job_urn}: {e}")
+                self.report.failure(
+                    f"Exception while fetching DPIs for job {job_urn}:", exc=e
+                )
                 break
         return dpis
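For readers skimming the diff: the new `max_item` cap bounds offset pagination because this endpoint cannot scroll, and deep offsets past ~10k hits degrade Elasticsearch. A minimal standalone sketch of the pattern (the `fetch_page` callable is a hypothetical stand-in for the elided `execute_graphql` call):

```python
from typing import Callable, List

MAX_ITEMS = 9000  # stay below the ~10k ES deep-pagination limit noted above


def fetch_all(fetch_page: Callable[[int, int], List[dict]], batch_size: int) -> List[dict]:
    """Accumulate pages until a short page (no more data) or the hard cap."""
    items: List[dict] = []
    start = 0
    while True:
        page = fetch_page(start, batch_size)
        items.extend(page)
        start += batch_size
        # A short page means the server ran out of results; the cap check
        # mirrors `start >= max_item` in the hunk above.
        if len(page) < batch_size or start >= MAX_ITEMS:
            break
    return items
```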

@@ -254,8 +259,9 @@ def keep_last_n_dpi(
                 deleted_count_last_n += 1
                 futures[future]["deleted"] = True
             except Exception as e:
-                logger.error(f"Exception while deleting DPI: {e}")
-
+                self.report.report_failure(
+                    f"Exception while deleting DPI: {e}", exc=e
+                )
             if deleted_count_last_n % self.config.batch_size == 0:
                 logger.info(f"Deleted {deleted_count_last_n} DPIs from {job.urn}")
                 if self.config.delay:
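The surrounding loop (elided by the diff) drains a `ThreadPoolExecutor`; below is a hedged sketch of the futures bookkeeping this hunk sits in, with a hypothetical `delete_dpi` worker and a `futures` dict mapping each future back to its DPI:

```python
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed

logger = logging.getLogger(__name__)


def delete_dpi(dpi: dict) -> None:
    """Hypothetical stand-in for the real deletion call."""


dpis = [{"urn": f"urn:li:dataprocessInstance:{i}", "deleted": False} for i in range(5)]
deleted_count = 0
with ThreadPoolExecutor(max_workers=4) as executor:
    # Map each future back to the DPI dict it is deleting, as in the diff.
    futures = {executor.submit(delete_dpi, dpi): dpi for dpi in dpis}
    for future in as_completed(futures):
        try:
            future.result()  # re-raises any exception from the worker thread
            deleted_count += 1
            futures[future]["deleted"] = True
        except Exception as e:
            logger.error(f"Exception while deleting DPI: {e}")
```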
@@ -289,7 +295,7 @@ def delete_dpi_from_datajobs(self, job: DataJobEntity) -> None:
         dpis = self.fetch_dpis(job.urn, self.config.batch_size)
         dpis.sort(
             key=lambda x: x["created"]["time"]
-            if "created" in x and "time" in x["created"]
+            if x.get("created") and x["created"].get("time")
             else 0,
             reverse=True,
         )
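The switch from membership tests to `.get()` matters because a DPI can carry `"created": None` (where `"time" in x["created"]` raises `TypeError`, since `None` is not iterable) or `"created": {"time": None}` (where the old key returned `None` and the sort comparison blew up). A quick standalone illustration of the new key:

```python
dpis = [
    {"urn": "urn:li:dataprocessInstance:1", "created": {"time": 1733500000000}},
    {"urn": "urn:li:dataprocessInstance:2", "created": None},           # old check: TypeError
    {"urn": "urn:li:dataprocessInstance:3", "created": {"time": None}}, # old key: unorderable None
]

# Truthiness checks short-circuit safely; missing/null timestamps sort as 0.
dpis.sort(
    key=lambda x: x["created"]["time"]
    if x.get("created") and x["created"].get("time")
    else 0,
    reverse=True,
)
assert [d["urn"].rsplit(":", 1)[-1] for d in dpis] == ["1", "2", "3"]
```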
@@ -325,8 +331,8 @@ def remove_old_dpis(
                 continue

             if (
-                "created" not in dpi
-                or "time" not in dpi["created"]
+                not dpi.get("created")
+                or not dpi["created"].get("time")
                 or dpi["created"]["time"] < retention_time * 1000
             ):
                 future = executor.submit(
@@ -340,7 +346,7 @@ def remove_old_dpis(
                 deleted_count_retention += 1
                 futures[future]["deleted"] = True
             except Exception as e:
-                logger.error(f"Exception while deleting DPI: {e}")
+                self.report.report_failure(f"Exception while deleting DPI: {e}", exc=e)

             if deleted_count_retention % self.config.batch_size == 0:
                 logger.info(
@@ -351,9 +357,12 @@ def remove_old_dpis(
logger.info(f"Sleeping for {self.config.delay} seconds")
time.sleep(self.config.delay)

logger.info(
f"Deleted {deleted_count_retention} DPIs from {job.urn} due to retention"
)
if deleted_count_retention > 0:
logger.info(
f"Deleted {deleted_count_retention} DPIs from {job.urn} due to retention"
)
else:
logger.debug(f"No DPIs to delete from {job.urn} due to retention")

def get_data_flows(self) -> Iterable[DataFlowEntity]:
assert self.ctx.graph
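One detail worth noting in the `remove_old_dpis` hunks above: the comparison `dpi["created"]["time"] < retention_time * 1000` implies the DPI timestamp is in epoch milliseconds while the retention cutoff is computed in epoch seconds. A hedged sketch of how such a cutoff might be derived (`retention_days` is a hypothetical name; the real config field is not shown in this diff):

```python
import time

retention_days = 10  # hypothetical config value
# Cutoff in epoch seconds; DPI `created.time` values are epoch milliseconds,
# hence the `* 1000` in the comparison above.
retention_time = int(time.time()) - retention_days * 24 * 60 * 60

dpi = {"created": {"time": 1609459200000}}  # 2021-01-01T00:00:00Z, in ms
should_delete = dpi["created"]["time"] < retention_time * 1000
```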
48 changes: 48 additions & 0 deletions metadata-ingestion/tests/unit/test_gc.py
@@ -84,6 +84,54 @@ def test_delete_dpi_from_datajobs_without_dpi_created_time(self, mock_fetch_dpis
         self.cleanup.delete_dpi_from_datajobs(job)
         self.assertEqual(10, self.report.num_aspects_removed)

+    @patch(
+        "datahub.ingestion.source.gc.dataprocess_cleanup.DataProcessCleanup.fetch_dpis"
+    )
+    def test_delete_dpi_from_datajobs_without_dpi_null_created_time(
+        self, mock_fetch_dpis
+    ):
+        job = DataJobEntity(
+            urn="urn:li:dataJob:1",
+            flow_urn="urn:li:dataFlow:1",
+            lastIngested=int(datetime.now(timezone.utc).timestamp()),
+            jobId="job1",
+            dataPlatformInstance="urn:li:dataPlatformInstance:1",
+            total_runs=10,
+        )
+        mock_fetch_dpis.return_value = [
+            {"urn": f"urn:li:dataprocessInstance:{i}"} for i in range(10)
+        ] + [
+            {
+                "urn": "urn:li:dataprocessInstance:11",
+                "created": {"time": None},
+            }
+        ]
+        self.cleanup.delete_dpi_from_datajobs(job)
+        self.assertEqual(11, self.report.num_aspects_removed)
+
+    @patch(
+        "datahub.ingestion.source.gc.dataprocess_cleanup.DataProcessCleanup.fetch_dpis"
+    )
+    def test_delete_dpi_from_datajobs_without_dpi_without_time(self, mock_fetch_dpis):
+        job = DataJobEntity(
+            urn="urn:li:dataJob:1",
+            flow_urn="urn:li:dataFlow:1",
+            lastIngested=int(datetime.now(timezone.utc).timestamp()),
+            jobId="job1",
+            dataPlatformInstance="urn:li:dataPlatformInstance:1",
+            total_runs=10,
+        )
+        mock_fetch_dpis.return_value = [
+            {"urn": f"urn:li:dataprocessInstance:{i}"} for i in range(10)
+        ] + [
+            {
+                "urn": "urn:li:dataprocessInstance:11",
+                "created": None,
+            }
+        ]
+        self.cleanup.delete_dpi_from_datajobs(job)
+        self.assertEqual(11, self.report.num_aspects_removed)
+
     def test_fetch_dpis(self):
         assert self.cleanup.ctx.graph
         self.cleanup.ctx.graph = MagicMock()
