Skip to content

Commit

Permalink
All DAGs: migration to HttpHook
Browse files Browse the repository at this point in the history
  • Loading branch information
ErnestaP committed Jul 31, 2024
1 parent 305b317 commit 963537b
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 89 deletions.
5 changes: 0 additions & 5 deletions dags/common/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,6 @@ def __init__(self, input, current_year):
)


class DataFetchError(Exception):
    """Raised when an HTTP fetch does not finish with a 200 response."""

    def __init__(self, status_code, url):
        # Build the message once so the exception text is uniform everywhere.
        message = f"Data fetch failure, status_code={status_code}, url={url}"
        super().__init__(message)

class NotFoundTotalCountOfRecords(Exception):
def __init__(
self,
Expand Down
18 changes: 1 addition & 17 deletions dags/common/utils.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,7 @@
import datetime
import re

import requests
from common.exceptions import DataFetchError, NotFoundTotalCountOfRecords, WrongInput


def request_again_if_failed(url, cds_token=None):
    """GET *url*, retrying up to 9 extra times while the server answers 502.

    :param url: full URL to fetch.
    :param cds_token: optional CDS API token; when given it is sent as an
        ``Authorization: token <value>`` header on every attempt.
    :returns: the successful ``requests.Response`` (status code 200).
    :raises DataFetchError: if the final response status is not 200.
    """
    # BUG FIX: the original code issued the authorized request and then
    # immediately overwrote it with an unauthenticated requests.get(url).
    # It also passed the header dict positionally (the second positional
    # argument of requests.get is `params`, not `headers`), and the 502
    # retries dropped the token. Build the headers once and reuse them.
    headers = {"Authorization": f"token {cds_token}"} if cds_token else None
    response = requests.get(url, headers=headers)
    count = 1

    while response.status_code == 502 and count != 10:
        count = count + 1
        response = requests.get(url, headers=headers)
    if response.status_code != 200:
        raise DataFetchError(url=url, status_code=response.status_code)
    return response
from common.exceptions import NotFoundTotalCountOfRecords, WrongInput


def get_total_results_count(data):
Expand Down
66 changes: 40 additions & 26 deletions dags/library/cern_publication_records.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
import logging
import os
from functools import reduce

import pendulum
from airflow.decorators import dag, task
from airflow.exceptions import AirflowException
from airflow.providers.http.hooks.http import HttpHook
from common.models.library.library_cern_publication_records import (
LibraryCernPublicationRecords,
)
from common.operators.sqlalchemy_operator import sqlalchemy_task
from common.utils import get_total_results_count, request_again_if_failed
from common.utils import get_total_results_count
from executor_config import kubernetes_executor_config
from library.utils import get_url
from library.utils import get_endpoint
from sqlalchemy.sql import func
from tenacity import retry_if_exception_type, stop_after_attempt


@dag(
Expand All @@ -20,32 +21,45 @@
params={"year": 2023},
)
def library_cern_publication_records_dag():
@task(multiple_outputs=True, executor_config=kubernetes_executor_config)
def generate_params(key, **kwargs):
    """Build the HttpHook call parameters for one publication-count query.

    :param key: query identifier (e.g. ``"publications_total_count"``);
        also used as the result key downstream.
    :returns: dict with ``endpoint`` (CDS search path for the DAG-param
        year) and ``type_of_query`` (the original *key*).
    """
    # This diff residue interleaved the old fetch_data_task with the new
    # generate_params; this is the reconstructed post-migration version.
    year = kwargs["params"].get("year")
    cds_token = os.environ.get("CDS_TOKEN")
    if not cds_token:
        # NOTE(review): the token is no longer used directly here —
        # authentication presumably lives in the "cds" Airflow connection
        # used by fetch_count; confirm whether this warning is still wanted.
        logging.warning("cds token is not set!")
    url = get_endpoint(key, year)
    return {
        "endpoint": url,
        "type_of_query": key,
    }

@task(executor_config=kubernetes_executor_config)
def fetch_count(parameters):
    """Run one CDS search via the shared HTTP connection and return
    ``{type_of_query: total_results_count}``.

    Retries up to 3 times on AirflowException (transient HTTP failures).
    """
    retry_args = {
        "stop": stop_after_attempt(3),
        "retry": retry_if_exception_type(AirflowException),
    }
    cds_hook = HttpHook(http_conn_id="cds", method="GET")
    response = cds_hook.run_with_advanced_retry(
        endpoint=parameters["endpoint"], _retry_args=retry_args
    )
    total = get_total_results_count(response.text)
    return {parameters["type_of_query"]: total}

keys_list = [
"publications_total_count",
"conference_proceedings_count",
"non_journal_proceedings_count",
]

parameters = generate_params.expand(key=keys_list)
counts = fetch_count.expand(parameters=parameters)

@task(multiple_outputs=True, executor_config=kubernetes_executor_config)
def join_and_add_year(counts, **kwargs):
    """Merge the per-query count dicts into one dict and stamp the year.

    :param counts: iterable of single-key dicts produced by the mapped
        fetch_count tasks.
    :returns: one merged dict plus a ``"year"`` entry from the DAG params.
    """
    # Reconstructed final version — the diff residue interleaved the old
    # join() with the renamed join_and_add_year().
    year = kwargs["params"].get("year")
    results = reduce(lambda a, b: {**a, **b}, counts)
    results["year"] = year
    return results

results = fetch_data_task.expand(
key=[
"publications_total_count",
"conference_proceedings_count",
"non_journal_proceedings_count",
],
)
unpacked_results = join(results)
results = join_and_add_year(counts)

@sqlalchemy_task(conn_id="superset_qa")
def populate_cern_publication_records(results, session, **kwargs):
Expand All @@ -72,7 +86,7 @@ def populate_cern_publication_records(results, session, **kwargs):
)
session.add(new_record)

