From 56b6b81cbe0e3aea0d7f0455ce3245a77be58131 Mon Sep 17 00:00:00 2001
From: ErnestaP
Date: Fri, 10 May 2024 09:45:15 +0200
Subject: [PATCH] Library: library_cern_publication_record

* ref: https://github.com/cern-sis/Library-Notifications-Service/issues/9
---
 dags/common/utils.py                          | 42 ++++++++++
 dags/library/cern_publication_records.py      | 77 +++++++++++++++++++
 dags/library/utils.py                         | 16 ++++
 ...ibrary_kpi_number_of_cern_publications_.py | 36 +++++++++
 .../gold_open_access_mechanisms.py            |  9 ++-
 dags/open_access/open_access.py               |  5 +-
 dags/open_access/utils.py                     | 41 +---------
 7 files changed, 180 insertions(+), 46 deletions(-)
 create mode 100644 dags/common/utils.py
 create mode 100644 dags/library/cern_publication_records.py
 create mode 100644 dags/library/utils.py
 create mode 100644 dags/migrations/versions/50d9b3ef5a3b_library_kpi_number_of_cern_publications_.py

diff --git a/dags/common/utils.py b/dags/common/utils.py
new file mode 100644
index 0000000..6054b52
--- /dev/null
+++ b/dags/common/utils.py
@@ -0,0 +1,42 @@
+import datetime
+import re
+
+import requests
+from common.exceptions import DataFetchError, NotFoundTotalCountOfRecords, WrongInput
+
+
+def request_again_if_failed(url, cds_token=None):
+    if cds_token:
+        header = {"Authorization": f"token {cds_token}"}
+        response = requests.get(url, headers=header)
+    else:
+        response = requests.get(url)
+    count = 1
+
+    while response.status_code == 502 and count != 10:
+        count = count + 1
+        response = requests.get(url)
+    if response.status_code != 200:
+        raise DataFetchError(url=url, status_code=response.status_code)
+    return response
+
+
+def get_total_results_count(data):
+    TOTAL_RECORDS_COUNT = re.compile(
+        r"Search-Engine-Total-Number-Of-Results" + r":\s(\d*)\s"
+    )
+    comment_line = data.split("\n")[1]
+    match = TOTAL_RECORDS_COUNT.search(comment_line)
+    try:
+        total_records_count = match.group(1)
+        return int(total_records_count)
+    except AttributeError:
+        raise NotFoundTotalCountOfRecords
+
+
+def check_year(year):
+    current_year = datetime.date.today().year
+    if type(year) == int:
+        if int(year) >= 2004 and int(year) <= current_year:
+            return year
+    raise WrongInput(year, current_year)
diff --git a/dags/library/cern_publication_records.py b/dags/library/cern_publication_records.py
new file mode 100644
index 0000000..ffda7d2
--- /dev/null
+++ b/dags/library/cern_publication_records.py
@@ -0,0 +1,77 @@
+import logging
+import os
+from functools import reduce
+
+import pendulum
+from airflow.decorators import dag, task
+from airflow.providers.postgres.operators.postgres import PostgresOperator
+from common.utils import get_total_results_count, request_again_if_failed
+from executor_config import kubernetes_executor_config
+from library.utils import get_url
+
+
+@dag(
+    start_date=pendulum.today("UTC").add(days=-1),
+    schedule="@monthly",
+    params={"year": 2023},
+)
+def library_cern_publication_records_dag():
+    @task(executor_config=kubernetes_executor_config)
+    def fetch_data_task(query, **kwargs):
+        year = kwargs["params"].get("year")
+        cds_token = os.environ.get("CDS_TOKEN")
+        if not cds_token:
+            logging.warning("cds token is not set!")
+        type_of_query = query
+        url = get_url(type_of_query, year)
+        data = request_again_if_failed(url=url, cds_token=cds_token)
+        total = get_total_results_count(data.text)
+        return {type_of_query: total}
+
+    @task(multiple_outputs=True, executor_config=kubernetes_executor_config)
+    def join(values, **kwargs):
+        results = reduce(lambda a, b: {**a, **b}, values)
+        results["years"] = kwargs["params"].get("year")
+        return results
+
+    results = fetch_data_task.expand(
+        query=[
+            "publications_total_count",
+            "conference_proceedings_count",
+            "non_journal_proceedings_count",
+        ],
+    )
+    unpacked_results = join(results)
+
+    PostgresOperator(
+        task_id="populate_library_cern_publication_records_table",
+        postgres_conn_id="superset_qa",
+        sql="""
+        INSERT INTO library_cern_publication_records (year,
+        publications_total_count, conference_proceedings_count,
+        non_journal_proceedings_count, created_at, updated_at)
+        VALUES (%(years)s, %(publications_total_count)s,
+        %(conference_proceedings_count)s, %(non_journal_proceedings_count)s,
+        CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
+        ON CONFLICT (year)
+        DO UPDATE SET
+        publications_total_count = EXCLUDED.publications_total_count,
+        conference_proceedings_count = EXCLUDED.conference_proceedings_count,
+        non_journal_proceedings_count = EXCLUDED.non_journal_proceedings_count,
+        updated_at = CURRENT_TIMESTAMP;
+        """,
+        parameters={
+            "years": unpacked_results["years"],
+            "publications_total_count": unpacked_results["publications_total_count"],
+            "conference_proceedings_count": unpacked_results[
+                "conference_proceedings_count"
+            ],
+            "non_journal_proceedings_count": unpacked_results[
+                "non_journal_proceedings_count"
+            ],
+        },
+        executor_config=kubernetes_executor_config,
+    )
+
+
+library_cern_publication_records = library_cern_publication_records_dag()
diff --git a/dags/library/utils.py b/dags/library/utils.py
new file mode 100644
index 0000000..c720f7a
--- /dev/null
+++ b/dags/library/utils.py
@@ -0,0 +1,16 @@
+def get_url(key, year):
+    url = {
+        "publications_total_count": r"http://cdsweb.cern.ch/search?wl=0&ln=en&cc=Published+"
+        + r"Articles&sc=1&p=%28affiliation%3ACERN+or+595%3A%27For+annual+report%27%29+and+"
+        + rf"year%3A{year}+not+980%3AConferencePaper+not+980%3ABookChapter+not+595%3A%27Not+"
+        + r"for+annual+report%27&f=&action_search=Search&of=xm",
+        "conference_proceedings_count": r"http://cdsweb.cern.ch/search?wl=0&ln=en&cc=Published+"
+        + r"Articles&p=980%3AARTICLE+and+%28affiliation%3ACERN+or+595%3A%27For+annual+report%27%29+"
+        + rf"and+year%3A{year}+and+980%3AConferencePaper+not+595%3A%27Not+for+annual+"
+        + r"report%27&f=&action_search=Search&c=Published+Articles&c=&sf=author&so=a&rm=&rg=10&sc=1&of=xm",
+        "non_journal_proceedings_count": r"https://cds.cern.ch/search?ln=en&p=affiliation%3ACERN+or+"
+        + rf"260%3ACERN+and+260%3A{year}+and+%28980%3ABOOK+or+980%3APROCEEDINGS+or+690%3A%27YELLOW+REPORT%27+"
+        + r"or+980%3ABookChapter+or+980%3AREPORT%29+not+595%3A%27Not+for+annual+report%27&action_search="
+        + r"Search&op1=a&m1=a&p1=&f1=&c=CERN+Document+Server&sf=&so=d&rm=&rg=10&sc=1&of=hb",
+    }
+    return url[key]
diff --git a/dags/migrations/versions/50d9b3ef5a3b_library_kpi_number_of_cern_publications_.py b/dags/migrations/versions/50d9b3ef5a3b_library_kpi_number_of_cern_publications_.py
new file mode 100644
index 0000000..1a25817
--- /dev/null
+++ b/dags/migrations/versions/50d9b3ef5a3b_library_kpi_number_of_cern_publications_.py
@@ -0,0 +1,36 @@
+"""Library KPI: Number of CERN Publications,
+contributions in conference proceedings,
+Number of CERN publications
+(other than journals and proceedings)
+
+Revision ID: 50d9b3ef5a3b
+Revises: 64ac526a078b
+Create Date: 2024-05-08 10:27:00.634151
+
+"""
+from typing import Sequence, Union
+
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision: str = "50d9b3ef5a3b"
+down_revision: Union[str, None] = "64ac526a078b"
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade():
+    op.create_table(
+        "library_cern_publication_records",
+        sa.Column("year", sa.Integer, primary_key=True),
+        sa.Column("publications_total_count", sa.Integer, nullable=False),
+        sa.Column("conference_proceedings_count", sa.Integer, nullable=False),
+        sa.Column("non_journal_proceedings_count", sa.Integer, nullable=False),
+        sa.Column("created_at", sa.TIMESTAMP(timezone=True), nullable=False),
+        sa.Column("updated_at", sa.TIMESTAMP(timezone=True), nullable=False),
+    )
+
+
+def downgrade():
+    op.drop_table("library_cern_publication_records")
diff --git a/dags/open_access/gold_open_access_mechanisms.py b/dags/open_access/gold_open_access_mechanisms.py
index 5c47ffc..1887e60 100644
--- a/dags/open_access/gold_open_access_mechanisms.py
+++ b/dags/open_access/gold_open_access_mechanisms.py
@@ -1,11 +1,12 @@
 from functools import reduce
 
 import open_access.constants as constants
