From 68f5d55fa16c0589b811ea8a3c7c8ca48ff0572e Mon Sep 17 00:00:00 2001 From: ErnestaP Date: Tue, 30 Jul 2024 16:34:45 +0200 Subject: [PATCH] Httphook: rertry on AirflowException --- dags/library/cern_publication_records.py | 7 ++++--- dags/library/utils.py | 2 +- dags/open_access/gold_open_access_mechanisms.py | 3 ++- dags/open_access/open_access.py | 3 ++- dags/open_access/utils.py | 3 ++- tests/open_access/test_data_harvesting.py | 0 6 files changed, 11 insertions(+), 7 deletions(-) create mode 100644 tests/open_access/test_data_harvesting.py diff --git a/dags/library/cern_publication_records.py b/dags/library/cern_publication_records.py index fd6ed19..d0ab40b 100644 --- a/dags/library/cern_publication_records.py +++ b/dags/library/cern_publication_records.py @@ -6,7 +6,8 @@ from airflow.providers.postgres.operators.postgres import PostgresOperator from common.utils import get_total_results_count from executor_config import kubernetes_executor_config -from library.utils import get_url +from library.utils import get_endpoint +from requests.exceptions import HTTPError from tenacity import retry_if_exception_type, stop_after_attempt @@ -19,7 +20,7 @@ def library_cern_publication_records_dag(): @task(multiple_outputs=True, executor_config=kubernetes_executor_config) def generate_params(key, **kwargs): year = kwargs["params"].get("year") - url = get_url(key, year) + url = get_endpoint(key, year) return { "endpoint": url, "type_of_query": key, @@ -32,7 +33,7 @@ def fetch_count(parameters): endpoint=parameters["endpoint"], _retry_args={ "stop": stop_after_attempt(3), - "retry": retry_if_exception_type(Exception), + "retry": retry_if_exception_type(HTTPError), }, ) count = get_total_results_count(response.text) diff --git a/dags/library/utils.py b/dags/library/utils.py index 076a537..4f1b8e3 100644 --- a/dags/library/utils.py +++ b/dags/library/utils.py @@ -1,4 +1,4 @@ -def get_url(key, year): +def get_endpoint(key, year): url = { "publications_total_count": r"search?wl=0&ln=en&cc=Published+" + r"Articles&sc=1&p=%28affiliation%3ACERN+or+595%3A%27For+annual+report%27%29+and+" diff --git a/dags/open_access/gold_open_access_mechanisms.py b/dags/open_access/gold_open_access_mechanisms.py index fb5b91f..afa8f60 100644 --- a/dags/open_access/gold_open_access_mechanisms.py +++ b/dags/open_access/gold_open_access_mechanisms.py @@ -7,6 +7,7 @@ from airflow.providers.postgres.operators.postgres import PostgresOperator from common.utils import get_total_results_count from executor_config import kubernetes_executor_config +from requests.exceptions import ProxyError from tenacity import retry_if_exception_type, stop_after_attempt @@ -42,7 +43,7 @@ def fetch_count(parameters): endpoint=parameters["endpoint"], _retry_args={ "stop": stop_after_attempt(3), - "retry": retry_if_exception_type(Exception), + "retry": retry_if_exception_type(ProxyError), }, ) count = get_total_results_count(response.text) diff --git a/dags/open_access/open_access.py b/dags/open_access/open_access.py index 35a1154..454154c 100644 --- a/dags/open_access/open_access.py +++ b/dags/open_access/open_access.py @@ -4,6 +4,7 @@ import open_access.utils as utils import pendulum from airflow.decorators import dag, task +from airflow.exceptions import AirflowException from airflow.providers.http.hooks.http import HttpHook from airflow.providers.postgres.operators.postgres import PostgresOperator from common.utils import get_total_results_count @@ -43,7 +44,7 @@ def fetch_count(parameters): endpoint=parameters["endpoint"], _retry_args={ "stop": stop_after_attempt(3), - "retry": retry_if_exception_type(Exception), + "retry": retry_if_exception_type(AirflowException), }, ) type_of_query = parameters["type_of_query"] diff --git a/dags/open_access/utils.py b/dags/open_access/utils.py index 2bac3c9..bbb5680 100644 --- a/dags/open_access/utils.py +++ b/dags/open_access/utils.py @@ -6,6 +6,7 @@ get_golden_access_records_ids, get_green_access_records_ids, ) +from requests.exceptions import ProxyError from tenacity import retry_if_exception_type, stop_after_attempt @@ -20,7 +21,7 @@ def get_count_http_hook(total, url, record_extractor): endpoint=full_url, _retry_args={ "stop": stop_after_attempt(3), - "retry": retry_if_exception_type(Exception), + "retry": retry_if_exception_type(ProxyError), }, ) records_ids_count = records_ids_count + len(record_extractor(response.text)) diff --git a/tests/open_access/test_data_harvesting.py b/tests/open_access/test_data_harvesting.py new file mode 100644 index 0000000..e69de29