diff --git a/dags/library/constants.py b/dags/library/constants.py new file mode 100644 index 0000000..9c7697f --- /dev/null +++ b/dags/library/constants.py @@ -0,0 +1,12 @@ +INSPIRE_ARXIV_RECORDS = ( + r"037%3A%27arXiv%27+and+not+980%3Ahidden&action_search=" + + r"Search&op1=a&m1=a&p1=&f1=&c=CERN+Document+" + + r"Server&sf=&so=d&rm=&rg=10&sc=1&of=xm&wl=0" +) + +INSPIRE_CURATORS_RECORDS = ( + r"035%3A%27oai%3Ainspirehep.net%27+not+" + + r"037%3A%27arXiv%27&action_search=Search" + + r"&op1=a&m1=a&p1=&f1=&c=CERN+Document+Server&sf=&so=d&" + + r"rm=&rg=10&sc=1&of=xm&wl=0" +) diff --git a/dags/library/new_items_in_the_institutional_repository.py b/dags/library/new_items_in_the_institutional_repository.py new file mode 100644 index 0000000..5d85c9a --- /dev/null +++ b/dags/library/new_items_in_the_institutional_repository.py @@ -0,0 +1,82 @@ +from functools import reduce + +import library.constants as constants +import pendulum +from airflow.decorators import dag, task +from airflow.providers.http.hooks.http import HttpHook +from airflow.providers.postgres.operators.postgres import PostgresOperator +from common.utils import get_total_results_count +from executor_config import kubernetes_executor_config +from tenacity import retry_if_exception_type, stop_after_attempt + + +@dag( + start_date=pendulum.today("UTC").add(days=-1), + schedule_interval="@monthly", + params={"year": 2023}, +) +def library_new_items_in_the_institutional_repository(): + @task(executor_config=kubernetes_executor_config, multiple_outputs=True) + def generate_params(query, **kwargs): + year = kwargs["params"].get("year") + base_query = f"search?ln=en&p=year%3A{year}+" + type_of_query = [*query][0] + return { + "endpoint": base_query + query[type_of_query], + "type_of_query": type_of_query, + } + + @task(executor_config=kubernetes_executor_config) + def fetch_count(parameters): + http_hook = HttpHook(http_conn_id="cds", method="GET") + response = http_hook.run_with_advanced_retry( + endpoint=parameters["endpoint"], + _retry_args={ + "stop": stop_after_attempt(3), + "retry": retry_if_exception_type(Exception), + }, + ) + count = get_total_results_count(response.text) + return {parameters["type_of_query"]: count} + + query_list = [ + {"inspire_arxiv_records": constants.INSPIRE_ARXIV_RECORDS}, + {"inspire_curators_records": constants.INSPIRE_CURATORS_RECORDS}, + ] + + parameters = generate_params.expand(query=query_list) + counts = fetch_count.expand(parameters=parameters) + + @task(multiple_outputs=True, executor_config=kubernetes_executor_config) + def join_and_add_year(counts, **kwargs): + year = kwargs["params"].get("year") + results = reduce(lambda a, b: {**a, **b}, counts) + results["year"] = year + return results + + results = join_and_add_year(counts) + + populate_items_in_the_institutional_repository = PostgresOperator( + task_id="populate_items_in_the_institutional_repository", + postgres_conn_id="superset_qa", + sql=""" + INSERT INTO items_in_the_institutional_repository (year, + inspire_arxiv_records, inspire_curators_records, created_at, updated_at) + VALUES (%(year)s, %(inspire_arxiv_records)s, %(inspire_curators_records)s, + CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) + ON CONFLICT (year) + DO UPDATE SET + inspire_arxiv_records = EXCLUDED.inspire_arxiv_records, + inspire_curators_records = EXCLUDED.inspire_curators_records, + updated_at = CURRENT_TIMESTAMP; + """, + parameters=results, + executor_config=kubernetes_executor_config, + ) + + counts >> results >> populate_items_in_the_institutional_repository + + +Library_new_items_in_the_institutional_repository = ( + library_new_items_in_the_institutional_repository() +) diff --git a/dags/migrations/versions/101f23913167_library_kpis_new_items_in_the_.py b/dags/migrations/versions/101f23913167_library_kpis_new_items_in_the_.py new file mode 100644 index 0000000..c31da7d --- /dev/null +++ b/dags/migrations/versions/101f23913167_library_kpis_new_items_in_the_.py @@ -0,0 +1,32 @@ +"""Library KPIs: New items in the institutional repository + +Revision ID: 101f23913167 +Revises: 50d9b3ef5a3b +Create Date: 2024-07-05 17:55:10.676310 + +""" +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "101f23913167" +down_revision: Union[str, None] = "50d9b3ef5a3b" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade(): + op.create_table( + "items_in_the_institutional_repository", + sa.Column("year", sa.Integer, primary_key=True), + sa.Column("inspire_arxiv_records", sa.Integer, nullable=False), + sa.Column("inspire_curators_records", sa.Integer, nullable=False), + sa.Column("created_at", sa.TIMESTAMP(timezone=True), nullable=False), + sa.Column("updated_at", sa.TIMESTAMP(timezone=True), nullable=False), + ) + + +def downgrade(): + op.drop_table("items_in_the_institutional_repository")