-import open_access.utils as utils
 import pendulum
 from airflow.decorators import dag, task
 from airflow.providers.postgres.operators.postgres import PostgresOperator
+from common.utils import get_total_results_count, request_again_if_failed
 from executor_config import kubernetes_executor_config
+from open_access.utils import get_url
 
 
 @dag(
@@ -23,9 +24,9 @@ def fetch_data_task(query, **kwargs):
             + r"not+980:BookChapter+not+595:'Not+for+annual+report"
         )
         type_of_query = [*query][0]
-        url = utils.get_url(f"{golden_access_base_query}+{query[type_of_query]}")
-        data = utils.request_again_if_failed(url)
-        total = utils.get_total_results_count(data.text)
+        url = get_url(f"{golden_access_base_query}+{query[type_of_query]}")
+        data = request_again_if_failed(url)
+        total = get_total_results_count(data.text)
         return {type_of_query: total}
 
     @task(multiple_outputs=True, executor_config=kubernetes_executor_config)
diff --git a/dags/open_access/open_access.py b/dags/open_access/open_access.py
index 294011e..9d221de 100644
--- a/dags/open_access/open_access.py
+++ b/dags/open_access/open_access.py
@@ -7,6 +7,7 @@
 import pendulum
 from airflow.decorators import dag, task
 from airflow.providers.postgres.operators.postgres import PostgresOperator
