Skip to content

Commit

Permalink
Makes sure to forward any Airflow Exceptions during state checking, …
Browse files Browse the repository at this point in the history
…otherwise they are ignored, which we do not want. (#43)

* Makes sure to forward any AirflowExceptions during state checking; otherwise they are ignored, which we do not want.

* Timeout changed back

* DCO Remediation Commit for Torben Juul Johansson <[email protected]>

I, Torben Juul Johansson <[email protected]>, hereby add my Signed-off-by to this commit: 9a6610f
I, Torben Juul Johansson <[email protected]>, hereby add my Signed-off-by to this commit: b82a1f1

Signed-off-by: Torben Juul Johansson <[email protected]>

---------

Signed-off-by: Torben Juul Johansson <[email protected]>
  • Loading branch information
torbenjuul authored Dec 2, 2024
1 parent 0e0612c commit c963886
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 5 deletions.
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[metadata]
name = sas-airflow-provider
version = 0.0.18
version = 0.0.19
author = SAS
author_email = [email protected]
description = Enables execution of Studio Flows and Jobs from Airflow
Expand Down
11 changes: 8 additions & 3 deletions src/sas_airflow_provider/operators/sas_studio.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

from airflow.exceptions import AirflowFailException
from airflow.exceptions import AirflowException
from airflow.exceptions import AirflowTaskTimeout
from airflow.models import BaseOperator
from sas_airflow_provider.hooks.sas import SasHook
from sas_airflow_provider.util.util import stream_log, create_or_connect_to_session, end_compute_session
Expand Down Expand Up @@ -379,9 +380,13 @@ def _run_job_and_wait(self, job_request: dict, poll_interval: int) -> (dict, boo
num_log_lines=stream_log(self.connection, job, num_log_lines, http_timeout=self.http_timeout)

except Exception as e:
countUnknownState = countUnknownState + 1
self.log.info(f'HTTP Call failed with error "{e}". Will set state=unknown and continue checking...')
state = "unknown"
# We makes sure to forward any AirflowException's encountered during state checking, else continue checking.
if isinstance(e,AirflowTaskTimeout) or isinstance(e,AirflowException):
raise
else:
countUnknownState = countUnknownState + 1
self.log.info(f'HTTP Call failed with error "{e}". Will set state=unknown and continue checking...')
state = "unknown"

if state == 'unknown':
# Raise AirflowFailException as we don't know if the job is still running
Expand Down
9 changes: 8 additions & 1 deletion src/sas_airflow_provider/util/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import os
import logging

from airflow.exceptions import AirflowException
from airflow.exceptions import AirflowTaskTimeout


def get_folder_file_contents(session, path: str, http_timeout=None) -> str:
"""
Expand Down Expand Up @@ -138,7 +141,11 @@ def stream_log(session,job,start,limit=99999, http_timeout=None) -> int:
else:
logging.getLogger(name=None).warning(f"Failed to retrieve parts of the log with status code {r.status_code} from URI: {log_uri}/content. Maybe the log is too large.")
except Exception as e:
logging.getLogger(name=None).warning(f"Unable to retrieve parts of the log: {e}. Maybe the log is too large.")
# Makes sure to forward any AirflowException's encountered during log retrieval
if isinstance(e,AirflowTaskTimeout) or isinstance(e,AirflowException):
raise
else:
logging.getLogger(name=None).warning(f"Unable to retrieve parts of the log: {e}. Maybe the log is too large.")

return current_line

Expand Down

0 comments on commit c963886

Please sign in to comment.