Skip to content

Commit

Permalink
All DAGs: migration to HttpHook
Browse files Browse the repository at this point in the history
  • Loading branch information
ErnestaP committed Jul 31, 2024
1 parent 33bd15e commit 513e819
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 96 deletions.
5 changes: 0 additions & 5 deletions dags/common/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,6 @@ def __init__(self, input, current_year):
)


class DataFetchError(Exception):
    """Raised when an HTTP fetch does not come back with a usable response."""

    def __init__(self, status_code, url):
        # Bake both the status code and the failing URL into the message so
        # the airflow log line is self-explanatory.
        message = f"Data fetch failure, status_code={status_code}, url={url}"
        super().__init__(message)


class NotFoundTotalCountOfRecords(Exception):
def __init__(
self,
Expand Down
18 changes: 1 addition & 17 deletions dags/common/utils.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,7 @@
import datetime
import re

import requests
from common.exceptions import DataFetchError, NotFoundTotalCountOfRecords, WrongInput


def request_again_if_failed(url, cds_token=None):
    """GET *url*, retrying on HTTP 502 up to 9 additional times.

    Args:
        url: absolute URL to fetch.
        cds_token: optional CDS API token; sent as an Authorization header.

    Returns:
        The successful ``requests.Response`` (status code 200).

    Raises:
        DataFetchError: if the final response status is not 200.
    """
    # BUG FIX: the original passed the auth dict as the second positional
    # argument of requests.get (which is `params`, not `headers`), and then
    # unconditionally issued a second, un-authenticated GET that clobbered
    # the first response — so the token was never actually used.
    headers = {"Authorization": f"token {cds_token}"} if cds_token else None
    response = requests.get(url, headers=headers)
    count = 1

    # CDS occasionally answers 502 (bad gateway); retry a bounded number
    # of times, keeping the same auth header on every attempt.
    while response.status_code == 502 and count != 10:
        count = count + 1
        response = requests.get(url, headers=headers)
    if response.status_code != 200:
        raise DataFetchError(url=url, status_code=response.status_code)
    return response
from common.exceptions import NotFoundTotalCountOfRecords, WrongInput


def get_total_results_count(data):
Expand Down
79 changes: 44 additions & 35 deletions dags/library/cern_publication_records.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import logging
import os
from functools import reduce

import pendulum
from airflow.decorators import dag, task
from airflow.exceptions import AirflowException
from airflow.providers.http.hooks.http import HttpHook
from airflow.providers.postgres.operators.postgres import PostgresOperator
from common.utils import get_total_results_count, request_again_if_failed
from common.utils import get_total_results_count
from executor_config import kubernetes_executor_config
from library.utils import get_url
from library.utils import get_endpoint
from tenacity import retry_if_exception_type, stop_after_attempt


@dag(
Expand All @@ -16,41 +17,53 @@
params={"year": 2023},
)
def library_cern_publication_records_dag():
@task(executor_config=kubernetes_executor_config)
def fetch_data_task(key, **kwargs):
@task(multiple_outputs=True, executor_config=kubernetes_executor_config)
def generate_params(key, **kwargs):
year = kwargs["params"].get("year")
cds_token = os.environ.get("CDS_TOKEN")
if not cds_token:
logging.warning("cds token is not set!")
type_of_query = key
url = get_url(type_of_query, year)
data = request_again_if_failed(url=url, cds_token=cds_token)
total = get_total_results_count(data.text)
return {type_of_query: total}
url = get_endpoint(key, year)
return {
"endpoint": url,
"type_of_query": key,
}

@task
def fetch_count(parameters):
http_hook = HttpHook(http_conn_id="cds", method="GET")
response = http_hook.run_with_advanced_retry(
endpoint=parameters["endpoint"],
_retry_args={
"stop": stop_after_attempt(3),
"retry": retry_if_exception_type(AirflowException),
},
)
count = get_total_results_count(response.text)
return {parameters["type_of_query"]: count}

keys_list = [
"publications_total_count",
"conference_proceedings_count",
"non_journal_proceedings_count",
]

parameters = generate_params.expand(key=keys_list)
counts = fetch_count.expand(parameters=parameters)

@task(multiple_outputs=True, executor_config=kubernetes_executor_config)
def join(values, **kwargs):
results = reduce(lambda a, b: {**a, **b}, values)
results["years"] = kwargs["params"].get("year")
def join_and_add_year(counts, **kwargs):
year = kwargs["params"].get("year")
results = reduce(lambda a, b: {**a, **b}, counts)
results["year"] = year
return results

results = fetch_data_task.expand(
key=[
"publications_total_count",
"conference_proceedings_count",
"non_journal_proceedings_count",
],
)
unpacked_results = join(results)

results = join_and_add_year(counts)
PostgresOperator(
task_id="populate_library_cern_publication_records_table",
postgres_conn_id="superset_qa",
sql="""
INSERT INTO library_cern_publication_records (year,
publications_total_count, conference_proceedings_count,
non_journal_proceedings_count, created_at, updated_at)
VALUES (%(years)s, %(publications_total_count)s,
VALUES (%(year)s, %(publications_total_count)s,
%(conference_proceedings_count)s, %(non_journal_proceedings_count)s,
CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT (year)
Expand All @@ -61,14 +74,10 @@ def join(values, **kwargs):
updated_at = CURRENT_TIMESTAMP;
""",
parameters={
"years": unpacked_results["years"],
"publications_total_count": unpacked_results["publications_total_count"],
"conference_proceedings_count": unpacked_results[
"conference_proceedings_count"
],
"non_journal_proceedings_count": unpacked_results[
"non_journal_proceedings_count"
],
"year": results["year"],
"publications_total_count": results["publications_total_count"],
"conference_proceedings_count": results["conference_proceedings_count"],
"non_journal_proceedings_count": results["non_journal_proceedings_count"],
},
executor_config=kubernetes_executor_config,
)
Expand Down
12 changes: 6 additions & 6 deletions dags/library/utils.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
def get_endpoint(key, year):
    """Return the relative CDS search endpoint for a given query type.

    Endpoints are relative so they can be issued through the Airflow "cds"
    HTTP connection (HttpHook) rather than embedding the host here.

    Args:
        key: one of "publications_total_count", "conference_proceedings_count",
            "non_journal_proceedings_count".
        year: publication year interpolated into the query string.

    Raises:
        KeyError: if *key* is not a known query type.
    """
    endpoint = {
        "publications_total_count": r"search?wl=0&ln=en&cc=Published+"
        + r"Articles&sc=1&p=%28affiliation%3ACERN+or+595%3A%27For+annual+report%27%29+and+"
        + rf"year%3A{year}+not+980%3AConferencePaper+not+980%3ABookChapter+not+595%3A%27Not+"
        + r"for+annual+report%27&f=&action_search=Search&of=xm",
        "conference_proceedings_count": r"search?wl=0&ln=en&cc=Published+"
        + r"Articles&p=980%3AARTICLE+and+%28affiliation%3ACERN+or+595%3A%27For+annual+report%27%29+"
        + rf"and+year%3A{year}+and+980%3AConferencePaper+not+595%3A%27Not+for+annual+"
        + r"report%27&f=&action_search=Search&c=Published+Articles&c=&sf=author&so=a&rm=&rg=10&sc=1&of=xm",
        "non_journal_proceedings_count": r"search?ln=en&p=affiliation%3ACERN+or+"
        + rf"260%3ACERN+and+260%3A{year}+and+%28980%3ABOOK+or+980%3APROCEEDINGS+or+690%3A%27YELLOW+REPORT%27+"
        + r"or+980%3ABookChapter+or+980%3AREPORT%29+not+595%3A%27Not+for+annual+report%27&action_search="
        + r"Search&op1=a&m1=a&p1=&f1=&c=CERN+Document+Server&sf=&so=d&rm=&rg=10&sc=1&of=xm",
    }
    return endpoint[key]
3 changes: 2 additions & 1 deletion dags/open_access/gold_open_access_mechanisms.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import open_access.constants as constants
import pendulum
from airflow.decorators import dag, task
from airflow.exceptions import AirflowException
from airflow.providers.http.hooks.http import HttpHook
from airflow.providers.postgres.operators.postgres import PostgresOperator
from common.utils import get_total_results_count
Expand Down Expand Up @@ -42,7 +43,7 @@ def fetch_count(parameters):
endpoint=parameters["endpoint"],
_retry_args={
"stop": stop_after_attempt(3),
"retry": retry_if_exception_type(Exception),
"retry": retry_if_exception_type(AirflowException),
},
)
count = get_total_results_count(response.text)
Expand Down
81 changes: 54 additions & 27 deletions dags/open_access/open_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@
import open_access.utils as utils
import pendulum
from airflow.decorators import dag, task
from airflow.exceptions import AirflowException
from airflow.providers.http.hooks.http import HttpHook
from airflow.providers.postgres.operators.postgres import PostgresOperator
from common.utils import get_total_results_count, request_again_if_failed
from common.utils import get_total_results_count
from executor_config import kubernetes_executor_config
from tenacity import retry_if_exception_type, stop_after_attempt


