diff --git a/datahub-web-react/src/images/hudilogo.png b/datahub-web-react/src/images/hudilogo.png new file mode 100644 index 00000000000000..4b58cc5a34826d Binary files /dev/null and b/datahub-web-react/src/images/hudilogo.png differ diff --git a/docs-website/src/pages/cloud/UnifiedTabs/index.js b/docs-website/src/pages/cloud/UnifiedTabs/index.js index c0fbc25a8de6bc..d17138fcee629c 100644 --- a/docs-website/src/pages/cloud/UnifiedTabs/index.js +++ b/docs-website/src/pages/cloud/UnifiedTabs/index.js @@ -11,21 +11,21 @@ const TabbedComponent = () => { title: 'Discovery', description: 'All the search and discovery features of DataHub Core you already love, enhanced.', icon: "/img/assets/data-discovery.svg", - link: "https://www.acryldata.io/acryl-datahub", + link: "https://datahubproject.io/solutions/discovery", image: 'https://raw.githubusercontent.com/datahub-project/static-assets/main/imgs/saas/demo/discovery.webm', }, { title: 'Observability', description: 'Detect, resolve, and prevent data quality issues before they impact your business. Unify data health signals from all your data quality tools, including dbt tests and more.', icon: "/img/assets/data-ob.svg", - link: "https://www.acryldata.io/observe", + link: "https://datahubproject.io/solutions/observability", image: 'https://raw.githubusercontent.com/datahub-project/static-assets/main/imgs/saas/demo/observe.webm', }, { title: 'Governance', description: 'Powerful Automation, Reporting and Organizational tools to help you govern effectively.', icon: "/img/assets/data-governance.svg", - link: "https://www.acryldata.io/acryl-datahub#governance", + link: "https://datahubproject.io/solutions/governance", image: 'https://raw.githubusercontent.com/datahub-project/static-assets/main/imgs/saas/demo/governance.webm', }, ]; diff --git a/metadata-ingestion-modules/airflow-plugin/setup.py b/metadata-ingestion-modules/airflow-plugin/setup.py index 0d5ceefd989dca..02a0bbb6022e04 100644 --- a/metadata-ingestion-modules/airflow-plugin/setup.py +++ b/metadata-ingestion-modules/airflow-plugin/setup.py @@ -96,7 +96,7 @@ def get_long_description(): *plugins["datahub-kafka"], f"acryl-datahub[testing-utils]{_self_pin}", # Extra requirements for loading our test dags. - "apache-airflow[snowflake]>=2.0.2", + "apache-airflow[snowflake,amazon]>=2.0.2", # A collection of issues we've encountered: # - Connexion's new version breaks Airflow: # See https://github.com/apache/airflow/issues/35234. diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_extractors.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_extractors.py index de0d4f8711f531..28d5775f61f542 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_extractors.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_extractors.py @@ -50,7 +50,6 @@ def __init__(self): "BigQueryOperator", "BigQueryExecuteQueryOperator", # Athena also does something similar. - "AthenaOperator", "AWSAthenaOperator", # Additional types that OL doesn't support. This is only necessary because # on older versions of Airflow, these operators don't inherit from SQLExecuteQueryOperator. @@ -59,6 +58,8 @@ def __init__(self): for operator in _sql_operator_overrides: self.task_to_extractor.extractors[operator] = GenericSqlExtractor + self.task_to_extractor.extractors["AthenaOperator"] = AthenaOperatorExtractor + self.task_to_extractor.extractors[ "BigQueryInsertJobOperator" ] = BigQueryInsertJobOperatorExtractor @@ -276,6 +277,27 @@ def extract(self) -> Optional[TaskMetadata]: ) +class AthenaOperatorExtractor(BaseExtractor): + def extract(self) -> Optional[TaskMetadata]: + from airflow.providers.amazon.aws.operators.athena import ( + AthenaOperator, # type: ignore + ) + + operator: "AthenaOperator" = self.operator + sql = operator.query + if not sql: + self.log.warning("No query found in AthenaOperator") + return None + + return _parse_sql_into_task_metadata( + self, + sql, + platform="athena", + default_database=None, + default_schema=self.operator.database, + ) + + def _snowflake_default_schema(self: "SnowflakeExtractor") -> Optional[str]: if hasattr(self.operator, "schema") and self.operator.schema is not None: return self.operator.schema diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/athena_operator.py b/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/athena_operator.py new file mode 100644 index 00000000000000..96cdacbbad37dd --- /dev/null +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/athena_operator.py @@ -0,0 +1,43 @@ +from datetime import datetime + +from airflow import DAG +from airflow.providers.amazon.aws.operators.athena import AthenaOperator + +ATHENA_COST_TABLE = "costs" +ATHENA_PROCESSED_TABLE = "processed_costs" + + +def _fake_athena_execute(*args, **kwargs): + pass + + +with DAG( + "athena_operator", + start_date=datetime(2023, 1, 1), + schedule_interval=None, + catchup=False, +) as dag: + # HACK: We don't want to send real requests to Athena. As a workaround, + # we can simply monkey-patch the operator. + AthenaOperator.execute = _fake_athena_execute # type: ignore + + transform_cost_table = AthenaOperator( + aws_conn_id="my_aws", + task_id="transform_cost_table", + database="athena_db", + query=""" + CREATE OR REPLACE TABLE {{ params.out_table_name }} AS + SELECT + id, + month, + total_cost, + area, + total_cost / area as cost_per_area + FROM {{ params.in_table_name }} + """, + params={ + "in_table_name": ATHENA_COST_TABLE, + "out_table_name": ATHENA_PROCESSED_TABLE, + }, + output_location="s3://athena-results-bucket/", + ) diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_athena_operator.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_athena_operator.json new file mode 100644 index 00000000000000..baa738fef7b5fd --- /dev/null +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_athena_operator.json @@ -0,0 +1,672 @@ +[ +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(airflow,athena_operator,prod)", + "changeType": "UPSERT", + "aspectName": "dataFlowInfo", + "aspect": { + "json": { + "customProperties": { + "_access_control": "None", + "catchup": "False", + "description": "None", + "doc_md": "None", + "fileloc": "", + "is_paused_upon_creation": "None", + "start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))", + "tags": "[]", + "timezone": "Timezone('UTC')" + }, + "externalUrl": "http://airflow.example.com/tree?dag_id=athena_operator", + "name": "athena_operator", + "env": "PROD" + } + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(airflow,athena_operator,prod)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(airflow,athena_operator,prod)", + "changeType": "UPSERT", + "aspectName": "ownership", + "aspect": { + "json": { + "owners": [ + { + "owner": "urn:li:corpuser:airflow", + "type": "DEVELOPER", + "source": { + "type": "SERVICE" + } + } + ], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:airflow" + } + } + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(airflow,athena_operator,prod)", + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { + "json": { + "tags": [] + } + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(airflow,athena_operator,prod)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(airflow,athena_operator,prod)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "athena_operator" + } + ] + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "json": { + "customProperties": { + "depends_on_past": "False", + "email": "None", + "label": "'transform_cost_table'", + "execution_timeout": "None", + "sla": "None", + "task_id": "'transform_cost_table'", + "trigger_rule": "", + "wait_for_downstream": "False", + "downstream_task_ids": "[]", + "inlets": "[]", + "outlets": "[]", + "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.22.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n CREATE OR REPLACE TABLE processed_costs AS\\n SELECT\\n id,\\n month,\\n total_cost,\\n area,\\n total_cost / area as cost_per_area\\n FROM costs\\n \"}" + }, + "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=athena_operator&_flt_3_task_id=transform_cost_table", + "name": "transform_cost_table", + "type": { + "string": "COMMAND" + }, + "env": "PROD" + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "json": { + "inputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD)" + ], + "outputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD)" + ], + "inputDatajobs": [], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),id)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),month)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),month)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),total_cost)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),total_cost)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),area)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),area)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),area)", + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),total_cost)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),cost_per_area)" + ], + "confidenceScore": 1.0 + } + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetKey", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:athena", + "name": "athena_db.costs", + "origin": "PROD" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetKey", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:athena", + "name": "athena_db.processed_costs", + "origin": "PROD" + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "ownership", + "aspect": { + "json": { + "owners": [ + { + "owner": "urn:li:corpuser:airflow", + "type": "DEVELOPER", + "source": { + "type": "SERVICE" + } + } + ], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:airflow" + } + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { + "json": { + "tags": [] + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:9cd4fbcec3a50a4988ffc5841beaf0ad", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceProperties", + "aspect": { + "json": { + "customProperties": { + "run_id": "manual_run_test", + "duration": "", + "start_date": "", + "end_date": "", + "execution_date": "2023-09-27 21:34:38+00:00", + "try_number": "0", + "max_tries": "0", + "external_executor_id": "None", + "state": "running", + "operator": "AthenaOperator", + "priority_weight": "1", + "log_url": "http://airflow.example.com/dags/athena_operator/grid?dag_run_id=manual_run_test&task_id=transform_cost_table&map_index=-1&tab=logs", + "orchestrator": "airflow", + "dag_id": "athena_operator", + "task_id": "transform_cost_table" + }, + "externalUrl": "http://airflow.example.com/dags/athena_operator/grid?dag_run_id=manual_run_test&task_id=transform_cost_table&map_index=-1&tab=logs", + "name": "athena_operator_transform_cost_table_manual_run_test", + "type": "BATCH_AD_HOC", + "created": { + "time": 1732719433576, + "actor": "urn:li:corpuser:datahub" + } + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:9cd4fbcec3a50a4988ffc5841beaf0ad", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRelationships", + "aspect": { + "json": { + "parentTemplate": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "upstreamInstances": [] + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:9cd4fbcec3a50a4988ffc5841beaf0ad", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceInput", + "aspect": { + "json": { + "inputs": [ + "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD)" + ] + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:9cd4fbcec3a50a4988ffc5841beaf0ad", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceOutput", + "aspect": { + "json": { + "outputs": [ + "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD)" + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetKey", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:athena", + "name": "athena_db.costs", + "origin": "PROD" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetKey", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:athena", + "name": "athena_db.processed_costs", + "origin": "PROD" + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:9cd4fbcec3a50a4988ffc5841beaf0ad", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRunEvent", + "aspect": { + "json": { + "timestampMillis": 1732719433576, + "partitionSpec": { + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" + }, + "status": "STARTED", + "attempt": 1 + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD)", + "changeType": "UPSERT", + "aspectName": "operation", + "aspect": { + "json": { + "timestampMillis": 1732719433736, + "partitionSpec": { + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" + }, + "actor": "urn:li:corpuser:airflow", + "operationType": "CREATE", + "lastUpdatedTimestamp": 1732719433736 + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "json": { + "customProperties": { + "depends_on_past": "False", + "email": "None", + "label": "'transform_cost_table'", + "execution_timeout": "None", + "sla": "None", + "task_id": "'transform_cost_table'", + "trigger_rule": "", + "wait_for_downstream": "False", + "downstream_task_ids": "[]", + "inlets": "[]", + "outlets": "[]", + "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.22.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n CREATE OR REPLACE TABLE processed_costs AS\\n SELECT\\n id,\\n month,\\n total_cost,\\n area,\\n total_cost / area as cost_per_area\\n FROM costs\\n \"}" + }, + "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=athena_operator&_flt_3_task_id=transform_cost_table", + "name": "transform_cost_table", + "type": { + "string": "COMMAND" + }, + "env": "PROD" + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "json": { + "inputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD)" + ], + "outputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD)" + ], + "inputDatajobs": [], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),id)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),month)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),month)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),total_cost)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),total_cost)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),area)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),area)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),area)", + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),total_cost)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),cost_per_area)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),id)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),month)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),month)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),total_cost)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),total_cost)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),area)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),area)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),area)", + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),total_cost)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),cost_per_area)" + ], + "confidenceScore": 1.0 + } + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetKey", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:athena", + "name": "athena_db.costs", + "origin": "PROD" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetKey", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:athena", + "name": "athena_db.processed_costs", + "origin": "PROD" + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "ownership", + "aspect": { + "json": { + "owners": [ + { + "owner": "urn:li:corpuser:airflow", + "type": "DEVELOPER", + "source": { + "type": "SERVICE" + } + } + ], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:airflow" + } + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { + "json": { + "tags": [] + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:9cd4fbcec3a50a4988ffc5841beaf0ad", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRunEvent", + "aspect": { + "json": { + "timestampMillis": 1732719433747, + "partitionSpec": { + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" + }, + "status": "COMPLETE", + "result": { + "type": "SUCCESS", + "nativeResultType": "airflow" + } + } + } +} +] \ No newline at end of file diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_athena_operator_no_dag_listener.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_athena_operator_no_dag_listener.json new file mode 100644 index 00000000000000..c53825a9979e3d --- /dev/null +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_athena_operator_no_dag_listener.json @@ -0,0 +1,672 @@ +[ +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(airflow,athena_operator,prod)", + "changeType": "UPSERT", + "aspectName": "dataFlowInfo", + "aspect": { + "json": { + "customProperties": { + "_access_control": "None", + "catchup": "False", + "description": "None", + "doc_md": "None", + "fileloc": "", + "is_paused_upon_creation": "None", + "start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))", + "tags": "[]", + "timezone": "Timezone('UTC')" + }, + "externalUrl": "http://airflow.example.com/tree?dag_id=athena_operator", + "name": "athena_operator", + "env": "PROD" + } + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(airflow,athena_operator,prod)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(airflow,athena_operator,prod)", + "changeType": "UPSERT", + "aspectName": "ownership", + "aspect": { + "json": { + "owners": [ + { + "owner": "urn:li:corpuser:airflow", + "type": "DEVELOPER", + "source": { + "type": "SERVICE" + } + } + ], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:airflow" + } + } + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(airflow,athena_operator,prod)", + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { + "json": { + "tags": [] + } + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(airflow,athena_operator,prod)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(airflow,athena_operator,prod)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "athena_operator" + } + ] + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "json": { + "customProperties": { + "depends_on_past": "False", + "email": "None", + "label": "'transform_cost_table'", + "execution_timeout": "None", + "sla": "None", + "task_id": "'transform_cost_table'", + "trigger_rule": "", + "wait_for_downstream": "False", + "downstream_task_ids": "[]", + "inlets": "[]", + "outlets": "[]", + "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.22.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n CREATE OR REPLACE TABLE processed_costs AS\\n SELECT\\n id,\\n month,\\n total_cost,\\n area,\\n total_cost / area as cost_per_area\\n FROM costs\\n \"}" + }, + "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=athena_operator&_flt_3_task_id=transform_cost_table", + "name": "transform_cost_table", + "type": { + "string": "COMMAND" + }, + "env": "PROD" + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "json": { + "inputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD)" + ], + "outputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD)" + ], + "inputDatajobs": [], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),id)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),month)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),month)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),total_cost)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),total_cost)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),area)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),area)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),area)", + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),total_cost)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),cost_per_area)" + ], + "confidenceScore": 1.0 + } + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetKey", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:athena", + "name": "athena_db.costs", + "origin": "PROD" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetKey", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:athena", + "name": "athena_db.processed_costs", + "origin": "PROD" + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "ownership", + "aspect": { + "json": { + "owners": [ + { + "owner": "urn:li:corpuser:airflow", + "type": "DEVELOPER", + "source": { + "type": "SERVICE" + } + } + ], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:airflow" + } + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { + "json": { + "tags": [] + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:9cd4fbcec3a50a4988ffc5841beaf0ad", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceProperties", + "aspect": { + "json": { + "customProperties": { + "run_id": "manual_run_test", + "duration": "", + "start_date": "", + "end_date": "", + "execution_date": "2023-09-27 21:34:38+00:00", + "try_number": "0", + "max_tries": "0", + "external_executor_id": "None", + "state": "running", + "operator": "AthenaOperator", + "priority_weight": "1", + "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=transform_cost_table&dag_id=athena_operator&map_index=-1", + "orchestrator": "airflow", + "dag_id": "athena_operator", + "task_id": "transform_cost_table" + }, + "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=transform_cost_table&dag_id=athena_operator&map_index=-1", + "name": "athena_operator_transform_cost_table_manual_run_test", + "type": "BATCH_AD_HOC", + "created": { + "time": 1733121901482, + "actor": "urn:li:corpuser:datahub" + } + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:9cd4fbcec3a50a4988ffc5841beaf0ad", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRelationships", + "aspect": { + "json": { + "parentTemplate": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "upstreamInstances": [] + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:9cd4fbcec3a50a4988ffc5841beaf0ad", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceInput", + "aspect": { + "json": { + "inputs": [ + "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD)" + ] + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:9cd4fbcec3a50a4988ffc5841beaf0ad", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceOutput", + "aspect": { + "json": { + "outputs": [ + "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD)" + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetKey", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:athena", + "name": "athena_db.costs", + "origin": "PROD" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetKey", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:athena", + "name": "athena_db.processed_costs", + "origin": "PROD" + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:9cd4fbcec3a50a4988ffc5841beaf0ad", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRunEvent", + "aspect": { + "json": { + "timestampMillis": 1733121901482, + "partitionSpec": { + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" + }, + "status": "STARTED", + "attempt": 1 + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD)", + "changeType": "UPSERT", + "aspectName": "operation", + "aspect": { + "json": { + "timestampMillis": 1733121901625, + "partitionSpec": { + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" + }, + "actor": "urn:li:corpuser:airflow", + "operationType": "CREATE", + "lastUpdatedTimestamp": 1733121901625 + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "json": { + "customProperties": { + "depends_on_past": "False", + "email": "None", + "label": "'transform_cost_table'", + "execution_timeout": "None", + "sla": "None", + "task_id": "'transform_cost_table'", + "trigger_rule": "", + "wait_for_downstream": "False", + "downstream_task_ids": "[]", + "inlets": "[]", + "outlets": "[]", + "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.22.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n CREATE OR REPLACE TABLE processed_costs AS\\n SELECT\\n id,\\n month,\\n total_cost,\\n area,\\n total_cost / area as cost_per_area\\n FROM costs\\n \"}" + }, + "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=athena_operator&_flt_3_task_id=transform_cost_table", + "name": "transform_cost_table", + "type": { + "string": "COMMAND" + }, + "env": "PROD" + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "json": { + "inputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD)" + ], + "outputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD)" + ], + "inputDatajobs": [], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),id)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),month)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),month)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),total_cost)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),total_cost)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),area)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),area)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),area)", + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),total_cost)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),cost_per_area)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),id)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),month)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),month)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),total_cost)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),total_cost)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),area)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),area)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),area)", + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD),total_cost)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD),cost_per_area)" + ], + "confidenceScore": 1.0 + } + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetKey", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:athena", + "name": "athena_db.costs", + "origin": "PROD" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetKey", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:athena", + "name": "athena_db.processed_costs", + "origin": "PROD" + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "ownership", + "aspect": { + "json": { + "owners": [ + { + "owner": "urn:li:corpuser:airflow", + "type": "DEVELOPER", + "source": { + "type": "SERVICE" + } + } + ], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:airflow" + } + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { + "json": { + "tags": [] + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:9cd4fbcec3a50a4988ffc5841beaf0ad", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRunEvent", + "aspect": { + "json": { + "timestampMillis": 1733121901675, + "partitionSpec": { + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" + }, + "status": "COMPLETE", + "result": { + "type": "SUCCESS", + "nativeResultType": "airflow" + } + } + } +} +] \ No newline at end of file diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py b/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py index 3becf10703df6c..75bb43af1a43dd 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py @@ -111,6 +111,24 @@ def _wait_for_dag_finish( raise NotReadyError(f"DAG has not finished yet: {dag_run['state']}") +@tenacity.retry( + reraise=True, + wait=tenacity.wait_fixed(1), + stop=tenacity.stop_after_delay(90), + retry=tenacity.retry_if_exception_type(NotReadyError), +) +def _wait_for_dag_to_load(airflow_instance: AirflowInstance, dag_id: str) -> None: + print("Checking if DAG was loaded") + res = airflow_instance.session.get( + url=f"{airflow_instance.airflow_url}/api/v1/dags", + timeout=5, + ) + res.raise_for_status() + + if len(list(filter(lambda x: x["dag_id"] == dag_id, res.json()["dags"]))) == 0: + raise NotReadyError("DAG was not loaded yet") + + def _dump_dag_logs(airflow_instance: AirflowInstance, dag_id: str) -> None: # Get the dag run info res = airflow_instance.session.get( @@ -206,6 +224,15 @@ def _run_airflow( "insecure_mode": "true", }, ).get_uri(), + "AIRFLOW_CONN_MY_AWS": Connection( + conn_id="my_aws", + conn_type="aws", + extra={ + "region_name": "us-east-1", + "aws_access_key_id": "AKIAIOSFODNN7EXAMPLE", + "aws_secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", + }, + ).get_uri(), "AIRFLOW_CONN_MY_SQLITE": Connection( conn_id="my_sqlite", conn_type="sqlite", @@ -327,6 +354,7 @@ class DagTestCase: DagTestCase("sqlite_operator", v2_only=True), DagTestCase("custom_operator_dag", v2_only=True), DagTestCase("datahub_emitter_operator_jinja_template_dag", v2_only=True), + DagTestCase("athena_operator", v2_only=True), ] @@ -398,6 +426,7 @@ def test_airflow_plugin( tmp_path, dags_folder=DAGS_FOLDER, is_v1=is_v1 ) as airflow_instance: print(f"Running DAG {dag_id}...") + _wait_for_dag_to_load(airflow_instance, dag_id) subprocess.check_call( [ "airflow", diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_liquid_tag.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_liquid_tag.py index 7d4ebf00cc06ef..f48ba6758564bf 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_liquid_tag.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_liquid_tag.py @@ -4,6 +4,7 @@ from liquid import Environment from liquid.ast import Node from liquid.context import Context +from liquid.filter import string_filter from liquid.parse import expect, get_parser from liquid.stream import TokenStream from liquid.tag import Tag @@ -81,12 +82,18 @@ def parse(self, stream: TokenStream) -> Node: custom_tags = [ConditionTag] +@string_filter +def sql_quote_filter(variable: str) -> str: + return f"'{variable}'" + + @lru_cache(maxsize=1) def _create_env() -> Environment: - env: Environment = Environment() + env: Environment = Environment(strict_filters=False) # register tag. One time activity for custom_tag in custom_tags: env.add_tag(custom_tag) + env.add_filter("sql_quote", sql_quote_filter) return env diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py index 0eafdb4ad23ba0..197e73dca7141b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py @@ -68,6 +68,7 @@ CapabilityReport, MetadataWorkUnitProcessor, Source, + StructuredLogLevel, TestableSource, TestConnectionReport, ) @@ -289,16 +290,12 @@ def make_tableau_client(self, site: str) -> Server: server.auth.sign_in(authentication) return server except ServerResponseError as e: + message = f"Unable to login (invalid/expired credentials or missing permissions): {str(e)}" if isinstance(authentication, PersonalAccessTokenAuth): # Docs on token expiry in Tableau: # https://help.tableau.com/current/server/en-us/security_personal_access_tokens.htm#token-expiry - logger.info( - "Error authenticating with Tableau. Note that Tableau personal access tokens " - "expire if not used for 15 days or if over 1 year old" - ) - raise ValueError( - f"Unable to login (invalid/expired credentials or missing permissions): {str(e)}" - ) from e + message = f"Error authenticating with Tableau. Note that Tableau personal access tokens expire if not used for 15 days or if over 1 year old: {str(e)}" + raise ValueError(message) from e except Exception as e: raise ValueError( f"Unable to login (check your Tableau connection and credentials): {str(e)}" @@ -700,6 +697,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: config=self.config, ctx=self.ctx, site=site, + site_id=site.id, report=self.report, server=self.server, platform=self.platform, @@ -707,11 +705,19 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: logger.info(f"Ingesting assets of site '{site.content_url}'.") yield from site_source.ingest_tableau_site() else: - site = self.server.sites.get_by_id(self.server.site_id) + site = None + with self.report.report_exc( + title="Unable to fetch site details. Site hierarchy may be incomplete and external urls may be missing.", + message="This usually indicates missing permissions. Ensure that you have all necessary permissions.", + level=StructuredLogLevel.WARN, + ): + site = self.server.sites.get_by_id(self.server.site_id) + site_source = TableauSiteSource( config=self.config, ctx=self.ctx, site=site, + site_id=self.server.site_id, report=self.report, server=self.server, platform=self.platform, @@ -722,6 +728,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: title="Failed to Retrieve Tableau Metadata", message="Unable to retrieve metadata from tableau.", context=str(md_exception), + exc=md_exception, ) def close(self) -> None: @@ -743,7 +750,8 @@ def __init__( self, config: TableauConfig, ctx: PipelineContext, - site: SiteItem, + site: Optional[SiteItem], + site_id: Optional[str], report: TableauSourceReport, server: Server, platform: str, @@ -752,9 +760,16 @@ def __init__( self.report = report self.server: Server = server self.ctx: PipelineContext = ctx - self.site: SiteItem = site self.platform = platform + self.site: Optional[SiteItem] = site + if site_id is not None: + self.site_id: str = site_id + else: + assert self.site is not None, "site or site_id is required" + assert self.site.id is not None, "site_id is required when site is provided" + self.site_id = self.site.id + self.database_tables: Dict[str, DatabaseTable] = {} self.tableau_stat_registry: Dict[str, UsageStat] = {} self.tableau_project_registry: Dict[str, TableauProject] = {} @@ -808,7 +823,7 @@ def dataset_browse_prefix(self) -> str: def _re_authenticate(self): tableau_auth: Union[ TableauAuth, PersonalAccessTokenAuth - ] = self.config.get_tableau_auth(self.site.content_url) + ] = self.config.get_tableau_auth(self.site_id) self.server.auth.sign_in(tableau_auth) @property @@ -826,6 +841,7 @@ def _populate_usage_stat_registry(self) -> None: if not view.id: continue self.tableau_stat_registry[view.id] = UsageStat(view_count=view.total_views) + logger.info(f"Got Tableau stats for {len(self.tableau_stat_registry)} assets") logger.debug("Tableau stats %s", self.tableau_stat_registry) def _populate_database_server_hostname_map(self) -> None: @@ -876,7 +892,7 @@ def form_path(project_id: str) -> List[str]: ancestors = [cur_proj.name] while cur_proj.parent_id is not None: if cur_proj.parent_id not in all_project_map: - self.report.report_warning( + self.report.warning( "project-issue", f"Parent project {cur_proj.parent_id} not found. We need Site Administrator Explorer permissions.", ) @@ -974,8 +990,11 @@ def _init_datasource_registry(self) -> None: self.datasource_project_map[ds.id] = ds.project_id except Exception as e: self.report.get_all_datasources_query_failed = True - logger.info(f"Get all datasources query failed due to error {e}") - logger.debug("Error stack trace", exc_info=True) + self.report.warning( + title="Unexpected Query Error", + message="Get all datasources query failed due to error", + exc=e, + ) def _init_workbook_registry(self) -> None: if self.server is None: @@ -1141,7 +1160,6 @@ def get_connection_object_page( ) if node_limit_errors: - logger.debug(f"Node Limit Error. query_data {query_data}") self.report.warning( title="Tableau Data Exceed Predefined Limit", message="The numbers of record in result set exceeds a predefined limit. Increase the tableau " @@ -1257,9 +1275,10 @@ def emit_workbooks(self) -> Iterable[MetadataWorkUnit]: wrk_id: Optional[str] = workbook.get(c.ID) prj_name: Optional[str] = workbook.get(c.PROJECT_NAME) - logger.debug( - f"Skipping workbook {wrk_name}({wrk_id}) as it is project {prj_name}({project_luid}) not " - f"present in project registry" + self.report.warning( + title="Skipping Missing Workbook", + message="Skipping workbook as its project is not present in project registry", + context=f"workbook={wrk_name}({wrk_id}), project={prj_name}({project_luid})", ) continue @@ -1453,7 +1472,7 @@ def get_upstream_tables( c.COLUMNS_CONNECTION ].get("totalCount") if not is_custom_sql and not num_tbl_cols: - logger.debug( + logger.warning( f"Skipping upstream table with id {table[c.ID]}, no columns: {table}" ) continue @@ -1469,7 +1488,12 @@ def get_upstream_tables( table, default_schema_map=self.config.default_schema_map ) except Exception as e: - logger.info(f"Failed to generate upstream reference for {table}: {e}") + self.report.warning( + title="Potentially Missing Lineage Issue", + message="Failed to generate upstream reference", + exc=e, + context=f"table={table}", + ) continue table_urn = ref.make_dataset_urn( @@ -1917,10 +1941,12 @@ def _query_published_datasource_for_project_luid(self, ds_luid: str) -> None: self.datasource_project_map[ds_result.id] = ds_result.project_id except Exception as e: self.report.num_get_datasource_query_failures += 1 - logger.warning( - f"Failed to get datasource project_luid for {ds_luid} due to error {e}" + self.report.warning( + title="Unexpected Query Error", + message="Failed to get datasource details", + exc=e, + context=f"ds_luid={ds_luid}", ) - logger.debug("Error stack trace", exc_info=True) def _get_workbook_project_luid(self, wb: dict) -> Optional[str]: if wb.get(c.LUID) and self.workbook_project_map.get(wb[c.LUID]): @@ -3181,10 +3207,10 @@ def emit_project_in_topological_order( else: # This is a root Tableau project since the parent_project_id is None. # For a root project, either the site is the parent, or the platform is the default parent. - if self.config.add_site_container and self.site and self.site.id: + if self.config.add_site_container: # The site containers have already been generated by emit_site_container, so we # don't need to emit them again here. - parent_project_key = self.gen_site_key(self.site.id) + parent_project_key = self.gen_site_key(self.site_id) yield from gen_containers( container_key=project_key, @@ -3201,12 +3227,12 @@ def emit_project_in_topological_order( yield from emit_project_in_topological_order(project) def emit_site_container(self): - if not self.site or not self.site.id: + if not self.site: logger.warning("Can not ingest site container. No site information found.") return yield from gen_containers( - container_key=self.gen_site_key(self.site.id), + container_key=self.gen_site_key(self.site_id), name=self.site.name or "Default", sub_types=[c.SITE], ) diff --git a/metadata-ingestion/tests/integration/lookml/test_lookml.py b/metadata-ingestion/tests/integration/lookml/test_lookml.py index ab55321a4d7342..4cd2777dc7dcad 100644 --- a/metadata-ingestion/tests/integration/lookml/test_lookml.py +++ b/metadata-ingestion/tests/integration/lookml/test_lookml.py @@ -889,7 +889,7 @@ def test_view_to_view_lineage_and_liquid_template(pytestconfig, tmp_path, mock_t @freeze_time(FROZEN_TIME) def test_special_liquid_variables(): - text: str = """ + text: str = """{% assign source_table_variable = "source_table" | sql_quote | non_existing_filter_where_it_should_not_fail %} SELECT employee_id, employee_name, @@ -903,7 +903,7 @@ def test_special_liquid_variables(): 'default_table' as source {% endif %}, employee_income - FROM source_table + FROM {{ source_table_variable }} """ input_liquid_variable: dict = {} @@ -958,7 +958,7 @@ def test_special_liquid_variables(): expected_text: str = ( "\n SELECT\n employee_id,\n employee_name,\n \n " "prod_core.data.r_metric_summary_v2\n ,\n employee_income\n FROM " - "source_table\n " + "'source_table'\n " ) assert actual_text == expected_text diff --git a/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py b/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py index 6c45b8a47de412..38a53b323876d1 100644 --- a/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py +++ b/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py @@ -1028,6 +1028,7 @@ def check_lineage_metadata( ctx=context, platform="tableau", site=SiteItem(name="Site 1", content_url="site1"), + site_id="site1", report=TableauSourceReport(), server=Server("https://test-tableau-server.com"), ) @@ -1248,6 +1249,7 @@ def test_permission_mode_switched_error(pytestconfig, tmp_path, mock_datahub_gra config=mock.MagicMock(), ctx=mock.MagicMock(), site=mock.MagicMock(), + site_id=None, server=mock_sdk.return_value, report=reporter, ) diff --git a/metadata-integration/java/datahub-client/build.gradle b/metadata-integration/java/datahub-client/build.gradle index af71227809d2a7..2535d091f6ce52 100644 --- a/metadata-integration/java/datahub-client/build.gradle +++ b/metadata-integration/java/datahub-client/build.gradle @@ -62,7 +62,7 @@ compileJava.dependsOn copyAvroSchemas // Add Python environment validation task -task validatePythonEnv { +task validatePythonEnv(dependsOn: [":metadata-ingestion:installDev"]) { doFirst { def venvPath = System.getProperty('python.venv.path', '../../../metadata-ingestion/venv') def isWindows = System.getProperty('os.name').toLowerCase().contains('windows') diff --git a/metadata-io/metadata-io-api/src/test/java/com/linkedin/metadata/entity/validation/ValidationApiUtilsTest.java b/metadata-io/metadata-io-api/src/test/java/com/linkedin/metadata/entity/validation/ValidationApiUtilsTest.java index a2c9a15d92f90a..2ab6a50945ba37 100644 --- a/metadata-io/metadata-io-api/src/test/java/com/linkedin/metadata/entity/validation/ValidationApiUtilsTest.java +++ b/metadata-io/metadata-io-api/src/test/java/com/linkedin/metadata/entity/validation/ValidationApiUtilsTest.java @@ -79,11 +79,19 @@ public void testUrnWithIllegalDelimiter() { } @Test(expectedExceptions = IllegalArgumentException.class) - public void testComplexUrnWithParens() { + public void testComplexUrnWithParens1() { Urn invalidUrn = UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hdfs,(illegal),PROD)"); ValidationApiUtils.validateUrn(entityRegistry, invalidUrn, true); } + @Test(expectedExceptions = IllegalArgumentException.class) + public void testComplexUrnWithParens2() { + Urn invalidUrn = + UrnUtils.getUrn( + "urn:li:dataJob:(urn:li:dataFlow:(mssql,1/2/3/4.c_n on %28LOCAL%29,PROD),1/2/3/4.c_n on (LOCAL))"); + ValidationApiUtils.validateUrn(entityRegistry, invalidUrn, true); + } + @Test(expectedExceptions = IllegalArgumentException.class) public void testSimpleUrnWithParens() { Urn invalidUrn = UrnUtils.getUrn("urn:li:corpuser:(foo)123"); diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/fixtures/SampleDataFixtureTestBase.java b/metadata-io/src/test/java/com/linkedin/metadata/search/fixtures/SampleDataFixtureTestBase.java index 504eb5f5fc13db..fd663de40e0050 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/search/fixtures/SampleDataFixtureTestBase.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/fixtures/SampleDataFixtureTestBase.java @@ -1367,8 +1367,8 @@ public void testScrollAcrossEntities() throws IOException { resultUrns.addAll(result.getEntities().stream().map(SearchEntity::getEntity).toList()); scrollId = result.getScrollId(); } while (scrollId != null); - // expect 2 total matching results - assertEquals(totalResults, 2, String.format("query `%s` Results: %s", query, resultUrns)); + // expect 8 total matching results + assertEquals(totalResults, 8, String.format("query `%s` Results: %s", query, resultUrns)); } @Test @@ -1745,7 +1745,7 @@ public void testOr() { String.format("%s - Expected search results to include matched fields", query)); assertEquals( result.getEntities().size(), - 2, + 8, String.format( "Query: `%s` Results: %s", query, @@ -1776,7 +1776,7 @@ public void testNegate() { String.format("%s - Expected search results to include matched fields", query)); assertEquals( result.getEntities().size(), - 2, + 8, String.format( "Query: `%s` Results: %s", query, diff --git a/metadata-io/src/test/resources/search_config_fixture_test.yml b/metadata-io/src/test/resources/search_config_fixture_test.yml index 08e713c6b1cd38..e3c97c267188fb 100644 --- a/metadata-io/src/test/resources/search_config_fixture_test.yml +++ b/metadata-io/src/test/resources/search_config_fixture_test.yml @@ -57,9 +57,9 @@ queryConfigurations: boost_mode: replace # Criteria for exact-match only - # Contains quotes, is a single term with `_`, `.`, or `-` (normally consider for tokenization) then use exact match query + # Contains quotes - queryRegex: >- - ^["'].+["']$|^[a-zA-Z0-9]\S+[_.-]\S+[a-zA-Z0-9]$ + ^["'].+["']$ simpleQuery: false prefixMatchQuery: true exactMatchQuery: true diff --git a/metadata-service/configuration/src/main/resources/bootstrap_mcps.yaml b/metadata-service/configuration/src/main/resources/bootstrap_mcps.yaml index 0e283dfdfc93ca..a81cf39ce386ff 100644 --- a/metadata-service/configuration/src/main/resources/bootstrap_mcps.yaml +++ b/metadata-service/configuration/src/main/resources/bootstrap_mcps.yaml @@ -13,7 +13,7 @@ bootstrap: mcps_location: "bootstrap_mcps/root-user.yaml" - name: data-platforms - version: v1 + version: v2 blocking: true async: false mcps_location: "bootstrap_mcps/data-platforms.yaml" diff --git a/metadata-service/configuration/src/main/resources/bootstrap_mcps/data-platforms.yaml b/metadata-service/configuration/src/main/resources/bootstrap_mcps/data-platforms.yaml index 0b3d815c710980..2230d552ed4c0e 100644 --- a/metadata-service/configuration/src/main/resources/bootstrap_mcps/data-platforms.yaml +++ b/metadata-service/configuration/src/main/resources/bootstrap_mcps/data-platforms.yaml @@ -119,6 +119,16 @@ displayName: Hive type: FILE_SYSTEM logoUrl: "/assets/platforms/hivelogo.png" +- entityUrn: urn:li:dataPlatform:hudi + entityType: dataPlatform + aspectName: dataPlatformInfo + changeType: UPSERT + aspect: + datasetNameDelimiter: "." + name: hudi + displayName: Hudi + type: FILE_SYSTEM + logoUrl: "/assets/platforms/hudilogo.png" - entityUrn: urn:li:dataPlatform:iceberg entityType: dataPlatform aspectName: dataPlatformInfo