diff --git a/dags/open_access/gold_open_access_mechanisms.py b/dags/open_access/gold_open_access_mechanisms.py index 1887e60..9e5d997 100644 --- a/dags/open_access/gold_open_access_mechanisms.py +++ b/dags/open_access/gold_open_access_mechanisms.py @@ -1,12 +1,10 @@ -from functools import reduce - import open_access.constants as constants import pendulum from airflow.decorators import dag, task +from airflow.providers.http.operators.http import SimpleHttpOperator from airflow.providers.postgres.operators.postgres import PostgresOperator -from common.utils import get_total_results_count, request_again_if_failed +from common.utils import get_total_results_count from executor_config import kubernetes_executor_config -from open_access.utils import get_url @dag( @@ -15,27 +13,28 @@ params={"year": 2023}, ) def oa_gold_open_access_mechanisms(): - @task(executor_config=kubernetes_executor_config) - def fetch_data_task(query, **kwargs): + @task(multiple_outputs=True) + def generate_params(query, **kwargs): year = kwargs["params"].get("year") + current_collection = "Published+Articles" golden_access_base_query = ( r"(affiliation:CERN+or+595:'For+annual+report')" + rf"and+year:{year}+not+980:ConferencePaper+" + r"not+980:BookChapter+not+595:'Not+for+annual+report" ) type_of_query = [*query][0] - url = get_url(f"{golden_access_base_query}+{query[type_of_query]}") - data = request_again_if_failed(url) - total = get_total_results_count(data.text) - return {type_of_query: total} + query_p = rf"{golden_access_base_query}+{query[type_of_query]}" - @task(multiple_outputs=True, executor_config=kubernetes_executor_config) - def join(values, **kwargs): - results = reduce(lambda a, b: {**a, **b}, values) - results["years"] = kwargs["params"].get("year") - return results + return { + "endpoint": rf"search?ln=en&cc={current_collection}&p={query_p}" + + r"&action_search=Search&op1=a&m1=a&p1=&f1=&c=" + + r"Published+Articles&c=&sf=&so=d&rm=&rg=100&sc=0&of=xm", + "response_filter": lambda response: { + type_of_query: get_total_results_count(response.text) + }, + } - results = fetch_data_task.expand( + endpoints = generate_params.expand( query=[ {"cern_read_and_publish": constants.CERN_READ_AND_PUBLISH}, {"cern_individual_apcs": constants.CERN_INDIVIDUAL_APCS}, @@ -44,9 +43,25 @@ def join(values, **kwargs): {"other_collective_models": constants.OTHER_COLLECTIVE_MODELS}, ], ) - unpacked_results = join(results) - PostgresOperator( + http_tasks = SimpleHttpOperator.partial( + task_id="trigger_cds_request", + http_conn_id="cds", + method="GET", + headers={}, + log_response=True, + retries=5, + ).expand_kwargs(endpoints) + + @task(multiple_outputs=True, executor_config=kubernetes_executor_config) + def join_and_add_year(values, **kwargs): + year = kwargs["params"].get("year") + values["years"] = year + return values + + results = join_and_add_year(http_tasks.output) + + populate_golden_open_access = PostgresOperator( task_id="populate_golden_open_access", postgres_conn_id="superset_qa", sql=""" @@ -64,16 +79,11 @@ def join(values, **kwargs): other_collective_models = EXCLUDED.other_collective_models, updated_at = CURRENT_TIMESTAMP; """, - parameters={ - "years": unpacked_results["years"], - "cern_read_and_publish": unpacked_results["cern_read_and_publish"], - "cern_individual_apcs": unpacked_results["cern_individual_apcs"], - "scoap3": unpacked_results["scoap3"], - "other": unpacked_results["other"], - "other_collective_models": unpacked_results["other_collective_models"], - }, + parameters=results, executor_config=kubernetes_executor_config, ) + http_tasks >> results >> populate_golden_open_access + OA_gold_open_access_mechanisms = oa_gold_open_access_mechanisms() diff --git a/requirements.txt b/requirements.txt index 1663ef4..f1a1f06 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,4 +2,5 @@ apache-airflow[celery, postgres, redis, cncf.kubernetes]==2.8.3 alembic airflow-provider-alembic==1.0.0 +apache-airflow-providers-http==4.10.0 elementpath==4.4.0