@dag(
Expand All @@ -15,47 +18,71 @@
params={"year": 2023},
)
def oa_dag():
@task(executor_config=kubernetes_executor_config)
def fetch_data_task(query, **kwargs):
@task(multiple_outputs=True, executor_config=kubernetes_executor_config)
def generate_params(query, **kwargs):
year = kwargs["params"].get("year")
current_collection = "Published+Articles"
base_query = (
r"(affiliation:CERN+or+595:'For+annual+report')"
+ rf"and+year:{year}+not+980:ConferencePaper+"
+ r"not+980:BookChapter"
)
type_of_query = [*query][0]
url = utils.get_url(query=f"{base_query}")
data = request_again_if_failed(url=url)
total = get_total_results_count(data.text)
query_p = rf"{base_query}+{query[type_of_query]}"

return {
"endpoint": rf"search?ln=en&cc={current_collection}&p={query_p}"
+ r"&action_search=Search&op1=a&m1=a&p1=&f1=&c="
+ r"Published+Articles&c=&sf=&so=d&rm=&rg=100&sc=0&of=xm",
"type_of_query": type_of_query,
}

@task(executor_config=kubernetes_executor_config)
def fetch_count(parameters):
http_hook = HttpHook(http_conn_id="cds", method="GET")
response = http_hook.run_with_advanced_retry(
endpoint=parameters["endpoint"],
_retry_args={
"stop": stop_after_attempt(3),
"retry": retry_if_exception_type(AirflowException),
},
)
type_of_query = parameters["type_of_query"]
endpoint = parameters["endpoint"]
total = get_total_results_count(response.text)
if type_of_query == "gold":
total = utils.get_golden_access_count(total, url)
total = utils.get_golden_access_count(total, endpoint)
if type_of_query == "green":
total = utils.get_green_access_count(total, url)
return {type_of_query: total}
total = utils.get_green_access_count(total, endpoint)
count = get_total_results_count(response.text)
return {parameters["type_of_query"]: count}

