Skip to content

Commit

Permalink
Library: library_cern_publication_record
Browse files Browse the repository at this point in the history
  • Loading branch information
ErnestaP committed May 10, 2024
1 parent 300c272 commit 027e642
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 46 deletions.
41 changes: 41 additions & 0 deletions dags/common/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import datetime
import re

import requests
from common.exceptions import DataFetchError, NotFoundTotalCountOfRecords, WrongInput


def request_again_if_failed(url, cds_token=None):
if cds_token:
header = {"Authorization": f"token {cds_token}"}
response = requests.get(url, header)
response = requests.get(url)
count = 1

while response.status_code == 502 and count != 10:
count = count + 1
response = requests.get(url)
if response.status_code != 200:
raise DataFetchError(url=url, status_code=response.status_code)
return response


def get_total_results_count(data):
TOTAL_RECORDS_COUNT = re.compile(
r"Search-Engine-Total-Number-Of-Results" + r":\s(\d*)\s"
)
comment_line = data.split("\n")[1]
match = TOTAL_RECORDS_COUNT.search(comment_line)
try:
total_records_count = match.group(1)
return int(total_records_count)
except AttributeError:
raise NotFoundTotalCountOfRecords


def check_year(year):
current_year = datetime.date.today().year
if type(year) == int:
if int(year) >= 2004 and int(year) <= current_year:
return year
raise WrongInput(year, current_year)
77 changes: 77 additions & 0 deletions dags/library/cern_publication_records.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import logging
import os
from functools import reduce

import pendulum
from airflow.decorators import dag, task
from airflow.providers.postgres.operators.postgres import PostgresOperator
from common.utils import get_total_results_count, request_again_if_failed
from executor_config import kubernetes_executor_config
from library.utils import get_url


@dag(
start_date=pendulum.today("UTC").add(days=-1),
schedule="@monthly",
params={"year": 2023},
)
def library_cern_publication_records_dag():
@task(executor_config=kubernetes_executor_config)
def fetch_data_task(key, **kwargs):
year = kwargs["params"].get("year")
cds_token = os.environ.get("CDS_TOKEN")
if not cds_token:
logging.warning("cds token is not set!")
type_of_query = key
url = get_url(type_of_query, year)
data = request_again_if_failed(url=url, cds_token=cds_token)
total = get_total_results_count(data.text)
return {type_of_query: total}

@task(multiple_outputs=True, executor_config=kubernetes_executor_config)
def join(values, **kwargs):
results = reduce(lambda a, b: {**a, **b}, values)
results["years"] = kwargs["params"].get("year")
return results

results = fetch_data_task.expand(
query=[
"publications_total_count",
"conference_proceedings_count",
"non_journal_proceedings_count",
],
)
unpacked_results = join(results)

PostgresOperator(
task_id="populate_library_cern_publication_records_table",
postgres_conn_id="superset_qa",
sql="""
INSERT INTO library_cern_publication_records (year,
publications_total_count, conference_proceedings_count,
non_journal_proceedings_count, created_at, updated_at)
VALUES (%(years)s, %(publications_total_count)s,
%(conference_proceedings_count)s, %(non_journal_proceedings_count)s,
CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT (year)
DO UPDATE SET
publications_total_count = EXCLUDED.publications_total_count,
conference_proceedings_count = EXCLUDED.conference_proceedings_count,
non_journal_proceedings_count = EXCLUDED.non_journal_proceedings_count,
updated_at = CURRENT_TIMESTAMP;
""",
parameters={
"years": unpacked_results["years"],
"publications_total_count": unpacked_results["publications_total_count"],
"conference_proceedings_count": unpacked_results[
"conference_proceedings_count"
],
"non_journal_proceedings_count": unpacked_results[
"non_journal_proceedings_count"
],
},
executor_config=kubernetes_executor_config,
)


