From 750aab9a510d1969aba5121d073347e00347a6e2 Mon Sep 17 00:00:00 2001 From: Francesco Macagno Date: Wed, 19 Jun 2024 04:14:30 -0500 Subject: [PATCH] feat: allow task ownership as group (#10742) --- docs/lineage/airflow.md | 30 ++++++++++--------- .../src/datahub_airflow_plugin/_config.py | 9 +++++- .../client/airflow_generator.py | 11 +++++-- 3 files changed, 33 insertions(+), 17 deletions(-) diff --git a/docs/lineage/airflow.md b/docs/lineage/airflow.md index a36f3bbd4bd16a..62715ed506ffe9 100644 --- a/docs/lineage/airflow.md +++ b/docs/lineage/airflow.md @@ -66,11 +66,12 @@ enabled = True # default ``` | Name | Default value | Description | -| -------------------------- | -------------------- | ---------------------------------------------------------------------------------------- | +|----------------------------|----------------------|------------------------------------------------------------------------------------------| | enabled | true | If the plugin should be enabled. | | conn_id | datahub_rest_default | The name of the datahub rest connection. | -| cluster | prod | name of the airflow cluster, this is equivalent to the `env` of the instance | +| cluster | prod | name of the airflow cluster, this is equivalent to the `env` of the instance | | capture_ownership_info | true | Extract DAG ownership. | +| capture_ownership_as_group | false | When extracting DAG ownership, treat DAG owner as a group rather than a user | | capture_tags_info | true | Extract DAG tags. | | capture_executions | true | Extract task runs and success/failure statuses. This will show up in DataHub "Runs" tab. | | materialize_iolets | true | Create or un-soft-delete all entities referenced in lineage. | @@ -130,18 +131,19 @@ conn_id = datahub_rest_default # or datahub_kafka_default # etc. 
``` -| Name | Default value | Description | -| ---------------------- | -------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| enabled | true | If the plugin should be enabled. | -| conn_id | datahub_rest_default | The name of the datahub connection you set in step 1. | -| cluster | prod | name of the airflow cluster | -| capture_ownership_info | true | If true, the owners field of the DAG will be capture as a DataHub corpuser. | -| capture_tags_info | true | If true, the tags field of the DAG will be captured as DataHub tags. | -| capture_executions | true | If true, we'll capture task runs in DataHub in addition to DAG definitions. | -| materialize_iolets | true | Create or un-soft-delete all entities referenced in lineage. | -| datajob_url_link | taskinstance | If taskinstance, the datajob url will be taskinstance link on airflow. It can also be grid. | -| | -| graceful_exceptions | true | If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions. | +| Name | Default value | Description | +|----------------------------|----------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| enabled | true | If the plugin should be enabled. | +| conn_id | datahub_rest_default | The name of the datahub connection you set in step 1. | +| cluster | prod | name of the airflow cluster | +| capture_ownership_info | true | If true, the owners field of the DAG will be captured as a DataHub corpuser. | +| capture_ownership_as_group | false | When extracting DAG ownership, treat DAG owner as a group rather than a user. 
| +| capture_tags_info | true | If true, the tags field of the DAG will be captured as DataHub tags. | +| capture_executions | true | If true, we'll capture task runs in DataHub in addition to DAG definitions. | +| materialize_iolets | true | Create or un-soft-delete all entities referenced in lineage. | +| datajob_url_link | taskinstance | If taskinstance, the datajob url will be taskinstance link on airflow. It can also be grid. | +| | +| graceful_exceptions | true | If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions. | #### Validate that the plugin is working diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py index f2cd647837d5db..c37a1b334ed377 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py @@ -28,9 +28,12 @@ class DatahubLineageConfig(ConfigModel): # Cluster to associate with the pipelines and tasks. Defaults to "prod". cluster: str = builder.DEFAULT_FLOW_CLUSTER - # If true, the owners field of the DAG will be capture as a DataHub corpuser. + # If true, the owners field of the DAG will be captured as a DataHub corpuser. capture_ownership_info: bool = True + # If true, the owners field of the DAG will instead be captured as a DataHub corpgroup. + capture_ownership_as_group: bool = False + # If true, the tags field of the DAG will be captured as DataHub tags. 
capture_tags_info: bool = True @@ -70,6 +73,9 @@ def get_lineage_config() -> DatahubLineageConfig: capture_ownership_info = conf.get( "datahub", "capture_ownership_info", fallback=True ) + capture_ownership_as_group = conf.get( + "datahub", "capture_ownership_as_group", fallback=False + ) capture_executions = conf.get("datahub", "capture_executions", fallback=True) materialize_iolets = conf.get("datahub", "materialize_iolets", fallback=True) enable_extractors = conf.get("datahub", "enable_extractors", fallback=True) @@ -87,6 +93,7 @@ def get_lineage_config() -> DatahubLineageConfig: datahub_conn_id=datahub_conn_id, cluster=cluster, capture_ownership_info=capture_ownership_info, + capture_ownership_as_group=capture_ownership_as_group, capture_tags_info=capture_tags_info, capture_executions=capture_executions, materialize_iolets=materialize_iolets, diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py index d18b31a5ff3496..8aa154dc267b60 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py @@ -175,7 +175,11 @@ def generate_dataflow( data_flow.url = f"{base_url}/tree?dag_id={dag.dag_id}" if config.capture_ownership_info and dag.owner: - data_flow.owners.update(owner.strip() for owner in dag.owner.split(",")) + owners = [owner.strip() for owner in dag.owner.split(",")] + if config.capture_ownership_as_group: + data_flow.group_owners.update(owners) + else: + data_flow.owners.update(owners) if config.capture_tags_info and dag.tags: data_flow.tags.update(dag.tags) @@ -278,7 +282,10 @@ def generate_datajob( datajob.url = f"{base_url}/taskinstance/list/?flt1_dag_id_equals={datajob.flow_urn.flow_id}&_flt_3_task_id={task.task_id}" if capture_owner and dag.owner: - 
datajob.owners.add(dag.owner) + if config and config.capture_ownership_as_group: + datajob.group_owners.add(dag.owner) + else: + datajob.owners.add(dag.owner) if capture_tags and dag.tags: datajob.tags.update(dag.tags)