From c87950173a91723947b459eb5a1ec0da0abb08c1 Mon Sep 17 00:00:00 2001 From: Gopal Dirisala <39794726+dirrao@users.noreply.github.com> Date: Tue, 20 Aug 2024 19:50:50 +0530 Subject: [PATCH] custom dependency detector removal (#41609) --- airflow/serialization/serialized_objects.py | 27 +--------- newsfragments/41609.significant.rst | 1 + tests/serialization/test_dag_serialization.py | 52 ------------------- 3 files changed, 2 insertions(+), 78 deletions(-) create mode 100644 newsfragments/41609.significant.rst diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 4004b83a991bd..9adb9f7334ba0 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -23,7 +23,6 @@ import enum import inspect import logging -import warnings import weakref from inspect import signature from textwrap import dedent @@ -37,7 +36,6 @@ from airflow import macros from airflow.callbacks.callback_requests import DagCallbackRequest, SlaCallbackRequest, TaskCallbackRequest from airflow.compat.functools import cache -from airflow.configuration import conf from airflow.datasets import ( BaseDataset, Dataset, @@ -46,7 +44,7 @@ DatasetAny, _DatasetAliasCondition, ) -from airflow.exceptions import AirflowException, RemovedInAirflow3Warning, SerializationError, TaskDeferred +from airflow.exceptions import AirflowException, SerializationError, TaskDeferred from airflow.jobs.job import Job from airflow.models import Trigger from airflow.models.baseoperator import BaseOperator @@ -1425,31 +1423,8 @@ def deserialize_operator(cls, encoded_op: dict[str, Any]) -> Operator: @classmethod def detect_dependencies(cls, op: Operator) -> set[DagDependency]: """Detect between DAG dependencies for the operator.""" - - def get_custom_dep() -> list[DagDependency]: - """ - If custom dependency detector is configured, use it. - - TODO: Remove this logic in 3.0. - """ - custom_dependency_detector_cls = conf.getimport("scheduler", "dependency_detector", fallback=None) - if not ( - custom_dependency_detector_cls is None or custom_dependency_detector_cls is DependencyDetector - ): - warnings.warn( - "Use of a custom dependency detector is deprecated. " - "Support will be removed in a future release.", - RemovedInAirflow3Warning, - stacklevel=1, - ) - dep = custom_dependency_detector_cls().detect_task_dependencies(op) - if type(dep) is DagDependency: - return [dep] - return [] - dependency_detector = DependencyDetector() deps = set(dependency_detector.detect_task_dependencies(op)) - deps.update(get_custom_dep()) # todo: remove in 3.0 return deps @classmethod diff --git a/newsfragments/41609.significant.rst b/newsfragments/41609.significant.rst new file mode 100644 index 0000000000000..dff7b26a55596 --- /dev/null +++ b/newsfragments/41609.significant.rst @@ -0,0 +1 @@ +Removed deprecated ``dependency_detector`` parameter from ``scheduler``. diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index 5e45351f3dbeb..3ed2bdcbfae8c 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -78,7 +78,6 @@ from airflow.utils.task_group import TaskGroup from airflow.utils.xcom import XCOM_RETURN_KEY from tests.test_utils.compat import BaseOperatorLink -from tests.test_utils.config import conf_vars from tests.test_utils.mock_operators import AirflowLink2, CustomOperator, GoogleLink, MockOperator from tests.test_utils.timetables import CustomSerializationTimetable, cron_timetable, delta_timetable @@ -1691,57 +1690,6 @@ class DerivedSensor(ExternalTaskSensor): } ] - @pytest.mark.db_test - @conf_vars( - { - ( - "scheduler", - "dependency_detector", - ): "tests.serialization.test_dag_serialization.CustomDependencyDetector" - } - ) - def test_custom_dep_detector(self): - """ - Prior to deprecation of custom dependency detector, the return type was DagDependency | None. - This class verifies that custom dependency detector classes which assume that return type will still - work until support for them is removed in 3.0. - - TODO: remove in Airflow 3.0 - """ - from airflow.sensors.external_task import ExternalTaskSensor - - execution_date = datetime(2020, 1, 1) - with DAG(dag_id="test", schedule=None, start_date=execution_date) as dag: - ExternalTaskSensor( - task_id="task1", - external_dag_id="external_dag_id", - mode="reschedule", - ) - CustomDepOperator(task_id="hello", bash_command="hi") - with pytest.warns( - RemovedInAirflow3Warning, - match=r"Use of a custom dependency detector is deprecated\. " - r"Support will be removed in a future release\.", - ): - dag = SerializedDAG.to_dict(dag) - assert sorted(dag["dag"]["dag_dependencies"], key=lambda x: tuple(x.values())) == sorted( - [ - { - "source": "external_dag_id", - "target": "test", - "dependency_type": "sensor", - "dependency_id": "task1", - }, - { - "source": "test", - "target": "nothing", - "dependency_type": "abc", - "dependency_id": "hello", - }, - ], - key=lambda x: tuple(x.values()), - ) - @pytest.mark.db_test def test_dag_deps_datasets_with_duplicate_dataset(self): """