From 14d5a9d06bf5f41afc934ed58a465f2e2e537007 Mon Sep 17 00:00:00 2001 From: Peng Gao Date: Fri, 5 Jan 2024 14:21:55 -0800 Subject: [PATCH 1/2] Override datajob external_url. --- .../airflow-plugin/src/datahub_airflow_plugin/_config.py | 3 +++ .../src/datahub_airflow_plugin/client/airflow_generator.py | 6 ++++++ 2 files changed, 9 insertions(+) diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py index 67843da2ba995d..8b674d27bf0462 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py @@ -41,6 +41,9 @@ class DatahubLineageConfig(ConfigModel): # The Airflow plugin behaves as if it were set to True. graceful_exceptions: bool = True + # Override the external urls of datajob as Airflow DAG page. + override_datajob_url: bool = False + def make_emitter_hook(self) -> "DatahubGenericHook": # This is necessary to avoid issues with circular imports. from datahub_airflow_plugin.hooks.datahub import DatahubGenericHook diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py index e1d53be7bae6b9..d4c3c1a2b18222 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py @@ -13,6 +13,7 @@ from datahub.utilities.urns.data_job_urn import DataJobUrn from datahub_airflow_plugin._airflow_compat import AIRFLOW_PATCHED +from datahub_airflow_plugin.lineage.datahub import get_lineage_backend_config assert AIRFLOW_PATCHED @@ -269,6 +270,11 @@ def generate_datajob( base_url = conf.get("webserver", "base_url") datajob.url = f"{base_url}/taskinstance/list/?flt1_dag_id_equals={datajob.flow_urn.get_flow_id()}&_flt_3_task_id={task.task_id}" + config = get_lineage_backend_config() + + if config.override_datajob_url: + datajob.url = f"{base_url}/dags/{datajob.flow_urn.get_flow_id()}/grid?task_id={task.task_id}" + if capture_owner and dag.owner: datajob.owners.add(dag.owner) From 72ce55465aa9c5d50540f1828f59264e67970ab5 Mon Sep 17 00:00:00 2001 From: Peng Gao Date: Mon, 8 Jan 2024 13:53:00 -0800 Subject: [PATCH 2/2] Update --- .../airflow-plugin/src/datahub_airflow_plugin/_config.py | 4 ++-- .../src/datahub_airflow_plugin/client/airflow_generator.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py index 8b674d27bf0462..72a7849eda9e19 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py @@ -41,8 +41,8 @@ class DatahubLineageConfig(ConfigModel): # The Airflow plugin behaves as if it were set to True. graceful_exceptions: bool = True - # Override the external urls of datajob as Airflow DAG page. - override_datajob_url: bool = False + # Override the external urls of datajob. + override_datajob_url: str = None def make_emitter_hook(self) -> "DatahubGenericHook": # This is necessary to avoid issues with circular imports. diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py index d4c3c1a2b18222..6fec3fa8327bd4 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py @@ -273,7 +273,7 @@ def generate_datajob( config = get_lineage_backend_config() if config.override_datajob_url: - datajob.url = f"{base_url}/dags/{datajob.flow_urn.get_flow_id()}/grid?task_id={task.task_id}" + datajob.url = config.override_datajob_url.format(base_url=base_url, dag_id=datajob.flow_urn.get_flow_id(), task_id=task.task_id) if capture_owner and dag.owner: datajob.owners.add(dag.owner)