library_cern_publication_records = library_cern_publication_records_dag()
16 changes: 16 additions & 0 deletions dags/library/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
def get_url(key, year):
url = {
"publications_total_count": r"http://cdsweb.cern.ch/search?wl=0&ln=en&cc=Published+"
+ r"Articles&sc=1&p=%28affiliation%3ACERN+or+595%3A%27For+annual+report%27%29+and+"
+ rf"year%3A{year}+not+980%3AConferencePaper+not+980%3ABookChapter+not+595%3A%27Not+"
+ r"for+annual+report%27&f=&action_search=Search&of=xm",
"conference_proceedings_count": r"http://cdsweb.cern.ch/search?wl=0&ln=en&cc=Published+"
+ r"Articles&p=980%3AARTICLE+and+%28affiliation%3ACERN+or+595%3A%27For+annual+report%27%29+"
+ rf"and+year%3A{year}+and+980%3AConferencePaper+not+595%3A%27Not+for+annual+"
+ r"report%27&f=&action_search=Search&c=Published+Articles&c=&sf=author&so=a&rm=&rg=10&sc=1&of=xm",
"non_journal_proceedings_count": r"https://cds.cern.ch/search?ln=en&p=affiliation%3ACERN+or+"
+ rf"260%3ACERN+and+260%3A{year}+and+%28980%3ABOOK+or+980%3APROCEEDINGS+or+690%3A%27YELLOW+REPORT%27+"
+ r"or+980%3ABookChapter+or+980%3AREPORT%29+not+595%3A%27Not+for+annual+report%27&action_search="
+ r"Search&op1=a&m1=a&p1=&f1=&c=CERN+Document+Server&sf=&so=d&rm=&rg=10&sc=1&of=hb",
}
return url[key]
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""Library KPI: Number of CERN Publications,
contributions in conference proceedings,
Number of CERN publications
(other than journals and proceedings)
Revision ID: 50d9b3ef5a3b
Revises: 64ac526a078b
Create Date: 2024-05-08 10:27:00.634151
"""
from typing import Sequence, Union

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision: str = "50d9b3ef5a3b"
down_revision: Union[str, None] = "64ac526a078b"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade():
op.create_table(
"library_cern_publication_records",
sa.Column("year", sa.Integer, primary_key=True),
sa.Column("publications_total_count", sa.Integer, nullable=False),
sa.Column("conference_proceedings_count", sa.Integer, nullable=False),
sa.Column("non_journal_proceedings_count", sa.Integer, nullable=False),
sa.Column("created_at", sa.TIMESTAMP(timezone=True), nullable=False),
sa.Column("updated_at", sa.TIMESTAMP(timezone=True), nullable=False),
)


def downgrade():
op.drop_table("library_cern_publication_records")
9 changes: 5 additions & 4 deletions dags/open_access/gold_open_access_mechanisms.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
from functools import reduce

import open_access.constants as constants
import open_access.utils as utils
import pendulum
from airflow.decorators import dag, task
from airflow.providers.postgres.operators.postgres import PostgresOperator
from common.utils import get_total_results_count, request_again_if_failed
from executor_config import kubernetes_executor_config
from open_access.utils import get_url


@dag(
Expand All @@ -23,9 +24,9 @@ def fetch_data_task(query, **kwargs):
+ r"not+980:BookChapter+not+595:'Not+for+annual+report"
)
type_of_query = [*query][0]
url = utils.get_url(f"{golden_access_base_query}+{query[type_of_query]}")
data = utils.request_again_if_failed(url)
total = utils.get_total_results_count(data.text)
url = get_url(f"{golden_access_base_query}+{query[type_of_query]}")
data = request_again_if_failed(url)
total = get_total_results_count(data.text)
return {type_of_query: total}

@task(multiple_outputs=True, executor_config=kubernetes_executor_config)
Expand Down
5 changes: 3 additions & 2 deletions dags/open_access/open_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import pendulum
from airflow.decorators import dag, task
from airflow.providers.postgres.operators.postgres import PostgresOperator
from common.utils import get_total_results_count, request_again_if_failed
from executor_config import kubernetes_executor_config


Expand All @@ -29,8 +30,8 @@ def fetch_data_task(query, **kwargs):
)
type_of_query = [*query][0]
url = utils.get_url(query=f"{base_query}+{query[type_of_query]}")
data = utils.request_again_if_failed(url=url, cds_token=cds_token)
total = utils.get_total_results_count(data.text)
data = request_again_if_failed(url=url, cds_token=cds_token)
total = get_total_results_count(data.text)
if type_of_query == "gold":
total = utils.get_gold_access_count(total, url)
if type_of_query == "green":
Expand Down
41 changes: 1 addition & 40 deletions dags/open_access/utils.py
Original file line number Diff line number Diff line change
@@ -1,49 +1,10 @@
import datetime
import logging
import math
import re

import requests
from common.exceptions import DataFetchError, NotFoundTotalCountOfRecords, WrongInput
from common.utils import request_again_if_failed
from open_access.parsers import get_golden_access_records_ids


def request_again_if_failed(url, cds_token=None):
if cds_token:
header = {"Authorization": f"token {cds_token}"}
response = requests.get(url, header)
response = requests.get(url)
count = 1

while response.status_code == 502 and count != 10:
count = count + 1
response = requests.get(url)
if response.status_code != 200:
raise DataFetchError(url=url, status_code=response.status_code)
return response


def get_total_results_count(data):
TOTAL_RECORDS_COUNT = re.compile(
r"Search-Engine-Total-Number-Of-Results" + r":\s(\d*)\s"
)
comment_line = data.split("\n")[1]
match = TOTAL_RECORDS_COUNT.search(comment_line)
try:
total_records_count = match.group(1)
return int(total_records_count)
except AttributeError:
raise NotFoundTotalCountOfRecords


def check_year(year):
current_year = datetime.date.today().year
if type(year) == int:
if int(year) >= 2004 and int(year) <= current_year:
return year
raise WrongInput(year, current_year)


def get_gold_access_count(total, url):
iterations = math.ceil(total / 100.0)
records_ids_count = 0
Expand Down

0 comments on commit 027e642

Please sign in to comment.