Skip to content

Commit

Permalink
fix (#113)
Browse files Browse the repository at this point in the history
Co-authored-by: Maxim Mityutko <[email protected]>
  • Loading branch information
maxim-mityutko and Maxim Mityutko authored May 8, 2024
1 parent 2b70616 commit 144de7a
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 5 deletions.
14 changes: 9 additions & 5 deletions brickflow_plugins/airflow/operators/external_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,20 +387,24 @@ def poke(self, context):
else:
status = response.json()["status"][:2].upper()

last_end_timestamp = parse(response.json()["lastEndUTC"]).replace(
tzinfo=pytz.UTC
)
last_end_timestamp = None
if last_end_utc := response.json().get("lastEndUTC"):
last_end_timestamp = parse(last_end_utc).replace(tzinfo=pytz.UTC)

time_delta = (
self.time_delta
if isinstance(self.time_delta, timedelta)
else timedelta(**self.time_delta)
)

execution_timestamp = parse(str(context["execution_date"]))
execution_timestamp = parse(context["execution_date"])
run_timestamp = execution_timestamp - time_delta

if "SU" in status and last_end_timestamp >= run_timestamp:
if (
"SU" in status
and last_end_timestamp
and last_end_timestamp >= run_timestamp
):
logging.info(
f"Last End: {last_end_timestamp}, Run Timestamp: {run_timestamp}"
)
Expand Down
72 changes: 72 additions & 0 deletions tests/airflow_plugins/test_autosys.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import pytest
from requests.exceptions import HTTPError
from requests_mock.mocker import Mocker as RequestsMocker

from brickflow_plugins.airflow.operators.external_tasks import AutosysSensor


class TestAutosysSensor:
@pytest.fixture(autouse=True, name="api", scope="class")
def mock_api(self):
rm = RequestsMocker()
rm.register_uri(
method="GET",
url="https://42.autosys.my-org.com/foo",
response_list=[
# Test 1: Success
{
"json": {"status": "SU", "lastEndUTC": "2024-01-01T00:55:00Z"},
"status_code": int(200),
},
# Test 2: Raise Error
{
"json": {},
"status_code": int(404),
},
# Test 3: Poke 4 times until success
{
"json": {"status": "FA", "lastEndUTC": "2024-01-01T00:55:00Z"},
"status_code": int(200),
},
{
"json": {"status": "UNK", "lastEndUTC": None},
"status_code": int(200),
},
{
"json": {"status": "UNK", "lastEndUTC": ""},
"status_code": int(200),
},
{
"json": {"status": "SU", "lastEndUTC": "2024-01-01T01:55:00Z"},
"status_code": int(200),
},
],
)
yield rm

@pytest.fixture()
def sensor(self):
yield AutosysSensor(
task_id="test",
url="https://42.autosys.my-org.com/",
job_name="foo",
poke_interval=1,
time_delta={"hours": 1},
)

def test_success(self, api, caplog, sensor):
with api:
sensor.poke(context={"execution_date": "2024-01-01T01:00:00Z"})
assert caplog.text.count("Poking again") == 0
assert "Success criteria met. Exiting" in caplog.text

def test_non_200(self, api, sensor):
with pytest.raises(HTTPError):
with api:
sensor.poke(context={"execution_date": "2024-01-01T01:00:00Z"})

def test_poking(self, api, caplog, sensor):
with api:
sensor.poke(context={"execution_date": "2024-01-01T02:00:00Z"})
assert caplog.text.count("Poking again") == 3
assert "Success criteria met. Exiting" in caplog.text

0 comments on commit 144de7a

Please sign in to comment.