Skip to content

Commit

Permalink
http operator in use
Browse files (browse the repository at this point in the history)
  • Loading branch information
ErnestaP committed Jul 1, 2024
1 parent 3c0f3c4 commit acc49dd
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 26 deletions.
62 changes: 36 additions & 26 deletions dags/open_access/gold_open_access_mechanisms.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
from functools import reduce

import open_access.constants as constants
import pendulum
from airflow.decorators import dag, task
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from common.utils import get_total_results_count, request_again_if_failed
from common.utils import get_total_results_count
from executor_config import kubernetes_executor_config
from open_access.utils import get_url


@dag(
Expand All @@ -15,27 +13,28 @@
params={"year": 2023},
)
def oa_gold_open_access_mechanisms():
@task(executor_config=kubernetes_executor_config)
def fetch_data_task(query, **kwargs):
@task(multiple_outputs=True)
def generate_params(query, **kwargs):
year = kwargs["params"].get("year")
current_collection = "Published+Articles"
golden_access_base_query = (
r"(affiliation:CERN+or+595:'For+annual+report')"
+ rf"and+year:{year}+not+980:ConferencePaper+"
+ r"not+980:BookChapter+not+595:'Not+for+annual+report"
)
type_of_query = [*query][0]
url = get_url(f"{golden_access_base_query}+{query[type_of_query]}")
data = request_again_if_failed(url)
total = get_total_results_count(data.text)
return {type_of_query: total}
query_p = rf"{golden_access_base_query}+{query[type_of_query]}"

@task(multiple_outputs=True, executor_config=kubernetes_executor_config)
def join(values, **kwargs):
results = reduce(lambda a, b: {**a, **b}, values)
results["years"] = kwargs["params"].get("year")
return results
return {
"endpoint": rf"search?ln=en&cc={current_collection}&p={query_p}"
+ r"&action_search=Search&op1=a&m1=a&p1=&f1=&c="
+ r"Published+Articles&c=&sf=&so=d&rm=&rg=100&sc=0&of=xm",
"response_filter": lambda response: {
type_of_query: get_total_results_count(response.text)
},
}

results = fetch_data_task.expand(
endpoints = generate_params.expand(
query=[
{"cern_read_and_publish": constants.CERN_READ_AND_PUBLISH},
{"cern_individual_apcs": constants.CERN_INDIVIDUAL_APCS},
Expand All @@ -44,9 +43,25 @@ def join(values, **kwargs):
{"other_collective_models": constants.OTHER_COLLECTIVE_MODELS},
],
)
unpacked_results = join(results)

PostgresOperator(
http_tasks = SimpleHttpOperator.partial(
task_id="trigger_cds_request",
http_conn_id="cds",
method="GET",
headers={},
log_response=True,
retries=5,
).expand_kwargs(endpoints)

@task(multiple_outputs=True, executor_config=kubernetes_executor_config)
def join_and_add_year(values, **kwargs):
    """Merge the per-query counts into one dict and attach the DAG's year.

    ``values`` is fed from the output of the mapped HTTP task, so it arrives
    as a sequence holding one single-key ``{query_name: count}`` dict per
    mapped request rather than as one dict. Item assignment on that sequence
    would raise ``TypeError``, and ``multiple_outputs=True`` needs a dict
    return value, so the partial results are flattened first.

    Args:
        values: Either a sequence of partial result dicts (the mapped-task
            case) or an already merged dict.
        **kwargs: Airflow task context; ``params["year"]`` is read from it.

    Returns:
        A new dict containing every key from the partial results plus a
        ``"years"`` entry holding the configured year.
    """
    year = kwargs["params"].get("year")

    if isinstance(values, dict):
        # Already merged upstream: copy so the caller's object is not mutated.
        merged = dict(values)
    else:
        merged = {}
        for partial in values:
            merged.update(partial)

    # The key is "years" (plural) on purpose; the original code used it too.
    merged["years"] = year
    return merged

results = join_and_add_year(http_tasks.output)

populate_golden_open_access = PostgresOperator(
task_id="populate_golden_open_access",
postgres_conn_id="superset_qa",
sql="""
Expand All @@ -64,16 +79,11 @@ def join(values, **kwargs):
other_collective_models = EXCLUDED.other_collective_models,
updated_at = CURRENT_TIMESTAMP;
""",
parameters={
"years": unpacked_results["years"],
"cern_read_and_publish": unpacked_results["cern_read_and_publish"],
"cern_individual_apcs": unpacked_results["cern_individual_apcs"],
"scoap3": unpacked_results["scoap3"],
"other": unpacked_results["other"],
"other_collective_models": unpacked_results["other_collective_models"],
},
parameters=results,
executor_config=kubernetes_executor_config,
)

http_tasks >> results >> populate_golden_open_access


OA_gold_open_access_mechanisms = oa_gold_open_access_mechanisms()
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
apache-airflow[celery, postgres, redis, cncf.kubernetes]==2.8.3
alembic
airflow-provider-alembic==1.0.0
apache-airflow-providers-http==4.10.0
elementpath==4.4.0

0 comments on commit acc49dd

Please sign in to comment.