+from common.utils import get_total_results_count, request_again_if_failed
 from executor_config import kubernetes_executor_config
 
 
@@ -29,8 +30,8 @@ def fetch_data_task(query, **kwargs):
         )
         type_of_query = [*query][0]
         url = utils.get_url(query=f"{base_query}+{query[type_of_query]}")
-        data = utils.request_again_if_failed(url=url, cds_token=cds_token)
-        total = utils.get_total_results_count(data.text)
+        data = request_again_if_failed(url=url, cds_token=cds_token)
+        total = get_total_results_count(data.text)
         if type_of_query == "gold":
             total = utils.get_gold_access_count(total, url)
         if type_of_query == "green":
diff --git a/dags/open_access/utils.py b/dags/open_access/utils.py
index 7f75fec..e2d101f 100644
--- a/dags/open_access/utils.py
+++ b/dags/open_access/utils.py
@@ -1,49 +1,10 @@
-import datetime
 import logging
 import math
-import re
 
-import requests
-from common.exceptions import DataFetchError, NotFoundTotalCountOfRecords, WrongInput
+from common.utils import request_again_if_failed
 from open_access.parsers import get_golden_access_records_ids
 
 
-def request_again_if_failed(url, cds_token=None):
-    if cds_token:
-        header = {"Authorization": f"token {cds_token}"}
-        response = requests.get(url, header)
-    response = requests.get(url)
-    count = 1
-
-    while response.status_code == 502 and count != 10:
-        count = count + 1
-        response = requests.get(url)
-    if response.status_code != 200:
-        raise DataFetchError(url=url, status_code=response.status_code)
-    return response
-
-
-def get_total_results_count(data):
-    TOTAL_RECORDS_COUNT = re.compile(
-        r"Search-Engine-Total-Number-Of-Results" + r":\s(\d*)\s"
-    )
-    comment_line = data.split("\n")[1]
-    match = TOTAL_RECORDS_COUNT.search(comment_line)
-    try:
-        total_records_count = match.group(1)
-        return int(total_records_count)
-    except AttributeError:
-        raise NotFoundTotalCountOfRecords
-
-
-def check_year(year):
-    current_year = datetime.date.today().year
-    if type(year) == int:
-        if int(year) >= 2004 and int(year) <= current_year:
-            return year
-    raise WrongInput(year, current_year)
-
-
 def get_gold_access_count(total, url):
     iterations = math.ceil(total / 100.0)
     records_ids_count = 0