populate_cern_publication_records(unpacked_results)
populate_cern_publication_records(results)


library_cern_publication_records = library_cern_publication_records_dag()
12 changes: 6 additions & 6 deletions dags/library/utils.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
def get_endpoint(key, year):
    """Return the CDS search endpoint (path + query string, no host) for one
    of the library publication-count queries.

    :param key: one of ``publications_total_count``,
        ``conference_proceedings_count``, ``non_journal_proceedings_count``.
    :param year: publication year interpolated into the query.
    :raises KeyError: if *key* is not a known query name.
    """
    # Reconstructed post-migration version: the diff residue contained both
    # the old absolute-URL get_url() and the new relative get_endpoint()
    # (the host now comes from the "cds" Airflow HTTP connection).
    endpoint = {
        "publications_total_count": r"search?wl=0&ln=en&cc=Published+"
        + r"Articles&sc=1&p=%28affiliation%3ACERN+or+595%3A%27For+annual+report%27%29+and+"
        + rf"year%3A{year}+not+980%3AConferencePaper+not+980%3ABookChapter+not+595%3A%27Not+"
        + r"for+annual+report%27&f=&action_search=Search&of=xm",
        "conference_proceedings_count": r"search?wl=0&ln=en&cc=Published+"
        + r"Articles&p=980%3AARTICLE+and+%28affiliation%3ACERN+or+595%3A%27For+annual+report%27%29+"
        + rf"and+year%3A{year}+and+980%3AConferencePaper+not+595%3A%27Not+for+annual+"
        + r"report%27&f=&action_search=Search&c=Published+Articles&c=&sf=author&so=a&rm=&rg=10&sc=1&of=xm",
        "non_journal_proceedings_count": r"search?ln=en&p=affiliation%3ACERN+or+"
        + rf"260%3ACERN+and+260%3A{year}+and+%28980%3ABOOK+or+980%3APROCEEDINGS+or+690%3A%27YELLOW+REPORT%27+"
        + r"or+980%3ABookChapter+or+980%3AREPORT%29+not+595%3A%27Not+for+annual+report%27&action_search="
        + r"Search&op1=a&m1=a&p1=&f1=&c=CERN+Document+Server&sf=&so=d&rm=&rg=10&sc=1&of=xm",
    }
    return endpoint[key]
15 changes: 8 additions & 7 deletions dags/open_access/gold_open_access_mechanisms.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import open_access.constants as constants
import pendulum
from airflow.decorators import dag, task
from airflow.exceptions import AirflowException
from airflow.providers.http.hooks.http import HttpHook
from common.models.open_access.oa_golden_open_access import OAGoldenOpenAccess
from common.operators.sqlalchemy_operator import sqlalchemy_task
Expand All @@ -19,19 +20,19 @@
)
def oa_gold_open_access_mechanisms():
@task(multiple_outputs=True, executor_config=kubernetes_executor_config)
def generate_params(query, **kwargs):
def generate_params(query_object, **kwargs):
year = kwargs["params"].get("year")
current_collection = "Published+Articles"
golden_access_base_query = (
r"(affiliation:CERN+or+595:'For+annual+report')"
+ rf"and+year:{year}+not+980:ConferencePaper+"
+ r"not+980:BookChapter+not+595:'Not+for+annual+report"
)
type_of_query = [*query][0]
query_p = rf"{golden_access_base_query}+{query[type_of_query]}"
type_of_query = [*query_object][0]
query = rf"{golden_access_base_query}+{query_object[type_of_query]}"

