fix(ingestion/airflow-plugin): airflow remove old tasks (datahub-proj…
dushayntAW authored Jun 10, 2024
1 parent 812bcbb commit 177a50f
Showing 2 changed files with 121 additions and 0 deletions.
31 changes: 31 additions & 0 deletions docs/lineage/airflow.md
@@ -233,6 +233,37 @@ You can also create a custom extractor to extract lineage from any operator.

See this [example PR](https://github.com/datahub-project/datahub/pull/10452) which adds a custom extractor for the `BigQueryInsertJobOperator` operator.

## Clean up obsolete pipelines and tasks from DataHub

There might be a case where DAGs have been removed from Airflow but the corresponding pipelines and tasks still exist in DataHub; we call these `obsolete pipelines and tasks`.

Follow these steps to clean them up from DataHub:
- Create a DAG named `Datahub_Cleanup`, for example:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
"Datahub_Cleanup",
start_date=datetime(2024, 1, 1),
schedule_interval=None,
catchup=False,
) as dag:
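    # The bash command below is only a placeholder; the DataHub Airflow plugin's
    # listener performs the actual cleanup when a DAG with this reserved name runs.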
task = BashOperator(
task_id="cleanup_obsolete_data",
dag=dag,
bash_command="echo 'cleaning up the obsolete data from datahub'",
)

```
- Deploy and run this DAG; it will soft-delete all the obsolete pipelines and tasks from DataHub that match the `cluster` value set in `airflow.cfg` (see the URN sketch below).
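
Only pipelines and tasks whose URNs carry the same `cluster` value as the plugin configuration are candidates for cleanup, because the cluster is embedded in every Airflow pipeline URN in DataHub. Here is a minimal sketch of how such a URN is built with the DataHub SDK (`my_example_dag` is an illustrative DAG id; `prod` is the plugin's default cluster):

```python
import datahub.emitter.mce_builder as builder

# The cluster is part of the flow URN, so the cleanup DAG only matches
# pipelines that were ingested with the cluster configured for the plugin.
flow_urn = builder.make_data_flow_urn(
    orchestrator="airflow",
    flow_id="my_example_dag",
    cluster="prod",
)
print(flow_urn)  # urn:li:dataFlow:(airflow,my_example_dag,prod)
```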


## Emit Lineage Directly

If you can't use the plugin or annotate inlets/outlets, you can also emit lineage using the `DatahubEmitterOperator`.
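
A minimal sketch of direct emission, assuming an Airflow connection named `datahub_rest_default` that points at your DataHub instance and purely illustrative Snowflake table names:

```python
import datahub.emitter.mce_builder as builder
from datahub_airflow_plugin.operators.datahub import DatahubEmitterOperator

# Emit a lineage edge between two datasets as an explicit task in the DAG.
emit_lineage_task = DatahubEmitterOperator(
    task_id="emit_lineage",
    datahub_conn_id="datahub_rest_default",
    mces=[
        builder.make_lineage_mce(
            upstream_urns=[builder.make_dataset_urn("snowflake", "mydb.schema.tableA")],
            downstream_urn=builder.make_dataset_urn("snowflake", "mydb.schema.tableB"),
        )
    ],
)
```
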
@@ -1,3 +1,4 @@
import asyncio
import copy
import functools
import logging
@@ -8,12 +9,15 @@

import airflow
import datahub.emitter.mce_builder as builder
from airflow.models.serialized_dag import SerializedDagModel
from datahub.api.entities.datajob import DataJob
from datahub.api.entities.dataprocess.dataprocess_instance import InstanceRunResult
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.ingestion.graph.client import DataHubGraph
from datahub.metadata.schema_classes import (
DataFlowKeyClass,
DataJobKeyClass,
FineGrainedLineageClass,
FineGrainedLineageDownstreamTypeClass,
FineGrainedLineageUpstreamTypeClass,
@@ -68,6 +72,7 @@ def hookimpl(f: _F) -> _F:  # type: ignore[misc] # noqa: F811
"1",
)
_RUN_IN_THREAD_TIMEOUT = 30
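# Reserved DAG name: when a DAG with this id runs, the listener reconciles DataHub
# against Airflow and soft-deletes pipelines and tasks that no longer exist.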
_DATAHUB_CLEANUP_DAG = "Datahub_Cleanup"


def get_airflow_plugin_listener() -> Optional["DataHubListener"]:
@@ -542,6 +547,81 @@ def on_dag_start(self, dag_run: "DagRun") -> None:

self.emitter.emit(event)

if dag.dag_id == _DATAHUB_CLEANUP_DAG:
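            # The reserved cleanup DAG triggers a reconciliation pass; querying the
            # previously ingested entities requires a DataHub graph client.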
assert self.graph

logger.debug("Initiating the cleanup of obsselete data from datahub")

# get all ingested dataflow and datajob
ingested_dataflow_urns = list(
self.graph.get_urns_by_filter(
platform="airflow",
entity_types=["dataFlow"],
)
)
ingested_datajob_urns = list(
self.graph.get_urns_by_filter(
platform="airflow", entity_types=["dataJob"]
)
)

# filter the ingested dataflow and datajob based on the cluster
filtered_ingested_dataflow_urns: List = []
filtered_ingested_datajob_urns: List = []

for ingested_dataflow_urn in ingested_dataflow_urns:
data_flow_aspect = self.graph.get_aspect(
entity_urn=ingested_dataflow_urn, aspect_type=DataFlowKeyClass
)
                if (
                    data_flow_aspect is not None
                    and data_flow_aspect.flowId != _DATAHUB_CLEANUP_DAG
                    and data_flow_aspect.cluster == self.config.cluster
                ):
filtered_ingested_dataflow_urns.append(ingested_dataflow_urn)

for ingested_datajob_urn in ingested_datajob_urns:
data_job_aspect = self.graph.get_aspect(
entity_urn=ingested_datajob_urn, aspect_type=DataJobKeyClass
)
if (
data_job_aspect is not None
and data_job_aspect.flow in filtered_ingested_dataflow_urns
):
filtered_ingested_datajob_urns.append(ingested_datajob_urn)

# get all airflow dags
all_airflow_dags = SerializedDagModel.read_all_dags().values()

airflow_flow_urns: List = []
airflow_job_urns: List = []

for dag in all_airflow_dags:
flow_urn = builder.make_data_flow_urn(
orchestrator="airflow",
flow_id=dag.dag_id,
cluster=self.config.cluster,
)
airflow_flow_urns.append(flow_urn)

for task in dag.tasks:
airflow_job_urns.append(
builder.make_data_job_urn_with_flow(str(flow_urn), task.task_id)
)

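            # Anything previously ingested into DataHub that no longer exists in Airflow
            # is considered obsolete.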
obsolete_pipelines = set(filtered_ingested_dataflow_urns) - set(
airflow_flow_urns
)
obsolete_tasks = set(filtered_ingested_datajob_urns) - set(airflow_job_urns)

obsolete_urns = obsolete_pipelines.union(obsolete_tasks)

asyncio.run(self._soft_delete_obsolete_urns(obsolete_urns=obsolete_urns))

logger.debug(f"total pipelines removed = {len(obsolete_pipelines)}")
logger.debug(f"total tasks removed = {len(obsolete_tasks)}")

if HAS_AIRFLOW_DAG_LISTENER_API:

@hookimpl
@@ -578,3 +658,13 @@ def on_dataset_changed(self, dataset: "Dataset") -> None:
logger.debug(
f"DataHub listener got notification about dataset change for {dataset}"
)

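    # Soft-delete helpers: deletions are fanned out concurrently, and each URN is
    # checked for existence first so that repeated cleanup runs are harmless.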
async def _soft_delete_obsolete_urns(self, obsolete_urns):
delete_tasks = [self._delete_obsolete_data(urn) for urn in obsolete_urns]
await asyncio.gather(*delete_tasks)

async def _delete_obsolete_data(self, obsolete_urn):
assert self.graph

if self.graph.exists(str(obsolete_urn)):
self.graph.soft_delete_entity(str(obsolete_urn))
