diff --git a/cosmos/dataset.py b/cosmos/dataset.py index 2a308c54e..abdc80e88 100644 --- a/cosmos/dataset.py +++ b/cosmos/dataset.py @@ -17,7 +17,7 @@ def get_dataset_alias_name(dag: DAG | None, task_group: TaskGroup | None, task_i dag_id = task_group.dag_id if task_group.group_id is not None: task_group_id = task_group.group_id - task_group_id = task_group_id.replace(".", "__") + task_group_id = task_group_id.split(".")[-1] elif dag: dag_id = dag.dag_id @@ -28,6 +28,6 @@ def get_dataset_alias_name(dag: DAG | None, task_group: TaskGroup | None, task_i if task_group_id: identifiers_list.append(task_group_id) - identifiers_list.append(task_id) + identifiers_list.append(task_id.split(".")[-1]) return "__".join(identifiers_list) diff --git a/docs/configuration/scheduling.rst b/docs/configuration/scheduling.rst index b5d2c1821..9cc138e6f 100644 --- a/docs/configuration/scheduling.rst +++ b/docs/configuration/scheduling.rst @@ -64,6 +64,71 @@ Then, you can use Airflow's data-aware scheduling capabilities to schedule ``my_ In this scenario, ``project_one`` runs once a day and ``project_two`` runs immediately after ``project_one``. You can view these dependencies in Airflow's UI. + +Examples +................. + +This example DAG: + +.. + The following renders in Sphinx but not Github: + +.. literalinclude:: ./../dev/dags/basic_cosmos_dag.py + :language: python + :start-after: [START local_example] + :end-before: [END local_example] + + +Will trigger the following DAG to be run (when using Cosmos 1.1 when using Airflow 2.4 or newer): + +.. code-block:: python + + from datetime import datetime + from airflow import DAG + from airflow.datasets import Dataset + from airflow.operators.empty import EmptyOperator + + + with DAG( + "dataset_triggered_dag", + description="A DAG that should be triggered via Dataset", + start_date=datetime(2024, 9, 1), + schedule=[Dataset(uri="postgres://0.0.0.0:5434/postgres.public.orders")], + ) as dag: + t1 = EmptyOperator( + task_id="task_1", + ) + t2 = EmptyOperator( + task_id="task_2", + ) + + t1 >> t2 + + +From Cosmos 1.7 and Airflow 2.10, it is also possible to trigger DAGs be to be run by using ``DatasetAliases``: + +.. code-block:: python + + from datetime import datetime + from airflow import DAG + from airflow.datasets import DatasetAlias + from airflow.operators.empty import EmptyOperator + + + with DAG( + "datasetalias_triggered_dag", + description="A DAG that should be triggered via Dataset alias", + start_date=datetime(2024, 9, 1), + schedule=[DatasetAlias(name="basic_cosmos_dag__orders__run")], + ) as dag: + + t3 = EmptyOperator( + task_id="task_3", + ) + + t3 + + Known Limitations ................. diff --git a/tests/test_dataset.py b/tests/test_dataset.py index 12e423323..a7eebd409 100644 --- a/tests/test_dataset.py +++ b/tests/test_dataset.py @@ -11,16 +11,17 @@ @pytest.mark.parametrize( - "dag, task_group, result_identifier", + "dag, task_group,task_id,result_identifier", [ - (example_dag, None, "dag__task_id"), - (None, TaskGroup(dag=example_dag, group_id="inner_tg"), "dag__inner_tg__task_id"), + (example_dag, None, "task_id", "dag__task_id"), + (None, TaskGroup(dag=example_dag, group_id="inner_tg"), "task_id", "dag__inner_tg__task_id"), ( None, TaskGroup( dag=example_dag, group_id="child_tg", parent_group=TaskGroup(dag=example_dag, group_id="parent_tg") ), - "dag__parent_tg__child_tg__task_id", + "task_id", + "dag__child_tg__task_id", ), ( None, @@ -31,9 +32,19 @@ dag=example_dag, group_id="mum_tg", parent_group=TaskGroup(dag=example_dag, group_id="nana_tg") ), ), - "dag__nana_tg__mum_tg__child_tg__task_id", + "task_id", + "dag__child_tg__task_id", + ), + ( + None, + TaskGroup( + dag=example_dag, + group_id="another_tg", + ), + "another_tg.task_id", # Airflow injects this during task execution time when outside of standalone + "dag__another_tg__task_id", ), ], ) -def test_get_dataset_alias_name(dag, task_group, result_identifier): - assert get_dataset_alias_name(dag, task_group, "task_id") == result_identifier +def test_get_dataset_alias_name(dag, task_group, task_id, result_identifier): + assert get_dataset_alias_name(dag, task_group, task_id) == result_identifier diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index 6c7e98802..2a66faa49 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -1,5 +1,6 @@ from __future__ import annotations +import warnings from pathlib import Path try: @@ -10,6 +11,7 @@ import airflow import pytest +import sqlalchemy from airflow.models.dagbag import DagBag from airflow.utils.db import create_default_connections from airflow.utils.session import provide_session @@ -106,6 +108,24 @@ def test_example_dag(session, dag_id: str): # This feature is available since Airflow 2.5 and we've backported it in Cosmos: # https://airflow.apache.org/docs/apache-airflow/stable/release_notes.html#airflow-2-5-0-2022-12-02 if AIRFLOW_VERSION >= Version("2.5"): - dag.test() + if AIRFLOW_VERSION not in (Version("2.10.0"), Version("2.10.1"), Version("2.10.2")): + dag.test() + else: + # This is a work around until we fix the issue in Airflow: + # https://github.com/apache/airflow/issues/42495 + """ + FAILED tests/test_example_dags.py::test_example_dag[example_model_version] - sqlalchemy.exc.PendingRollbackError: + This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). + Original exception was: Can't flush None value found in collection DatasetModel.aliases (Background on this error at: https://sqlalche.me/e/14/7s2a) + FAILED tests/test_example_dags.py::test_example_dag[basic_cosmos_dag] + FAILED tests/test_example_dags.py::test_example_dag[cosmos_profile_mapping] + FAILED tests/test_example_dags.py::test_example_dag[user_defined_profile] + """ + try: + dag.test() + except sqlalchemy.exc.PendingRollbackError: + warnings.warn( + "Early versions of Airflow 2.10 have issues when running the test command with DatasetAlias / Datasets" + ) else: test_utils.run_dag(dag)