Skip to content

Commit

Permalink
HttpHook: retry on AirflowException
Browse files: browse the repository at this point in the history
  • Loading branch information
ErnestaP committed Jul 30, 2024
1 parent d6eb5a1 commit 68f5d55
Show file tree
Hide file tree
Showing 6 changed files with 11 additions and 7 deletions.
7 changes: 4 additions & 3 deletions dags/library/cern_publication_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
from airflow.providers.postgres.operators.postgres import PostgresOperator
from common.utils import get_total_results_count
from executor_config import kubernetes_executor_config
from library.utils import get_url
from library.utils import get_endpoint
from requests.exceptions import HTTPError
from tenacity import retry_if_exception_type, stop_after_attempt


Expand All @@ -19,7 +20,7 @@ def library_cern_publication_records_dag():
@task(multiple_outputs=True, executor_config=kubernetes_executor_config)
def generate_params(key, **kwargs):
year = kwargs["params"].get("year")
url = get_url(key, year)
url = get_endpoint(key, year)
return {
"endpoint": url,
"type_of_query": key,
Expand All @@ -32,7 +33,7 @@ def fetch_count(parameters):
endpoint=parameters["endpoint"],
_retry_args={
"stop": stop_after_attempt(3),
"retry": retry_if_exception_type(Exception),
"retry": retry_if_exception_type(HTTPError),
},
)
count = get_total_results_count(response.text)
Expand Down
2 changes: 1 addition & 1 deletion dags/library/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
def get_url(key, year):
def get_endpoint(key, year):
url = {
"publications_total_count": r"search?wl=0&ln=en&cc=Published+"
+ r"Articles&sc=1&p=%28affiliation%3ACERN+or+595%3A%27For+annual+report%27%29+and+"
Expand Down
3 changes: 2 additions & 1 deletion dags/open_access/gold_open_access_mechanisms.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from airflow.providers.postgres.operators.postgres import PostgresOperator
from common.utils import get_total_results_count
from executor_config import kubernetes_executor_config
from requests.exceptions import ProxyError
from tenacity import retry_if_exception_type, stop_after_attempt


Expand Down Expand Up @@ -42,7 +43,7 @@ def fetch_count(parameters):
endpoint=parameters["endpoint"],
_retry_args={
"stop": stop_after_attempt(3),
"retry": retry_if_exception_type(Exception),
"retry": retry_if_exception_type(ProxyError),
},
)
count = get_total_results_count(response.text)
Expand Down
3 changes: 2 additions & 1 deletion dags/open_access/open_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import open_access.utils as utils
import pendulum
from airflow.decorators import dag, task
from airflow.exceptions import AirflowException
from airflow.providers.http.hooks.http import HttpHook
from airflow.providers.postgres.operators.postgres import PostgresOperator
from common.utils import get_total_results_count
Expand Down Expand Up @@ -43,7 +44,7 @@ def fetch_count(parameters):
endpoint=parameters["endpoint"],
_retry_args={
"stop": stop_after_attempt(3),
"retry": retry_if_exception_type(Exception),
"retry": retry_if_exception_type(AirflowException),
},
)
type_of_query = parameters["type_of_query"]
Expand Down
3 changes: 2 additions & 1 deletion dags/open_access/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
get_golden_access_records_ids,
get_green_access_records_ids,
)
from requests.exceptions import ProxyError
from tenacity import retry_if_exception_type, stop_after_attempt


Expand All @@ -20,7 +21,7 @@ def get_count_http_hook(total, url, record_extractor):
endpoint=full_url,
_retry_args={
"stop": stop_after_attempt(3),
"retry": retry_if_exception_type(Exception),
"retry": retry_if_exception_type(ProxyError),
},
)
records_ids_count = records_ids_count + len(record_extractor(response.text))
Expand Down
Empty file.

0 comments on commit 68f5d55

Please sign in to comment.