query_list = [
{"closed": constants.CLOSED_ACCESS},
{"bronze": constants.BRONZE_ACCESS},
{"green": constants.GREEN_ACCESS},
{"gold": constants.GOLD_ACCESS},
]

parameters = generate_params.expand(query=query_list)
counts = fetch_count.expand(parameters=parameters)

@task(multiple_outputs=True, executor_config=kubernetes_executor_config)
def join(values, **kwargs):
results = reduce(lambda a, b: {**a, **b}, values)
results["years"] = kwargs["params"].get("year")
def join_and_add_year(counts, **kwargs):
year = kwargs["params"].get("year")
results = reduce(lambda a, b: {**a, **b}, counts)
results["year"] = year
return results

results = fetch_data_task.expand(
query=[
{"closed": constants.CLOSED_ACCESS},
{"bronze": constants.BRONZE_ACCESS},
{"green": constants.GREEN_ACCESS},
{"gold": constants.GOLD_ACCESS},
],
)
unpacked_results = join(results)
results = join_and_add_year(counts)

PostgresOperator(
task_id="populate_open_access_table",
postgres_conn_id="superset_qa",
sql="""
INSERT INTO oa_open_access (year, closed_access, bronze_open_access,
green_open_access, gold_open_access, created_at, updated_at)
VALUES (%(years)s, %(closed)s, %(bronze)s, %(green)s, %(gold)s,
VALUES (%(year)s, %(closed)s, %(bronze)s, %(green)s, %(gold)s,
CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT (year)
DO UPDATE SET
Expand All @@ -66,11 +93,11 @@ def join(values, **kwargs):
updated_at = CURRENT_TIMESTAMP;
""",
parameters={
"years": unpacked_results["years"],
"closed": unpacked_results["closed"],
"bronze": unpacked_results["bronze"],
"green": unpacked_results["green"],
"gold": unpacked_results["gold"],
"year": results["year"],
"closed": results["closed"],
"bronze": results["bronze"],
"green": results["green"],
"gold": results["gold"],
},
executor_config=kubernetes_executor_config,
)
Expand Down
19 changes: 14 additions & 5 deletions dags/open_access/utils.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,40 @@
import logging
import math

