From 49b6284ebfa6fae65bf463e0eb3218b9793bb1f2 Mon Sep 17 00:00:00 2001 From: Steffen Grohsschmiedt Date: Wed, 4 Dec 2024 01:16:44 +0100 Subject: [PATCH] fix(airflow): fix AthenaOperator extraction (#11857) Co-authored-by: Harshal Sheth --- .../airflow-plugin/setup.py | 2 +- .../src/datahub_airflow_plugin/_extractors.py | 24 +- .../tests/integration/dags/athena_operator.py | 43 ++ .../goldens/v2_athena_operator.json | 672 ++++++++++++++++++ .../v2_athena_operator_no_dag_listener.json | 672 ++++++++++++++++++ .../tests/integration/test_plugin.py | 29 + 6 files changed, 1440 insertions(+), 2 deletions(-) create mode 100644 metadata-ingestion-modules/airflow-plugin/tests/integration/dags/athena_operator.py create mode 100644 metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_athena_operator.json create mode 100644 metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_athena_operator_no_dag_listener.json diff --git a/metadata-ingestion-modules/airflow-plugin/setup.py b/metadata-ingestion-modules/airflow-plugin/setup.py index 0d5ceefd989dc..02a0bbb6022e0 100644 --- a/metadata-ingestion-modules/airflow-plugin/setup.py +++ b/metadata-ingestion-modules/airflow-plugin/setup.py @@ -96,7 +96,7 @@ def get_long_description(): *plugins["datahub-kafka"], f"acryl-datahub[testing-utils]{_self_pin}", # Extra requirements for loading our test dags. - "apache-airflow[snowflake]>=2.0.2", + "apache-airflow[snowflake,amazon]>=2.0.2", # A collection of issues we've encountered: # - Connexion's new version breaks Airflow: # See https://github.com/apache/airflow/issues/35234. diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_extractors.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_extractors.py index de0d4f8711f53..28d5775f61f54 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_extractors.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_extractors.py @@ -50,7 +50,6 @@ def __init__(self): "BigQueryOperator", "BigQueryExecuteQueryOperator", # Athena also does something similar. - "AthenaOperator", "AWSAthenaOperator", # Additional types that OL doesn't support. This is only necessary because # on older versions of Airflow, these operators don't inherit from SQLExecuteQueryOperator. @@ -59,6 +58,8 @@ def __init__(self): for operator in _sql_operator_overrides: self.task_to_extractor.extractors[operator] = GenericSqlExtractor + self.task_to_extractor.extractors["AthenaOperator"] = AthenaOperatorExtractor + self.task_to_extractor.extractors[ "BigQueryInsertJobOperator" ] = BigQueryInsertJobOperatorExtractor @@ -276,6 +277,27 @@ def extract(self) -> Optional[TaskMetadata]: ) +class AthenaOperatorExtractor(BaseExtractor): + def extract(self) -> Optional[TaskMetadata]: + from airflow.providers.amazon.aws.operators.athena import ( + AthenaOperator, # type: ignore + ) + + operator: "AthenaOperator" = self.operator + sql = operator.query + if not sql: + self.log.warning("No query found in AthenaOperator") + return None + + return _parse_sql_into_task_metadata( + self, + sql, + platform="athena", + default_database=None, + default_schema=self.operator.database, + ) + + def _snowflake_default_schema(self: "SnowflakeExtractor") -> Optional[str]: if hasattr(self.operator, "schema") and self.operator.schema is not None: return self.operator.schema diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/athena_operator.py b/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/athena_operator.py new file mode 100644 index 0000000000000..96cdacbbad37d --- /dev/null +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/athena_operator.py @@ -0,0 +1,43 @@ +from datetime import datetime + +from airflow import DAG +from airflow.providers.amazon.aws.operators.athena import AthenaOperator + +ATHENA_COST_TABLE = "costs" +ATHENA_PROCESSED_TABLE = "processed_costs" + + +def _fake_athena_execute(*args, **kwargs): + pass + + +with DAG( + "athena_operator", + start_date=datetime(2023, 1, 1), + schedule_interval=None, + catchup=False, +) as dag: + # HACK: We don't want to send real requests to Athena. As a workaround, + # we can simply monkey-patch the operator. + AthenaOperator.execute = _fake_athena_execute # type: ignore + + transform_cost_table = AthenaOperator( + aws_conn_id="my_aws", + task_id="transform_cost_table", + database="athena_db", + query=""" + CREATE OR REPLACE TABLE {{ params.out_table_name }} AS + SELECT + id, + month, + total_cost, + area, + total_cost / area as cost_per_area + FROM {{ params.in_table_name }} + """, + params={ + "in_table_name": ATHENA_COST_TABLE, + "out_table_name": ATHENA_PROCESSED_TABLE, + }, + output_location="s3://athena-results-bucket/", + ) diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_athena_operator.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_athena_operator.json new file mode 100644 index 0000000000000..baa738fef7b5f --- /dev/null +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_athena_operator.json @@ -0,0 +1,672 @@ +[ +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(airflow,athena_operator,prod)", + "changeType": "UPSERT", + "aspectName": "dataFlowInfo", + "aspect": { + "json": { + "customProperties": { + "_access_control": "None", + "catchup": "False", + "description": "None", + "doc_md": "None", + "fileloc": "", + "is_paused_upon_creation": "None", + "start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))", + "tags": "[]", + "timezone": "Timezone('UTC')" + }, + "externalUrl": "http://airflow.example.com/tree?dag_id=athena_operator", + "name": "athena_operator", + "env": "PROD" + } + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(airflow,athena_operator,prod)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(airflow,athena_operator,prod)", + "changeType": "UPSERT", + "aspectName": "ownership", + "aspect": { + "json": { + "owners": [ + { + "owner": "urn:li:corpuser:airflow", + "type": "DEVELOPER", + "source": { + "type": "SERVICE" + } + } + ], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:airflow" + } + } + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(airflow,athena_operator,prod)", + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { + "json": { + "tags": [] + } + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(airflow,athena_operator,prod)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(airflow,athena_operator,prod)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "athena_operator" + } + ] + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "json": { + "customProperties": { + "depends_on_past": "False", + "email": "None", + "label": "'transform_cost_table'", + "execution_timeout": "None", + "sla": "None", + "task_id": "'transform_cost_table'", + "trigger_rule": "", + "wait_for_downstream": "False", + "downstream_task_ids": "[]", + "inlets": "[]", + "outlets": "[]", + "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.22.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n CREATE OR REPLACE TABLE processed_costs AS\\n SELECT\\n id,\\n month,\\n total_cost,\\n area,\\n total_cost / area as cost_per_area\\n FROM costs\\n \"}" + }, + "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=athena_operator&_flt_3_task_id=transform_cost_table", + "name": "transform_cost_table", + "type": { + "string": "COMMAND" + }, + "env": "PROD" + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "json": { + "inputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD)" + ], + "outputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD)" + ], + "inputDatajobs": [], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),id)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),month)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),month)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),total_cost)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),total_cost)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),area)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),area)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),area)", + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),total_cost)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),cost_per_area)" + ], + "confidenceScore": 1.0 + } + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetKey", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:athena", + "name": "athena_db.costs", + "origin": "PROD" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetKey", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:athena", + "name": "athena_db.processed_costs", + "origin": "PROD" + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "ownership", + "aspect": { + "json": { + "owners": [ + { + "owner": "urn:li:corpuser:airflow", + "type": "DEVELOPER", + "source": { + "type": "SERVICE" + } + } + ], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:airflow" + } + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { + "json": { + "tags": [] + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:9cd4fbcec3a50a4988ffc5841beaf0ad", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceProperties", + "aspect": { + "json": { + "customProperties": { + "run_id": "manual_run_test", + "duration": "", + "start_date": "", + "end_date": "", + "execution_date": "2023-09-27 21:34:38+00:00", + "try_number": "0", + "max_tries": "0", + "external_executor_id": "None", + "state": "running", + "operator": "AthenaOperator", + "priority_weight": "1", + "log_url": "http://airflow.example.com/dags/athena_operator/grid?dag_run_id=manual_run_test&task_id=transform_cost_table&map_index=-1&tab=logs", + "orchestrator": "airflow", + "dag_id": "athena_operator", + "task_id": "transform_cost_table" + }, + "externalUrl": "http://airflow.example.com/dags/athena_operator/grid?dag_run_id=manual_run_test&task_id=transform_cost_table&map_index=-1&tab=logs", + "name": "athena_operator_transform_cost_table_manual_run_test", + "type": "BATCH_AD_HOC", + "created": { + "time": 1732719433576, + "actor": "urn:li:corpuser:datahub" + } + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:9cd4fbcec3a50a4988ffc5841beaf0ad", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRelationships", + "aspect": { + "json": { + "parentTemplate": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "upstreamInstances": [] + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:9cd4fbcec3a50a4988ffc5841beaf0ad", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceInput", + "aspect": { + "json": { + "inputs": [ + "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD)" + ] + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:9cd4fbcec3a50a4988ffc5841beaf0ad", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceOutput", + "aspect": { + "json": { + "outputs": [ + "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD)" + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetKey", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:athena", + "name": "athena_db.costs", + "origin": "PROD" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetKey", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:athena", + "name": "athena_db.processed_costs", + "origin": "PROD" + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:9cd4fbcec3a50a4988ffc5841beaf0ad", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRunEvent", + "aspect": { + "json": { + "timestampMillis": 1732719433576, + "partitionSpec": { + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" + }, + "status": "STARTED", + "attempt": 1 + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD)", + "changeType": "UPSERT", + "aspectName": "operation", + "aspect": { + "json": { + "timestampMillis": 1732719433736, + "partitionSpec": { + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" + }, + "actor": "urn:li:corpuser:airflow", + "operationType": "CREATE", + "lastUpdatedTimestamp": 1732719433736 + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "json": { + "customProperties": { + "depends_on_past": "False", + "email": "None", + "label": "'transform_cost_table'", + "execution_timeout": "None", + "sla": "None", + "task_id": "'transform_cost_table'", + "trigger_rule": "", + "wait_for_downstream": "False", + "downstream_task_ids": "[]", + "inlets": "[]", + "outlets": "[]", + "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.22.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n CREATE OR REPLACE TABLE processed_costs AS\\n SELECT\\n id,\\n month,\\n total_cost,\\n area,\\n total_cost / area as cost_per_area\\n FROM costs\\n \"}" + }, + "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=athena_operator&_flt_3_task_id=transform_cost_table", + "name": "transform_cost_table", + "type": { + "string": "COMMAND" + }, + "env": "PROD" + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "json": { + "inputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD)" + ], + "outputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD)" + ], + "inputDatajobs": [], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),id)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),month)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),month)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),total_cost)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),total_cost)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),area)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),area)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),area)", + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),total_cost)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),cost_per_area)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),id)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),month)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),month)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),total_cost)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),total_cost)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),area)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),area)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),area)", + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),total_cost)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),cost_per_area)" + ], + "confidenceScore": 1.0 + } + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetKey", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:athena", + "name": "athena_db.costs", + "origin": "PROD" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetKey", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:athena", + "name": "athena_db.processed_costs", + "origin": "PROD" + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "ownership", + "aspect": { + "json": { + "owners": [ + { + "owner": "urn:li:corpuser:airflow", + "type": "DEVELOPER", + "source": { + "type": "SERVICE" + } + } + ], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:airflow" + } + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { + "json": { + "tags": [] + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:9cd4fbcec3a50a4988ffc5841beaf0ad", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRunEvent", + "aspect": { + "json": { + "timestampMillis": 1732719433747, + "partitionSpec": { + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" + }, + "status": "COMPLETE", + "result": { + "type": "SUCCESS", + "nativeResultType": "airflow" + } + } + } +} +] \ No newline at end of file diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_athena_operator_no_dag_listener.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_athena_operator_no_dag_listener.json new file mode 100644 index 0000000000000..c53825a9979e3 --- /dev/null +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_athena_operator_no_dag_listener.json @@ -0,0 +1,672 @@ +[ +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(airflow,athena_operator,prod)", + "changeType": "UPSERT", + "aspectName": "dataFlowInfo", + "aspect": { + "json": { + "customProperties": { + "_access_control": "None", + "catchup": "False", + "description": "None", + "doc_md": "None", + "fileloc": "", + "is_paused_upon_creation": "None", + "start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))", + "tags": "[]", + "timezone": "Timezone('UTC')" + }, + "externalUrl": "http://airflow.example.com/tree?dag_id=athena_operator", + "name": "athena_operator", + "env": "PROD" + } + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(airflow,athena_operator,prod)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(airflow,athena_operator,prod)", + "changeType": "UPSERT", + "aspectName": "ownership", + "aspect": { + "json": { + "owners": [ + { + "owner": "urn:li:corpuser:airflow", + "type": "DEVELOPER", + "source": { + "type": "SERVICE" + } + } + ], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:airflow" + } + } + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(airflow,athena_operator,prod)", + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { + "json": { + "tags": [] + } + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(airflow,athena_operator,prod)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(airflow,athena_operator,prod)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "athena_operator" + } + ] + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "json": { + "customProperties": { + "depends_on_past": "False", + "email": "None", + "label": "'transform_cost_table'", + "execution_timeout": "None", + "sla": "None", + "task_id": "'transform_cost_table'", + "trigger_rule": "", + "wait_for_downstream": "False", + "downstream_task_ids": "[]", + "inlets": "[]", + "outlets": "[]", + "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.22.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n CREATE OR REPLACE TABLE processed_costs AS\\n SELECT\\n id,\\n month,\\n total_cost,\\n area,\\n total_cost / area as cost_per_area\\n FROM costs\\n \"}" + }, + "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=athena_operator&_flt_3_task_id=transform_cost_table", + "name": "transform_cost_table", + "type": { + "string": "COMMAND" + }, + "env": "PROD" + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "json": { + "inputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD)" + ], + "outputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD)" + ], + "inputDatajobs": [], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),id)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),month)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),month)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),total_cost)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),total_cost)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),area)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),area)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),area)", + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),total_cost)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),cost_per_area)" + ], + "confidenceScore": 1.0 + } + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetKey", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:athena", + "name": "athena_db.costs", + "origin": "PROD" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetKey", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:athena", + "name": "athena_db.processed_costs", + "origin": "PROD" + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "ownership", + "aspect": { + "json": { + "owners": [ + { + "owner": "urn:li:corpuser:airflow", + "type": "DEVELOPER", + "source": { + "type": "SERVICE" + } + } + ], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:airflow" + } + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { + "json": { + "tags": [] + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:9cd4fbcec3a50a4988ffc5841beaf0ad", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceProperties", + "aspect": { + "json": { + "customProperties": { + "run_id": "manual_run_test", + "duration": "", + "start_date": "", + "end_date": "", + "execution_date": "2023-09-27 21:34:38+00:00", + "try_number": "0", + "max_tries": "0", + "external_executor_id": "None", + "state": "running", + "operator": "AthenaOperator", + "priority_weight": "1", + "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=transform_cost_table&dag_id=athena_operator&map_index=-1", + "orchestrator": "airflow", + "dag_id": "athena_operator", + "task_id": "transform_cost_table" + }, + "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=transform_cost_table&dag_id=athena_operator&map_index=-1", + "name": "athena_operator_transform_cost_table_manual_run_test", + "type": "BATCH_AD_HOC", + "created": { + "time": 1733121901482, + "actor": "urn:li:corpuser:datahub" + } + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:9cd4fbcec3a50a4988ffc5841beaf0ad", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRelationships", + "aspect": { + "json": { + "parentTemplate": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "upstreamInstances": [] + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:9cd4fbcec3a50a4988ffc5841beaf0ad", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceInput", + "aspect": { + "json": { + "inputs": [ + "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD)" + ] + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:9cd4fbcec3a50a4988ffc5841beaf0ad", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceOutput", + "aspect": { + "json": { + "outputs": [ + "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD)" + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetKey", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:athena", + "name": "athena_db.costs", + "origin": "PROD" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetKey", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:athena", + "name": "athena_db.processed_costs", + "origin": "PROD" + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:9cd4fbcec3a50a4988ffc5841beaf0ad", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRunEvent", + "aspect": { + "json": { + "timestampMillis": 1733121901482, + "partitionSpec": { + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" + }, + "status": "STARTED", + "attempt": 1 + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD)", + "changeType": "UPSERT", + "aspectName": "operation", + "aspect": { + "json": { + "timestampMillis": 1733121901625, + "partitionSpec": { + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" + }, + "actor": "urn:li:corpuser:airflow", + "operationType": "CREATE", + "lastUpdatedTimestamp": 1733121901625 + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "json": { + "customProperties": { + "depends_on_past": "False", + "email": "None", + "label": "'transform_cost_table'", + "execution_timeout": "None", + "sla": "None", + "task_id": "'transform_cost_table'", + "trigger_rule": "", + "wait_for_downstream": "False", + "downstream_task_ids": "[]", + "inlets": "[]", + "outlets": "[]", + "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.22.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n CREATE OR REPLACE TABLE processed_costs AS\\n SELECT\\n id,\\n month,\\n total_cost,\\n area,\\n total_cost / area as cost_per_area\\n FROM costs\\n \"}" + }, + "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=athena_operator&_flt_3_task_id=transform_cost_table", + "name": "transform_cost_table", + "type": { + "string": "COMMAND" + }, + "env": "PROD" + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "json": { + "inputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD)" + ], + "outputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD)" + ], + "inputDatajobs": [], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),id)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),month)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),month)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),total_cost)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),total_cost)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),area)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),area)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),area)", + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),total_cost)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),cost_per_area)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),id)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),month)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),month)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),total_cost)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),total_cost)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),area)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),area)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),area)", + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),total_cost)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),cost_per_area)" + ], + "confidenceScore": 1.0 + } + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetKey", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:athena", + "name": "athena_db.costs", + "origin": "PROD" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetKey", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:athena", + "name": "athena_db.processed_costs", + "origin": "PROD" + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "ownership", + "aspect": { + "json": { + "owners": [ + { + "owner": "urn:li:corpuser:airflow", + "type": "DEVELOPER", + "source": { + "type": "SERVICE" + } + } + ], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:airflow" + } + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { + "json": { + "tags": [] + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:9cd4fbcec3a50a4988ffc5841beaf0ad", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRunEvent", + "aspect": { + "json": { + "timestampMillis": 1733121901675, + "partitionSpec": { + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" + }, + "status": "COMPLETE", + "result": { + "type": "SUCCESS", + "nativeResultType": "airflow" + } + } + } +} +] \ No newline at end of file diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py b/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py index 3becf10703df6..75bb43af1a43d 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py @@ -111,6 +111,24 @@ def _wait_for_dag_finish( raise NotReadyError(f"DAG has not finished yet: {dag_run['state']}") +@tenacity.retry( + reraise=True, + wait=tenacity.wait_fixed(1), + stop=tenacity.stop_after_delay(90), + retry=tenacity.retry_if_exception_type(NotReadyError), +) +def _wait_for_dag_to_load(airflow_instance: AirflowInstance, dag_id: str) -> None: + print("Checking if DAG was loaded") + res = airflow_instance.session.get( + url=f"{airflow_instance.airflow_url}/api/v1/dags", + timeout=5, + ) + res.raise_for_status() + + if len(list(filter(lambda x: x["dag_id"] == dag_id, res.json()["dags"]))) == 0: + raise NotReadyError("DAG was not loaded yet") + + def _dump_dag_logs(airflow_instance: AirflowInstance, dag_id: str) -> None: # Get the dag run info res = airflow_instance.session.get( @@ -206,6 +224,15 @@ def _run_airflow( "insecure_mode": "true", }, ).get_uri(), + "AIRFLOW_CONN_MY_AWS": Connection( + conn_id="my_aws", + conn_type="aws", + extra={ + "region_name": "us-east-1", + "aws_access_key_id": "AKIAIOSFODNN7EXAMPLE", + "aws_secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", + }, + ).get_uri(), "AIRFLOW_CONN_MY_SQLITE": Connection( conn_id="my_sqlite", conn_type="sqlite", @@ -327,6 +354,7 @@ class DagTestCase: DagTestCase("sqlite_operator", v2_only=True), DagTestCase("custom_operator_dag", v2_only=True), DagTestCase("datahub_emitter_operator_jinja_template_dag", v2_only=True), + DagTestCase("athena_operator", v2_only=True), ] @@ -398,6 +426,7 @@ def test_airflow_plugin( tmp_path, dags_folder=DAGS_FOLDER, is_v1=is_v1 ) as airflow_instance: print(f"Running DAG {dag_id}...") + _wait_for_dag_to_load(airflow_instance, dag_id) subprocess.check_call( [ "airflow",