return {
"endpoint": rf"search?ln=en&cc={current_collection}&p={query_p}"
"endpoint": rf"search?ln=en&cc={current_collection}&p={query}"
+ r"&action_search=Search&op1=a&m1=a&p1=&f1=&c="
+ r"Published+Articles&c=&sf=&so=d&rm=&rg=100&sc=0&of=xm",
"type_of_query": type_of_query,
Expand All @@ -44,21 +45,21 @@ def fetch_count(parameters):
endpoint=parameters["endpoint"],
_retry_args={
"stop": stop_after_attempt(3),
"retry": retry_if_exception_type(Exception),
"retry": retry_if_exception_type(AirflowException),
},
)
count = get_total_results_count(response.text)
return {parameters["type_of_query"]: count}

query_list = [
queries_objects_list = [
{"cern_read_and_publish": constants.CERN_READ_AND_PUBLISH},
{"cern_individual_apcs": constants.CERN_INDIVIDUAL_APCS},
{"scoap3": constants.SCOAP3},
{"other": constants.OTHER},
{"other_collective_models": constants.OTHER_COLLECTIVE_MODELS},
]

parameters = generate_params.expand(query=query_list)
parameters = generate_params.expand(query_object=queries_objects_list)
counts = fetch_count.expand(parameters=parameters)

@task(multiple_outputs=True, executor_config=kubernetes_executor_config)
Expand Down
73 changes: 50 additions & 23 deletions dags/open_access/open_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@
import open_access.utils as utils
import pendulum
from airflow.decorators import dag, task
from airflow.exceptions import AirflowException
from airflow.providers.http.hooks.http import HttpHook
from common.models.open_access.open_access import OAOpenAccess
from common.operators.sqlalchemy_operator import sqlalchemy_task
from common.utils import get_total_results_count, request_again_if_failed
from common.utils import get_total_results_count
from executor_config import kubernetes_executor_config
from sqlalchemy.sql import func
from tenacity import retry_if_exception_type, stop_after_attempt


@dag(
Expand All @@ -17,39 +20,63 @@
params={"year": 2023},
)
def oa_dag():
@task(multiple_outputs=True, executor_config=kubernetes_executor_config)
def generate_params(query_object, **kwargs):
    """Build the HttpHook call parameters for one open-access query.

    :param query_object: single-entry dict mapping the query name (e.g.
        ``"closed_access"``) to its CDS query fragment.
    :returns: dict with ``endpoint`` (relative CDS search path for the
        DAG-param year) and ``type_of_query`` (the query name).
    """
    # Reconstructed final version — the diff residue interleaved the old
    # fetch_data_task with this new generate_params task.
    year = kwargs["params"].get("year")
    current_collection = "Published+Articles"
    base_query = (
        r"(affiliation:CERN+or+595:'For+annual+report')"
        + rf"and+year:{year}+not+980:ConferencePaper+"
        + r"not+980:BookChapter"
    )
    # The single key of query_object names the query; its value is the
    # extra filter appended to the base query.
    type_of_query = [*query_object][0]
    query = rf"{base_query}+{query_object[type_of_query]}"

    return {
        "endpoint": rf"search?ln=en&cc={current_collection}&p={query}"
        + r"&action_search=Search&op1=a&m1=a&p1=&f1=&c="
        + r"Published+Articles&c=&sf=&so=d&rm=&rg=100&sc=0&of=xm",
        "type_of_query": type_of_query,
    }

@task(executor_config=kubernetes_executor_config)
def fetch_count(parameters):
    """Fetch one CDS search result page and return ``{type_of_query: count}``.

    Gold/green open-access totals cannot be read directly from the raw
    result count; they are corrected by paging through the records
    (``utils.get_golden_access_count`` / ``get_green_access_count``).
    """
    http_hook = HttpHook(http_conn_id="cds", method="GET")
    response = http_hook.run_with_advanced_retry(
        endpoint=parameters["endpoint"],
        _retry_args={
            "stop": stop_after_attempt(3),
            "retry": retry_if_exception_type(AirflowException),
        },
    )
    type_of_query = parameters["type_of_query"]
    endpoint = parameters["endpoint"]
    total = get_total_results_count(response.text)
    # BUG FIX 1: the mapped keys are "gold_open_access"/"green_open_access"
    # (see queries_objects_list), so the old comparisons against
    # "gold"/"green" could never match.
    # BUG FIX 2: the old code computed the adjusted `total` and then
    # discarded it by recomputing and returning the raw count.
    if type_of_query == "gold_open_access":
        total = utils.get_golden_access_count(total, endpoint)
    if type_of_query == "green_open_access":
        total = utils.get_green_access_count(total, endpoint)
    return {type_of_query: total}

queries_objects_list = [
{"closed_access": constants.CLOSED_ACCESS},
{"bronze_open_access": constants.BRONZE_ACCESS},
{"green_open_access": constants.GREEN_ACCESS},
{"gold_open_access": constants.GOLD_ACCESS},
]

parameters = generate_params.expand(query_object=queries_objects_list)
counts = fetch_count.expand(parameters=parameters)

@task(multiple_outputs=True, executor_config=kubernetes_executor_config)
def join_and_add_year(counts, **kwargs):
    """Merge the mapped per-query count dicts and add the DAG-param year.

    :param counts: iterable of single-key dicts from the fetch_count tasks.
    :returns: one merged dict plus a ``"year"`` entry.
    """
    # Reconstructed final version — the diff residue interleaved the old
    # join() with the renamed join_and_add_year().
    year = kwargs["params"].get("year")
    results = reduce(lambda a, b: {**a, **b}, counts)
    results["year"] = year
    return results

results = fetch_data_task.expand(
query=[
{"closed_access": constants.CLOSED_ACCESS},
{"bronze_open_access": constants.BRONZE_ACCESS},
{"green_open_access": constants.GREEN_ACCESS},
{"gold_open_access": constants.GOLD_ACCESS},
],
)
unpacked_results = join(results)
results = join_and_add_year(counts)

@sqlalchemy_task(conn_id="superset_qa")
def populate_open_access(results, session, **kwargs):
Expand All @@ -70,7 +97,7 @@ def populate_open_access(results, session, **kwargs):
)
session.add(new_record)