from common.utils import request_again_if_failed
from airflow.exceptions import AirflowException
from airflow.providers.http.hooks.http import HttpHook
from open_access.parsers import (
get_golden_access_records_ids,
get_green_access_records_ids,
)
from tenacity import retry_if_exception_type, stop_after_attempt


def get_count_http_hook(total, url, record_extractor):
    """Page through CDS results and count the record ids on every page.

    Args:
        total: total number of hits reported by CDS; drives pagination
            at 100 records per page.
        url: endpoint (relative to the "cds" HTTP connection), without the
            ``jrec`` pagination parameter.
        record_extractor: callable taking the response body (XML text) and
            returning the list of matching record ids on that page.

    Returns:
        Total number of extracted record ids across all pages.
    """
    http_hook = HttpHook(http_conn_id="cds", method="GET")
    iterations = math.ceil(total / 100.0)
    records_ids_count = 0
    for i in range(0, iterations):
        # CDS pagination: jrec is the 1-based index of the first record on a page.
        jrec = (i * 100) + 1
        full_url = f"{url}&jrec={jrec}"
        response = http_hook.run_with_advanced_retry(
            endpoint=full_url,
            _retry_args={
                "stop": stop_after_attempt(3),
                "retry": retry_if_exception_type(AirflowException),
            },
        )
        records_ids_count = records_ids_count + len(record_extractor(response.text))
    # FIX: the message used to hard-code "golden access records", but this
    # helper also serves green-access counting; keep it generic. Lazy %-style
    # args avoid formatting when the log level is disabled.
    logging.info("In total was found %s records", records_ids_count)
    return records_ids_count


def get_golden_access_count(total, url):
    """Count golden-access records behind *url* (paginated; see get_count_http_hook)."""
    return get_count_http_hook(total, url, get_golden_access_records_ids)


def get_green_access_count(total, url):
    """Count green-access records behind *url* (paginated; see get_count_http_hook)."""
    return get_count_http_hook(total, url, get_green_access_records_ids)


def get_url(query, current_collection="Published+Articles"):
Expand Down
Empty file.

0 comments on commit 513e819

Please sign in to comment.