From 177a50f9a77f6fd85b053bc655e2d0202759a07e Mon Sep 17 00:00:00 2001 From: dushayntAW <158567391+dushayntAW@users.noreply.github.com> Date: Mon, 10 Jun 2024 11:54:20 +0200 Subject: [PATCH] fix(ingestion/airflow-plugin): airflow remove old tasks (#10485) --- docs/lineage/airflow.md | 31 +++++++ .../datahub_listener.py | 90 +++++++++++++++++++ 2 files changed, 121 insertions(+) diff --git a/docs/lineage/airflow.md b/docs/lineage/airflow.md index 1745c23cb1923..a36f3bbd4bd16 100644 --- a/docs/lineage/airflow.md +++ b/docs/lineage/airflow.md @@ -233,6 +233,37 @@ You can also create a custom extractor to extract lineage from any operator. Thi See this [example PR](https://github.com/datahub-project/datahub/pull/10452) which adds a custom extractor for the `BigQueryInsertJobOperator` operator. +## Cleanup obsolete pipelines and tasks from Datahub + +There might be a case where the DAGs are removed from the Airflow but the corresponding pipelines and tasks are still there in the Datahub, let's call such pipelines ans tasks, `obsolete pipelines and tasks` + +Following are the steps to cleanup them from the datahub: +- create a DAG named `Datahub_Cleanup`, i.e. + +```python +from datetime import datetime + +from airflow import DAG +from airflow.operators.bash import BashOperator + +from datahub_airflow_plugin.entities import Dataset, Urn + +with DAG( + "Datahub_Cleanup", + start_date=datetime(2024, 1, 1), + schedule_interval=None, + catchup=False, +) as dag: + task = BashOperator( + task_id="cleanup_obsolete_data", + dag=dag, + bash_command="echo 'cleaning up the obsolete data from datahub'", + ) + +``` +- ingest this DAG, and it will remove all the obsolete pipelines and tasks from the Datahub based on the `cluster` value set in the `airflow.cfg` + + ## Emit Lineage Directly If you can't use the plugin or annotate inlets/outlets, you can also emit lineage using the `DatahubEmitterOperator`. diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py index ed155a35a925c..40c36d6106e2b 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py @@ -1,3 +1,4 @@ +import asyncio import copy import functools import logging @@ -8,12 +9,15 @@ import airflow import datahub.emitter.mce_builder as builder +from airflow.models.serialized_dag import SerializedDagModel from datahub.api.entities.datajob import DataJob from datahub.api.entities.dataprocess.dataprocess_instance import InstanceRunResult from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.rest_emitter import DatahubRestEmitter from datahub.ingestion.graph.client import DataHubGraph from datahub.metadata.schema_classes import ( + DataFlowKeyClass, + DataJobKeyClass, FineGrainedLineageClass, FineGrainedLineageDownstreamTypeClass, FineGrainedLineageUpstreamTypeClass, @@ -68,6 +72,7 @@ def hookimpl(f: _F) -> _F: # type: ignore[misc] # noqa: F811 "1", ) _RUN_IN_THREAD_TIMEOUT = 30 +_DATAHUB_CLEANUP_DAG = "Datahub_Cleanup" def get_airflow_plugin_listener() -> Optional["DataHubListener"]: @@ -542,6 +547,81 @@ def on_dag_start(self, dag_run: "DagRun") -> None: self.emitter.emit(event) + if dag.dag_id == _DATAHUB_CLEANUP_DAG: + assert self.graph + + logger.debug("Initiating the cleanup of obsselete data from datahub") + + # get all ingested dataflow and datajob + ingested_dataflow_urns = list( + self.graph.get_urns_by_filter( + platform="airflow", + entity_types=["dataFlow"], + ) + ) + ingested_datajob_urns = list( + self.graph.get_urns_by_filter( + platform="airflow", entity_types=["dataJob"] + ) + ) + + # filter the ingested dataflow and datajob based on the cluster + filtered_ingested_dataflow_urns: List = [] + filtered_ingested_datajob_urns: List = [] + + for ingested_dataflow_urn in ingested_dataflow_urns: + data_flow_aspect = self.graph.get_aspect( + entity_urn=ingested_dataflow_urn, aspect_type=DataFlowKeyClass + ) + if ( + data_flow_aspect is not None + and data_flow_aspect.flowId != _DATAHUB_CLEANUP_DAG + and data_flow_aspect is not None + and data_flow_aspect.cluster == self.config.cluster + ): + filtered_ingested_dataflow_urns.append(ingested_dataflow_urn) + + for ingested_datajob_urn in ingested_datajob_urns: + data_job_aspect = self.graph.get_aspect( + entity_urn=ingested_datajob_urn, aspect_type=DataJobKeyClass + ) + if ( + data_job_aspect is not None + and data_job_aspect.flow in filtered_ingested_dataflow_urns + ): + filtered_ingested_datajob_urns.append(ingested_datajob_urn) + + # get all airflow dags + all_airflow_dags = SerializedDagModel.read_all_dags().values() + + airflow_flow_urns: List = [] + airflow_job_urns: List = [] + + for dag in all_airflow_dags: + flow_urn = builder.make_data_flow_urn( + orchestrator="airflow", + flow_id=dag.dag_id, + cluster=self.config.cluster, + ) + airflow_flow_urns.append(flow_urn) + + for task in dag.tasks: + airflow_job_urns.append( + builder.make_data_job_urn_with_flow(str(flow_urn), task.task_id) + ) + + obsolete_pipelines = set(filtered_ingested_dataflow_urns) - set( + airflow_flow_urns + ) + obsolete_tasks = set(filtered_ingested_datajob_urns) - set(airflow_job_urns) + + obsolete_urns = obsolete_pipelines.union(obsolete_tasks) + + asyncio.run(self._soft_delete_obsolete_urns(obsolete_urns=obsolete_urns)) + + logger.debug(f"total pipelines removed = {len(obsolete_pipelines)}") + logger.debug(f"total tasks removed = {len(obsolete_tasks)}") + if HAS_AIRFLOW_DAG_LISTENER_API: @hookimpl @@ -578,3 +658,13 @@ def on_dataset_changed(self, dataset: "Dataset") -> None: logger.debug( f"DataHub listener got notification about dataset change for {dataset}" ) + + async def _soft_delete_obsolete_urns(self, obsolete_urns): + delete_tasks = [self._delete_obsolete_data(urn) for urn in obsolete_urns] + await asyncio.gather(*delete_tasks) + + async def _delete_obsolete_data(self, obsolete_urn): + assert self.graph + + if self.graph.exists(str(obsolete_urn)): + self.graph.soft_delete_entity(str(obsolete_urn))