From c96388649719bd780f8bd8a7c2054535956d21a8 Mon Sep 17 00:00:00 2001 From: Torben Juul Johansson Date: Mon, 2 Dec 2024 14:04:28 +0100 Subject: [PATCH] Make sure to forward any AirflowException raised during state checking; otherwise it is ignored, which we do not want. (#43) * Make sure to forward any AirflowException raised during state checking; otherwise it is ignored, which we do not want. * Timeout changed back * DCO Remediation Commit for Torben Juul Johansson I, Torben Juul Johansson , hereby add my Signed-off-by to this commit: 9a6610fb6a19a4ae6107f25008144f624f1485d7 I, Torben Juul Johansson , hereby add my Signed-off-by to this commit: b82a1f1e150212545718e165e38e25e0e608fdf0 Signed-off-by: Torben Juul Johansson --------- Signed-off-by: Torben Juul Johansson --- setup.cfg | 2 +- src/sas_airflow_provider/operators/sas_studio.py | 11 ++++++++--- src/sas_airflow_provider/util/util.py | 9 ++++++++- 3 files changed, 17 insertions(+), 5 deletions(-) diff --git a/setup.cfg b/setup.cfg index 917784d..cca74b0 100755 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = sas-airflow-provider -version = 0.0.18 +version = 0.0.19 author = SAS author_email = andrew.shakinovsky@sas.com description = Enables execution of Studio Flows and Jobs from Airflow diff --git a/src/sas_airflow_provider/operators/sas_studio.py b/src/sas_airflow_provider/operators/sas_studio.py index af6683e..7d83d79 100755 --- a/src/sas_airflow_provider/operators/sas_studio.py +++ b/src/sas_airflow_provider/operators/sas_studio.py @@ -22,6 +22,7 @@ from airflow.exceptions import AirflowFailException from airflow.exceptions import AirflowException +from airflow.exceptions import AirflowTaskTimeout from airflow.models import BaseOperator from sas_airflow_provider.hooks.sas import SasHook from sas_airflow_provider.util.util import stream_log, create_or_connect_to_session, end_compute_session @@ -379,9 +380,13 @@ def _run_job_and_wait(self, job_request: dict, poll_interval: int) -> (dict, boo 
num_log_lines=stream_log(self.connection, job, num_log_lines, http_timeout=self.http_timeout) except Exception as e: - countUnknownState = countUnknownState + 1 - self.log.info(f'HTTP Call failed with error "{e}". Will set state=unknown and continue checking...') - state = "unknown" + # Re-raise any AirflowTaskTimeout or AirflowException encountered during state checking; otherwise continue checking. + if isinstance(e,AirflowTaskTimeout) or isinstance(e,AirflowException): + raise + else: + countUnknownState = countUnknownState + 1 + self.log.info(f'HTTP Call failed with error "{e}". Will set state=unknown and continue checking...') + state = "unknown" if state == 'unknown': # Raise AirflowFailException as we don't know if the job is still running diff --git a/src/sas_airflow_provider/util/util.py b/src/sas_airflow_provider/util/util.py index 6309cb4..469997e 100755 --- a/src/sas_airflow_provider/util/util.py +++ b/src/sas_airflow_provider/util/util.py @@ -21,6 +21,9 @@ import os import logging +from airflow.exceptions import AirflowException +from airflow.exceptions import AirflowTaskTimeout + def get_folder_file_contents(session, path: str, http_timeout=None) -> str: """ @@ -138,7 +141,11 @@ def stream_log(session,job,start,limit=99999, http_timeout=None) -> int: else: logging.getLogger(name=None).warning(f"Failed to retrieve parts of the log with status code {r.status_code} from URI: {log_uri}/content. Maybe the log is too large.") except Exception as e: - logging.getLogger(name=None).warning(f"Unable to retrieve parts of the log: {e}. Maybe the log is too large.") + # Re-raise any AirflowTaskTimeout or AirflowException encountered during log retrieval; otherwise log a warning and continue. + if isinstance(e,AirflowTaskTimeout) or isinstance(e,AirflowException): + raise + else: + logging.getLogger(name=None).warning(f"Unable to retrieve parts of the log: {e}. Maybe the log is too large.") return current_line