diff --git a/dags/common/exceptions.py b/dags/common/exceptions.py index e259094..c22f416 100644 --- a/dags/common/exceptions.py +++ b/dags/common/exceptions.py @@ -5,11 +5,6 @@ def __init__(self, input, current_year): ) -class DataFetchError(Exception): - def __init__(self, status_code, url): - super().__init__(f"Data fetch failure, status_code={status_code}, url={url}") - - class NotFoundTotalCountOfRecords(Exception): def __init__( self, diff --git a/dags/common/utils.py b/dags/common/utils.py index 6054b52..d01af13 100644 --- a/dags/common/utils.py +++ b/dags/common/utils.py @@ -1,23 +1,7 @@ import datetime import re -import requests -from common.exceptions import DataFetchError, NotFoundTotalCountOfRecords, WrongInput - - -def request_again_if_failed(url, cds_token=None): - if cds_token: - header = {"Authorization": f"token {cds_token}"} - response = requests.get(url, header) - response = requests.get(url) - count = 1 - - while response.status_code == 502 and count != 10: - count = count + 1 - response = requests.get(url) - if response.status_code != 200: - raise DataFetchError(url=url, status_code=response.status_code) - return response +from common.exceptions import NotFoundTotalCountOfRecords, WrongInput def get_total_results_count(data): diff --git a/dags/library/cern_publication_records.py b/dags/library/cern_publication_records.py index 6be4fb2..ce38719 100644 --- a/dags/library/cern_publication_records.py +++ b/dags/library/cern_publication_records.py @@ -1,5 +1,3 @@ -import logging -import os from functools import reduce import pendulum @@ -8,10 +6,16 @@ LibraryCernPublicationRecords, ) from common.operators.sqlalchemy_operator import sqlalchemy_task -from common.utils import get_total_results_count, request_again_if_failed +from common.utils import get_total_results_count from executor_config import kubernetes_executor_config -from library.utils import get_url from sqlalchemy.sql import func +from airflow.exceptions import AirflowException +from airflow.providers.http.hooks.http import HttpHook +from common.utils import get_total_results_count +from executor_config import kubernetes_executor_config +from library.utils import get_endpoint +from tenacity import retry_if_exception_type, stop_after_attempt +from functools import reduce @dag( @@ -20,32 +24,45 @@ params={"year": 2023}, ) def library_cern_publication_records_dag(): - @task(executor_config=kubernetes_executor_config) - def fetch_data_task(key, **kwargs): + @task(multiple_outputs=True, executor_config=kubernetes_executor_config) + def generate_params(key, **kwargs): year = kwargs["params"].get("year") - cds_token = os.environ.get("CDS_TOKEN") - if not cds_token: - logging.warning("cds token is not set!") - type_of_query = key - url = get_url(type_of_query, year) - data = request_again_if_failed(url=url, cds_token=cds_token) - total = get_total_results_count(data.text) - return {type_of_query: total} + url = get_endpoint(key, year) + return { + "endpoint": url, + "type_of_query": key, + } + + @task(executor_config=kubernetes_executor_config) + def fetch_count(parameters): + http_hook = HttpHook(http_conn_id="cds", method="GET") + response = http_hook.run_with_advanced_retry( + endpoint=parameters["endpoint"], + _retry_args={ + "stop": stop_after_attempt(3), + "retry": retry_if_exception_type(AirflowException), + }, + ) + count = get_total_results_count(response.text) + return {parameters["type_of_query"]: count} + + keys_list = [ + "publications_total_count", + "conference_proceedings_count", + "non_journal_proceedings_count", + ] + + parameters = generate_params.expand(key=keys_list) + counts = fetch_count.expand(parameters=parameters) @task(multiple_outputs=True, executor_config=kubernetes_executor_config) - def join(values, **kwargs): - results = reduce(lambda a, b: {**a, **b}, values) - results["year"] = kwargs["params"].get("year") + def join_and_add_year(counts, **kwargs): + year = kwargs["params"].get("year") + results = reduce(lambda a, b: {**a, **b}, counts) + results["year"] = year return results - results = fetch_data_task.expand( - key=[ - "publications_total_count", - "conference_proceedings_count", - "non_journal_proceedings_count", - ], - ) - unpacked_results = join(results) + results = join_and_add_year(counts) @sqlalchemy_task(conn_id="superset_qa") def populate_cern_publication_records(results, session, **kwargs): @@ -72,7 +89,7 @@ def populate_cern_publication_records(results, session, **kwargs): ) session.add(new_record) - populate_cern_publication_records(unpacked_results) + populate_cern_publication_records(results) library_cern_publication_records = library_cern_publication_records_dag() diff --git a/dags/library/utils.py b/dags/library/utils.py index 7aa7a18..e22a0a7 100644 --- a/dags/library/utils.py +++ b/dags/library/utils.py @@ -1,16 +1,16 @@ -def get_url(key, year): - url = { - "publications_total_count": r"http://cdsweb.cern.ch/search?wl=0&ln=en&cc=Published+" +def get_endpoint(key, year): + endpoint = { + "publications_total_count": r"search?wl=0&ln=en&cc=Published+" + r"Articles&sc=1&p=%28affiliation%3ACERN+or+595%3A%27For+annual+report%27%29+and+" + rf"year%3A{year}+not+980%3AConferencePaper+not+980%3ABookChapter+not+595%3A%27Not+" + r"for+annual+report%27&f=&action_search=Search&of=xm", - "conference_proceedings_count": r"http://cdsweb.cern.ch/search?wl=0&ln=en&cc=Published+" + "conference_proceedings_count": r"search?wl=0&ln=en&cc=Published+" + r"Articles&p=980%3AARTICLE+and+%28affiliation%3ACERN+or+595%3A%27For+annual+report%27%29+" + rf"and+year%3A{year}+and+980%3AConferencePaper+not+595%3A%27Not+for+annual+" + r"report%27&f=&action_search=Search&c=Published+Articles&c=&sf=author&so=a&rm=&rg=10&sc=1&of=xm", - "non_journal_proceedings_count": r"https://cds.cern.ch/search?ln=en&p=affiliation%3ACERN+or+" + "non_journal_proceedings_count": r"search?ln=en&p=affiliation%3ACERN+or+" + rf"260%3ACERN+and+260%3A{year}+and+%28980%3ABOOK+or+980%3APROCEEDINGS+or+690%3A%27YELLOW+REPORT%27+" + r"or+980%3ABookChapter+or+980%3AREPORT%29+not+595%3A%27Not+for+annual+report%27&action_search=" + r"Search&op1=a&m1=a&p1=&f1=&c=CERN+Document+Server&sf=&so=d&rm=&rg=10&sc=1&of=xm", } - return url[key] + return endpoint[key] diff --git a/dags/open_access/gold_open_access_mechanisms.py b/dags/open_access/gold_open_access_mechanisms.py index f11d651..294896d 100644 --- a/dags/open_access/gold_open_access_mechanisms.py +++ b/dags/open_access/gold_open_access_mechanisms.py @@ -3,6 +3,7 @@ import open_access.constants as constants import pendulum from airflow.decorators import dag, task +from airflow.exceptions import AirflowException from airflow.providers.http.hooks.http import HttpHook from common.models.open_access.oa_golden_open_access import OAGoldenOpenAccess from common.operators.sqlalchemy_operator import sqlalchemy_task @@ -19,7 +20,7 @@ ) def oa_gold_open_access_mechanisms(): @task(multiple_outputs=True, executor_config=kubernetes_executor_config) - def generate_params(query, **kwargs): + def generate_params(query_object, **kwargs): year = kwargs["params"].get("year") current_collection = "Published+Articles" golden_access_base_query = ( @@ -27,11 +28,11 @@ def generate_params(query, **kwargs): + rf"and+year:{year}+not+980:ConferencePaper+" + r"not+980:BookChapter+not+595:'Not+for+annual+report" ) - type_of_query = [*query][0] - query_p = rf"{golden_access_base_query}+{query[type_of_query]}" + type_of_query = [*query_object][0] + query = rf"{golden_access_base_query}+{query_object[type_of_query]}" return { - "endpoint": rf"search?ln=en&cc={current_collection}&p={query_p}" + "endpoint": rf"search?ln=en&cc={current_collection}&p={query}" + r"&action_search=Search&op1=a&m1=a&p1=&f1=&c=" + r"Published+Articles&c=&sf=&so=d&rm=&rg=100&sc=0&of=xm", "type_of_query": type_of_query, @@ -44,13 +45,13 @@ def fetch_count(parameters): endpoint=parameters["endpoint"], _retry_args={ "stop": stop_after_attempt(3), - "retry": retry_if_exception_type(Exception), + "retry": retry_if_exception_type(AirflowException), }, ) count = get_total_results_count(response.text) return {parameters["type_of_query"]: count} - query_list = [ + queries_objects_list = [ {"cern_read_and_publish": constants.CERN_READ_AND_PUBLISH}, {"cern_individual_apcs": constants.CERN_INDIVIDUAL_APCS}, {"scoap3": constants.SCOAP3}, @@ -58,7 +59,7 @@ def fetch_count(parameters): {"other_collective_models": constants.OTHER_COLLECTIVE_MODELS}, ] - parameters = generate_params.expand(query=query_list) + parameters = generate_params.expand(query_object=queries_objects_list) counts = fetch_count.expand(parameters=parameters) @task(multiple_outputs=True, executor_config=kubernetes_executor_config) diff --git a/dags/open_access/open_access.py b/dags/open_access/open_access.py index b3fb66c..542fd7e 100644 --- a/dags/open_access/open_access.py +++ b/dags/open_access/open_access.py @@ -6,9 +6,13 @@ from airflow.decorators import dag, task from common.models.open_access.open_access import OAOpenAccess from common.operators.sqlalchemy_operator import sqlalchemy_task -from common.utils import get_total_results_count, request_again_if_failed from executor_config import kubernetes_executor_config from sqlalchemy.sql import func +from airflow.exceptions import AirflowException +from airflow.providers.http.hooks.http import HttpHook +from common.utils import get_total_results_count +from executor_config import kubernetes_executor_config +from tenacity import retry_if_exception_type, stop_after_attempt @dag( @@ -17,39 +21,63 @@ params={"year": 2023}, ) def oa_dag(): - @task(executor_config=kubernetes_executor_config) - def fetch_data_task(query, **kwargs): + @task(multiple_outputs=True, executor_config=kubernetes_executor_config) + def generate_params(query_object, **kwargs): year = kwargs["params"].get("year") + current_collection = "Published+Articles" base_query = ( r"(affiliation:CERN+or+595:'For+annual+report')" + rf"and+year:{year}+not+980:ConferencePaper+" + r"not+980:BookChapter" ) - type_of_query = [*query][0] - url = utils.get_url(query=f"{base_query}") - data = request_again_if_failed(url=url) - total = get_total_results_count(data.text) + type_of_query = [*query_object][0] + query = rf"{base_query}+{query_object[type_of_query]}" + + return { + "endpoint": rf"search?ln=en&cc={current_collection}&p={query}" + + r"&action_search=Search&op1=a&m1=a&p1=&f1=&c=" + + r"Published+Articles&c=&sf=&so=d&rm=&rg=100&sc=0&of=xm", + "type_of_query": type_of_query, + } + + @task(executor_config=kubernetes_executor_config) + def fetch_count(parameters): + http_hook = HttpHook(http_conn_id="cds", method="GET") + response = http_hook.run_with_advanced_retry( + endpoint=parameters["endpoint"], + _retry_args={ + "stop": stop_after_attempt(3), + "retry": retry_if_exception_type(AirflowException), + }, + ) + type_of_query = parameters["type_of_query"] + endpoint = parameters["endpoint"] + total = get_total_results_count(response.text) if type_of_query == "gold": - total = utils.get_golden_access_count(total, url) + total = utils.get_golden_access_count(total, endpoint) if type_of_query == "green": - total = utils.get_green_access_count(total, url) - return {type_of_query: total} + total = utils.get_green_access_count(total, endpoint) + count = get_total_results_count(response.text) + return {parameters["type_of_query"]: count} + + queries_objects_list = [ + {"closed_access": constants.CLOSED_ACCESS}, + {"bronze_open_access": constants.BRONZE_ACCESS}, + {"green_open_access": constants.GREEN_ACCESS}, + {"gold_open_access": constants.GOLD_ACCESS}, + ] + + parameters = generate_params.expand(query_object=queries_objects_list) + counts = fetch_count.expand(parameters=parameters) @task(multiple_outputs=True, executor_config=kubernetes_executor_config) - def join(values, **kwargs): - results = reduce(lambda a, b: {**a, **b}, values) - results["year"] = kwargs["params"].get("year") + def join_and_add_year(counts, **kwargs): + year = kwargs["params"].get("year") + results = reduce(lambda a, b: {**a, **b}, counts) + results["year"] = year return results - results = fetch_data_task.expand( - query=[ - {"closed_access": constants.CLOSED_ACCESS}, - {"bronze_open_access": constants.BRONZE_ACCESS}, - {"green_open_access": constants.GREEN_ACCESS}, - {"gold_open_access": constants.GOLD_ACCESS}, - ], - ) - unpacked_results = join(results) + results = join_and_add_year(counts) @sqlalchemy_task(conn_id="superset_qa") def populate_open_access(results, session, **kwargs): @@ -70,7 +98,7 @@ def populate_open_access(results, session, **kwargs): ) session.add(new_record) - populate_open_access(unpacked_results) + populate_open_access(results) OA_dag = oa_dag() diff --git a/dags/open_access/utils.py b/dags/open_access/utils.py index 1f7bc97..5a04274 100644 --- a/dags/open_access/utils.py +++ b/dags/open_access/utils.py @@ -1,31 +1,40 @@ import logging import math -from common.utils import request_again_if_failed +from airflow.exceptions import AirflowException +from airflow.providers.http.hooks.http import HttpHook from open_access.parsers import ( get_golden_access_records_ids, get_green_access_records_ids, ) +from tenacity import retry_if_exception_type, stop_after_attempt -def get_count(total, url, record_extractor): +def get_count_http_hook(total, url, record_extractor): + http_hook = HttpHook(http_conn_id="cds", method="GET") iterations = math.ceil(total / 100.0) records_ids_count = 0 for i in range(0, iterations): jrec = (i * 100) + 1 full_url = f"{url}&jrec={jrec}" - response = request_again_if_failed(full_url) + response = http_hook.run_with_advanced_retry( + endpoint=full_url, + _retry_args={ + "stop": stop_after_attempt(3), + "retry": retry_if_exception_type(AirflowException), + }, + ) records_ids_count = records_ids_count + len(record_extractor(response.text)) logging.info(f"In total was found {records_ids_count} golden access records") return records_ids_count def get_golden_access_count(total, url): - return get_count(total, url, get_golden_access_records_ids) + return get_count_http_hook(total, url, get_golden_access_records_ids) def get_green_access_count(total, url): - return get_count(total, url, get_green_access_records_ids) + return get_count_http_hook(total, url, get_green_access_records_ids) def get_url(query, current_collection="Published+Articles"): diff --git a/tests/open_access/test_data_harvesting.py b/tests/open_access/test_data_harvesting.py new file mode 100644 index 0000000..e69de29