populate_open_access(unpacked_results)
populate_open_access(results)


OA_dag = oa_dag()
19 changes: 14 additions & 5 deletions dags/open_access/utils.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,40 @@
import logging
import math

from common.utils import request_again_if_failed
from airflow.exceptions import AirflowException
from airflow.providers.http.hooks.http import HttpHook
from open_access.parsers import (
get_golden_access_records_ids,
get_green_access_records_ids,
)
from tenacity import retry_if_exception_type, stop_after_attempt


def get_count_http_hook(total, url, record_extractor):
    """Page through a CDS search (100 records per page via ``jrec``) and
    count the record ids found by *record_extractor*.

    :param total: raw total reported by the search; determines page count.
    :param url: relative CDS search endpoint (without ``jrec``).
    :param record_extractor: callable mapping a response body to a list of
        record ids (golden- or green-access extractor).
    :returns: number of extracted record ids across all pages.
    """
    # Reconstructed post-migration version (the diff residue interleaved the
    # old requests-based get_count with this HttpHook-based one).
    http_hook = HttpHook(http_conn_id="cds", method="GET")
    iterations = math.ceil(total / 100.0)
    records_ids_count = 0
    for i in range(0, iterations):
        # CDS record paging is 1-based: jrec = 1, 101, 201, ...
        jrec = (i * 100) + 1
        full_url = f"{url}&jrec={jrec}"
        response = http_hook.run_with_advanced_retry(
            endpoint=full_url,
            _retry_args={
                "stop": stop_after_attempt(3),
                "retry": retry_if_exception_type(AirflowException),
            },
        )
        records_ids_count = records_ids_count + len(record_extractor(response.text))
    # BUG FIX: the old log line claimed "golden access records" even when
    # counting green-access records; keep the message extractor-agnostic.
    logging.info(f"In total was found {records_ids_count} records")
    return records_ids_count


def get_golden_access_count(total, url):
    """Count golden-open-access record ids by paging through *url*."""
    # Reconstructed final version: the diff residue carried both the old
    # get_count() call and the renamed get_count_http_hook() call.
    return get_count_http_hook(total, url, get_golden_access_records_ids)


def get_green_access_count(total, url):
    """Count green-open-access record ids by paging through *url*."""
    # Reconstructed final version: the diff residue carried both the old
    # get_count() call and the renamed get_count_http_hook() call.
    return get_count_http_hook(total, url, get_green_access_records_ids)


def get_url(query, current_collection="Published+Articles"):
Expand Down

0 comments on commit 963537b

Please sign in to comment.