Skip to content

Commit

Permalink
custom dependency detector removal (apache#41609)
Browse files Browse the repository at this point in the history
  • Loading branch information
dirrao authored Aug 20, 2024
1 parent 1b602d5 commit c879501
Show file tree
Hide file tree
Showing 3 changed files with 2 additions and 78 deletions.
27 changes: 1 addition & 26 deletions airflow/serialization/serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import enum
import inspect
import logging
import warnings
import weakref
from inspect import signature
from textwrap import dedent
Expand All @@ -37,7 +36,6 @@
from airflow import macros
from airflow.callbacks.callback_requests import DagCallbackRequest, SlaCallbackRequest, TaskCallbackRequest
from airflow.compat.functools import cache
from airflow.configuration import conf
from airflow.datasets import (
BaseDataset,
Dataset,
Expand All @@ -46,7 +44,7 @@
DatasetAny,
_DatasetAliasCondition,
)
from airflow.exceptions import AirflowException, RemovedInAirflow3Warning, SerializationError, TaskDeferred
from airflow.exceptions import AirflowException, SerializationError, TaskDeferred
from airflow.jobs.job import Job
from airflow.models import Trigger
from airflow.models.baseoperator import BaseOperator
Expand Down Expand Up @@ -1425,31 +1423,8 @@ def deserialize_operator(cls, encoded_op: dict[str, Any]) -> Operator:
@classmethod
def detect_dependencies(cls, op: Operator) -> set[DagDependency]:
"""Detect between DAG dependencies for the operator."""

def get_custom_dep() -> list[DagDependency]:
"""
If custom dependency detector is configured, use it.
TODO: Remove this logic in 3.0.
"""
custom_dependency_detector_cls = conf.getimport("scheduler", "dependency_detector", fallback=None)
if not (
custom_dependency_detector_cls is None or custom_dependency_detector_cls is DependencyDetector
):
warnings.warn(
"Use of a custom dependency detector is deprecated. "
"Support will be removed in a future release.",
RemovedInAirflow3Warning,
stacklevel=1,
)
dep = custom_dependency_detector_cls().detect_task_dependencies(op)
if type(dep) is DagDependency:
return [dep]
return []

dependency_detector = DependencyDetector()
deps = set(dependency_detector.detect_task_dependencies(op))
deps.update(get_custom_dep()) # todo: remove in 3.0
return deps

@classmethod
Expand Down
1 change: 1 addition & 0 deletions newsfragments/41609.significant.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Removed deprecated ``dependency_detector`` parameter from ``scheduler``.
52 changes: 0 additions & 52 deletions tests/serialization/test_dag_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@
from airflow.utils.task_group import TaskGroup
from airflow.utils.xcom import XCOM_RETURN_KEY
from tests.test_utils.compat import BaseOperatorLink
from tests.test_utils.config import conf_vars
from tests.test_utils.mock_operators import AirflowLink2, CustomOperator, GoogleLink, MockOperator
from tests.test_utils.timetables import CustomSerializationTimetable, cron_timetable, delta_timetable

Expand Down Expand Up @@ -1691,57 +1690,6 @@ class DerivedSensor(ExternalTaskSensor):
}
]

@pytest.mark.db_test
@conf_vars(
{
(
"scheduler",
"dependency_detector",
): "tests.serialization.test_dag_serialization.CustomDependencyDetector"
}
)
def test_custom_dep_detector(self):
"""
Prior to deprecation of custom dependency detector, the return type was DagDependency | None.
This class verifies that custom dependency detector classes which assume that return type will still
work until support for them is removed in 3.0.
TODO: remove in Airflow 3.0
"""
from airflow.sensors.external_task import ExternalTaskSensor

execution_date = datetime(2020, 1, 1)
with DAG(dag_id="test", schedule=None, start_date=execution_date) as dag:
ExternalTaskSensor(
task_id="task1",
external_dag_id="external_dag_id",
mode="reschedule",
)
CustomDepOperator(task_id="hello", bash_command="hi")
with pytest.warns(
RemovedInAirflow3Warning,
match=r"Use of a custom dependency detector is deprecated\. "
r"Support will be removed in a future release\.",
):
dag = SerializedDAG.to_dict(dag)
assert sorted(dag["dag"]["dag_dependencies"], key=lambda x: tuple(x.values())) == sorted(
[
{
"source": "external_dag_id",
"target": "test",
"dependency_type": "sensor",
"dependency_id": "task1",
},
{
"source": "test",
"target": "nothing",
"dependency_type": "abc",
"dependency_id": "hello",
},
],
key=lambda x: tuple(x.values()),
)

@pytest.mark.db_test
def test_dag_deps_datasets_with_duplicate_dataset(self):
"""
Expand Down

0 comments on commit c879501

Please sign in to comment.