fix(ingest/airflow): compat with pluggy 1.0 (datahub-project#9365)
hsheth2 authored Dec 5, 2023
1 parent 7517c77 commit 0d9aa26
Showing 5 changed files with 57 additions and 9 deletions.
14 changes: 14 additions & 0 deletions docs/lineage/airflow.md
@@ -246,6 +246,20 @@ If your URLs aren't being generated correctly (usually they'll start with `http:
base_url = http://airflow.mycorp.example.com
```

### TypeError ... missing 3 required positional arguments

If you see errors like the following with the v2 plugin:

```shell
ERROR - on_task_instance_success() missing 3 required positional arguments: 'previous_state', 'task_instance', and 'session'
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/datahub_airflow_plugin/datahub_listener.py", line 124, in wrapper
f(*args, **kwargs)
TypeError: on_task_instance_success() missing 3 required positional arguments: 'previous_state', 'task_instance', and 'session'
```

The solution is to upgrade to `acryl-datahub-airflow-plugin>=0.12.0.4` or to `pluggy>=1.2.0`. See this [PR](https://github.com/datahub-project/datahub/pull/9365) for details.
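
For example, assuming you manage the environment with pip, either of the following upgrades should resolve the error:

```shell
pip install --upgrade 'acryl-datahub-airflow-plugin>=0.12.0.4'
# or, if you need to stay on your current plugin version:
pip install --upgrade 'pluggy>=1.2.0'
```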

## Compatibility

We no longer officially support Airflow <2.1. However, you can use older versions of `acryl-datahub-airflow-plugin` with older versions of Airflow.
5 changes: 5 additions & 0 deletions metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_airflow_shims.py
@@ -2,6 +2,7 @@

import airflow.version
import packaging.version
import pluggy
from airflow.models.baseoperator import BaseOperator

from datahub_airflow_plugin._airflow_compat import AIRFLOW_PATCHED
@@ -27,9 +28,13 @@

# Approach suggested by https://stackoverflow.com/a/11887885/5004662.
AIRFLOW_VERSION = packaging.version.parse(airflow.version.version)
PLUGGY_VERSION = packaging.version.parse(pluggy.__version__)
HAS_AIRFLOW_STANDALONE_CMD = AIRFLOW_VERSION >= packaging.version.parse("2.2.0.dev0")
HAS_AIRFLOW_LISTENER_API = AIRFLOW_VERSION >= packaging.version.parse("2.3.0.dev0")
HAS_AIRFLOW_DAG_LISTENER_API = AIRFLOW_VERSION >= packaging.version.parse("2.5.0.dev0")
NEEDS_AIRFLOW_LISTENER_MODULE = AIRFLOW_VERSION < packaging.version.parse(
    "2.5.0.dev0"
) or PLUGGY_VERSION <= packaging.version.parse("1.0.0")


def get_task_inlets(operator: "Operator") -> List:
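
For reference (not part of the diff), the same version checks can be evaluated interactively to see which listener path the plugin would take in a given environment:

```python
# Minimal sketch reusing the shim's own version checks to see whether the
# module-based listener fallback would be used in the current environment.
import airflow.version
import packaging.version
import pluggy

AIRFLOW_VERSION = packaging.version.parse(airflow.version.version)
PLUGGY_VERSION = packaging.version.parse(pluggy.__version__)

needs_listener_module = AIRFLOW_VERSION < packaging.version.parse(
    "2.5.0.dev0"
) or PLUGGY_VERSION <= packaging.version.parse("1.0.0")

print(f"airflow={AIRFLOW_VERSION} pluggy={PLUGGY_VERSION} "
      f"needs_listener_module={needs_listener_module}")
```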
35 changes: 31 additions & 4 deletions metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_datahub_listener_module.py
@@ -1,7 +1,34 @@
from datahub_airflow_plugin.datahub_listener import get_airflow_plugin_listener
from datahub_airflow_plugin.datahub_listener import (
    get_airflow_plugin_listener,
    hookimpl,
)

_listener = get_airflow_plugin_listener()
if _listener:
    on_task_instance_running = _listener.on_task_instance_running
    on_task_instance_success = _listener.on_task_instance_success
    on_task_instance_failed = _listener.on_task_instance_failed
    # The run_in_thread decorator messes with pluggy's interface discovery,
    # which causes the hooks to be called with no arguments and results in TypeErrors.
    # This is only an issue with Pluggy <= 1.0.0.
    # See https://github.com/pytest-dev/pluggy/issues/358
    # Note that pluggy 1.0.0 is in the constraints file for Airflow 2.4 and 2.5.

    @hookimpl
    def on_task_instance_running(previous_state, task_instance, session):
        assert _listener
        _listener.on_task_instance_running(previous_state, task_instance, session)

    @hookimpl
    def on_task_instance_success(previous_state, task_instance, session):
        assert _listener
        _listener.on_task_instance_success(previous_state, task_instance, session)

    @hookimpl
    def on_task_instance_failed(previous_state, task_instance, session):
        assert _listener
        _listener.on_task_instance_failed(previous_state, task_instance, session)

    if hasattr(_listener, "on_dag_run_running"):

        @hookimpl
        def on_dag_run_running(dag_run, session):
            assert _listener
            _listener.on_dag_run_running(dag_run, session)
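
To make the comment above concrete, here is a small, self-contained sketch (not part of this commit; `run_in_thread` below is only a stand-in for the plugin's decorator) showing how a bare `*args`/`**kwargs` wrapper hides the hook's named parameters and produces the TypeError reported in the docs:

```python
import inspect


def run_in_thread(f):
    # Stand-in for the plugin's decorator: the wrapper's own signature is
    # just (*args, **kwargs), so the original parameter names are hidden.
    def wrapper(*args, **kwargs):
        return f(*args, **kwargs)

    return wrapper


@run_in_thread
def on_task_instance_success(previous_state, task_instance, session):
    pass


# Signature-based argument discovery (as in pluggy <= 1.0.0) sees no named
# parameters on the wrapper...
print(inspect.signature(on_task_instance_success))  # -> (*args, **kwargs)

# ...so the hook ends up being invoked with no arguments, which reproduces:
# TypeError: on_task_instance_success() missing 3 required positional arguments
try:
    on_task_instance_success()
except TypeError as e:
    print(e)
```

The explicit `@hookimpl` functions above avoid the problem because pluggy sees their real parameter names directly.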
6 changes: 2 additions & 4 deletions metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin.py
@@ -6,8 +6,8 @@

from datahub_airflow_plugin._airflow_compat import AIRFLOW_PATCHED
from datahub_airflow_plugin._airflow_shims import (
    HAS_AIRFLOW_DAG_LISTENER_API,
    HAS_AIRFLOW_LISTENER_API,
    NEEDS_AIRFLOW_LISTENER_MODULE,
)

assert AIRFLOW_PATCHED
@@ -50,7 +50,7 @@ class DatahubPlugin(AirflowPlugin):
    name = "datahub_plugin"

    if _USE_AIRFLOW_LISTENER_INTERFACE:
        if HAS_AIRFLOW_DAG_LISTENER_API:
        if not NEEDS_AIRFLOW_LISTENER_MODULE:
            from datahub_airflow_plugin.datahub_listener import (  # type: ignore[misc]
                get_airflow_plugin_listener,
            )
@@ -60,8 +60,6 @@ class DatahubPlugin(AirflowPlugin):
        else:
            # On Airflow < 2.5, we need the listener to be a module.
            # This is just a quick shim layer to make that work.
            # The DAG listener API was added at the same time as this method
            # was fixed, so we're reusing the same check variable.
            #
            # Related Airflow change: https://github.com/apache/airflow/pull/27113.
            import datahub_airflow_plugin._datahub_listener_module as _listener_module  # type: ignore[misc]
6 changes: 5 additions & 1 deletion metadata-ingestion-modules/airflow-plugin/tox.ini
@@ -14,7 +14,11 @@ deps =
    # Airflow version
    airflow21: apache-airflow~=2.1.0
    airflow22: apache-airflow~=2.2.0
    airflow24: apache-airflow~=2.4.0
    # On Airflow 2.4 and 2.5, Airflow's constraints file pins pluggy to 1.0.0,
    # which has caused issues for us before. As such, we now pin it explicitly
    # to prevent regressions.
    # See https://github.com/datahub-project/datahub/pull/9365
    airflow24: apache-airflow~=2.4.0,pluggy==1.0.0
    airflow26: apache-airflow~=2.6.0
    airflow27: apache-airflow~=2.7.0
commands =
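
If you want to reproduce the pinned environment outside of tox (for example, to exercise the module-based listener fallback locally), something along these lines should work; the versions simply mirror the constraints above:

```shell
pip install 'apache-airflow~=2.4.0' 'pluggy==1.0.0' acryl-datahub-airflow-plugin
```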
