From d58c2f52325870484cae8a21ce9435c6d47f598d Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 4 Oct 2024 11:02:18 +0100 Subject: [PATCH] Fix Dataset and DatasetAlias scheduling in Airflow 2.10 (#1240) When tasks inside task groups were emitting events, they were different during task initialization and task execution, resulting in different DatasetAlias representations and scheduling not working as expected. During task execution, the task ID is modified also to contain the task_group. We could not catch this before due to inconsistent behaviour between running Airflow using `dags.test()`, `airflow standalone` and `astro-cli`. --- cosmos/dataset.py | 4 +- docs/configuration/scheduling.rst | 65 +++++++++++++++++++++++++++++++ tests/test_dataset.py | 25 ++++++++---- tests/test_example_dags.py | 22 ++++++++++- 4 files changed, 106 insertions(+), 10 deletions(-) diff --git a/cosmos/dataset.py b/cosmos/dataset.py index 2a308c54e..abdc80e88 100644 --- a/cosmos/dataset.py +++ b/cosmos/dataset.py @@ -17,7 +17,7 @@ def get_dataset_alias_name(dag: DAG | None, task_group: TaskGroup | None, task_i dag_id = task_group.dag_id if task_group.group_id is not None: task_group_id = task_group.group_id - task_group_id = task_group_id.replace(".", "__") + task_group_id = task_group_id.split(".")[-1] elif dag: dag_id = dag.dag_id @@ -28,6 +28,6 @@ def get_dataset_alias_name(dag: DAG | None, task_group: TaskGroup | None, task_i if task_group_id: identifiers_list.append(task_group_id) - identifiers_list.append(task_id) + identifiers_list.append(task_id.split(".")[-1]) return "__".join(identifiers_list) diff --git a/docs/configuration/scheduling.rst b/docs/configuration/scheduling.rst index b5d2c1821..9cc138e6f 100644 --- a/docs/configuration/scheduling.rst +++ b/docs/configuration/scheduling.rst @@ -64,6 +64,71 @@ Then, you can use Airflow's data-aware scheduling capabilities to schedule ``my_ In this scenario, ``project_one`` runs once a day and ``project_two`` runs 
immediately after ``project_one``. You can view these dependencies in Airflow's UI. + +Examples +................. + +This example DAG: + +.. + The following renders in Sphinx but not Github: + +.. literalinclude:: ./../dev/dags/basic_cosmos_dag.py + :language: python + :start-after: [START local_example] + :end-before: [END local_example] + + +Will trigger the following DAG to be run (when using Cosmos 1.1 with Airflow 2.4 or newer): + +.. code-block:: python + + from datetime import datetime + from airflow import DAG + from airflow.datasets import Dataset + from airflow.operators.empty import EmptyOperator + + + with DAG( + "dataset_triggered_dag", + description="A DAG that should be triggered via Dataset", + start_date=datetime(2024, 9, 1), + schedule=[Dataset(uri="postgres://0.0.0.0:5434/postgres.public.orders")], + ) as dag: + t1 = EmptyOperator( + task_id="task_1", + ) + t2 = EmptyOperator( + task_id="task_2", + ) + + t1 >> t2 + + +From Cosmos 1.7 and Airflow 2.10, it is also possible to trigger DAGs to be run by using ``DatasetAliases``: + +.. code-block:: python + + from datetime import datetime + from airflow import DAG + from airflow.datasets import DatasetAlias + from airflow.operators.empty import EmptyOperator + + + with DAG( + "datasetalias_triggered_dag", + description="A DAG that should be triggered via Dataset alias", + start_date=datetime(2024, 9, 1), + schedule=[DatasetAlias(name="basic_cosmos_dag__orders__run")], + ) as dag: + + t3 = EmptyOperator( + task_id="task_3", + ) + + t3 + + Known Limitations ................. 
diff --git a/tests/test_dataset.py b/tests/test_dataset.py index 12e423323..a7eebd409 100644 --- a/tests/test_dataset.py +++ b/tests/test_dataset.py @@ -11,16 +11,17 @@ @pytest.mark.parametrize( - "dag, task_group, result_identifier", + "dag, task_group,task_id,result_identifier", [ - (example_dag, None, "dag__task_id"), - (None, TaskGroup(dag=example_dag, group_id="inner_tg"), "dag__inner_tg__task_id"), + (example_dag, None, "task_id", "dag__task_id"), + (None, TaskGroup(dag=example_dag, group_id="inner_tg"), "task_id", "dag__inner_tg__task_id"), ( None, TaskGroup( dag=example_dag, group_id="child_tg", parent_group=TaskGroup(dag=example_dag, group_id="parent_tg") ), - "dag__parent_tg__child_tg__task_id", + "task_id", + "dag__child_tg__task_id", ), ( None, @@ -31,9 +32,19 @@ dag=example_dag, group_id="mum_tg", parent_group=TaskGroup(dag=example_dag, group_id="nana_tg") ), ), - "dag__nana_tg__mum_tg__child_tg__task_id", + "task_id", + "dag__child_tg__task_id", + ), + ( + None, + TaskGroup( + dag=example_dag, + group_id="another_tg", + ), + "another_tg.task_id", # Airflow injects this during task execution time when outside of standalone + "dag__another_tg__task_id", ), ], ) -def test_get_dataset_alias_name(dag, task_group, result_identifier): - assert get_dataset_alias_name(dag, task_group, "task_id") == result_identifier +def test_get_dataset_alias_name(dag, task_group, task_id, result_identifier): + assert get_dataset_alias_name(dag, task_group, task_id) == result_identifier diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index 6c7e98802..2a66faa49 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -1,5 +1,6 @@ from __future__ import annotations +import warnings from pathlib import Path try: @@ -10,6 +11,7 @@ import airflow import pytest +import sqlalchemy from airflow.models.dagbag import DagBag from airflow.utils.db import create_default_connections from airflow.utils.session import provide_session @@ -106,6 +108,24 
@@ def test_example_dag(session, dag_id: str): # This feature is available since Airflow 2.5 and we've backported it in Cosmos: # https://airflow.apache.org/docs/apache-airflow/stable/release_notes.html#airflow-2-5-0-2022-12-02 if AIRFLOW_VERSION >= Version("2.5"): - dag.test() + if AIRFLOW_VERSION not in (Version("2.10.0"), Version("2.10.1"), Version("2.10.2")): + dag.test() + else: + # This is a workaround until we fix the issue in Airflow: + # https://github.com/apache/airflow/issues/42495 + """ + FAILED tests/test_example_dags.py::test_example_dag[example_model_version] - sqlalchemy.exc.PendingRollbackError: + This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). + Original exception was: Can't flush None value found in collection DatasetModel.aliases (Background on this error at: https://sqlalche.me/e/14/7s2a) + FAILED tests/test_example_dags.py::test_example_dag[basic_cosmos_dag] + FAILED tests/test_example_dags.py::test_example_dag[cosmos_profile_mapping] + FAILED tests/test_example_dags.py::test_example_dag[user_defined_profile] + """ + try: + dag.test() + except sqlalchemy.exc.PendingRollbackError: + warnings.warn( + "Early versions of Airflow 2.10 have issues when running the test command with DatasetAlias / Datasets" + ) else: test_utils.run_dag(dag)