Skip to content

Commit

Permalink
Library KPIs: items in the institutional repo
Browse files Browse the repository at this point in the history
  • Loading branch information
ErnestaP committed Jul 8, 2024
1 parent 33bd15e commit 9a0f74c
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 0 deletions.
12 changes: 12 additions & 0 deletions dags/library/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
INSPIRE_ARXIV_RECORDS = (
r"037%3A%27arXiv%27+and+not+980%3Ahidden&action_search="
+ r"Search&op1=a&m1=a&p1=&f1=&c=CERN+Document+"
+ r"Server&sf=&so=d&rm=&rg=10&sc=1&of=xm&wl=0"
)

INSPIRE_CURATORS_RECORDS = (
r"035%3A%27oai%3Ainspirehep.net%27+not+"
+ r"037%3A%27arXiv%27&action_search=Search"
+ r"&op1=a&m1=a&p1=&f1=&c=CERN+Document+Server&sf=&so=d&"
+ r"rm=&rg=10&sc=1&of=xm&wl=0"
)
82 changes: 82 additions & 0 deletions dags/library/new_items_in_the_institutional_repository.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
from functools import reduce

import library.constants as constants
import pendulum
from airflow.decorators import dag, task
from airflow.providers.http.hooks.http import HttpHook
from airflow.providers.postgres.operators.postgres import PostgresOperator
from common.utils import get_total_results_count
from executor_config import kubernetes_executor_config
from tenacity import retry_if_exception_type, stop_after_attempt


@dag(
start_date=pendulum.today("UTC").add(days=-1),
schedule_interval="@monthly",
params={"year": 2023},
)
def library_new_items_in_the_institutional_repository():
@task(executor_config=kubernetes_executor_config, multiple_outputs=True)
def generate_params(query, **kwargs):
year = kwargs["params"].get("year")
base_query = f"search?ln=en&p=year%3A{year}+"
type_of_query = [*query][0]
return {
"endpoint": base_query + query[type_of_query],
"type_of_query": type_of_query,
}

@task(executor_config=kubernetes_executor_config)
def fetch_count(parameters):
http_hook = HttpHook(http_conn_id="cds", method="GET")
response = http_hook.run_with_advanced_retry(
endpoint=parameters["endpoint"],
_retry_args={
"stop": stop_after_attempt(3),
"retry": retry_if_exception_type(Exception),
},
)
count = get_total_results_count(response.text)
return {parameters["type_of_query"]: count}

query_list = [
{"inspire_arxiv_records": constants.INSPIRE_ARXIV_RECORDS},
{"inspire_curators_records": constants.INSPIRE_CURATORS_RECORDS},
]

parameters = generate_params.expand(query=query_list)
counts = fetch_count.expand(parameters=parameters)

@task(multiple_outputs=True, executor_config=kubernetes_executor_config)
def join_and_add_year(counts, **kwargs):
year = kwargs["params"].get("year")
results = reduce(lambda a, b: {**a, **b}, counts)
results["year"] = year
return results

results = join_and_add_year(counts)

populate_items_in_the_institutional_repository = PostgresOperator(
task_id="populate_items_in_the_institutional_repository",
postgres_conn_id="superset_qa",
sql="""
INSERT INTO items_in_the_institutional_repository (year,
inspire_arxiv_records, inspire_curators_records, created_at, updated_at)
VALUES (%(year)s, %(inspire_arxiv_records)s, %(inspire_curators_records)s,
CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT (year)
DO UPDATE SET
inspire_arxiv_records = EXCLUDED.inspire_arxiv_records,
inspire_curators_records = EXCLUDED.inspire_curators_records,
updated_at = CURRENT_TIMESTAMP;
""",
parameters=results,
executor_config=kubernetes_executor_config,
)

counts >> results >> populate_items_in_the_institutional_repository


Library_new_items_in_the_institutional_repository = (
library_new_items_in_the_institutional_repository()
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""Library KPIs: New items in the institutional repository
Revision ID: 101f23913167
Revises: 50d9b3ef5a3b
Create Date: 2024-07-05 17:55:10.676310
"""
from typing import Sequence, Union

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision: str = "101f23913167"
down_revision: Union[str, None] = "50d9b3ef5a3b"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade():
op.create_table(
"items_in_the_institutional_repository",
sa.Column("year", sa.Integer, primary_key=True),
sa.Column("inspire_arxiv_records", sa.Integer, nullable=False),
sa.Column("inspire_curators_records", sa.Integer, nullable=False),
sa.Column("created_at", sa.TIMESTAMP(timezone=True), nullable=False),
sa.Column("updated_at", sa.TIMESTAMP(timezone=True), nullable=False),
)


def downgrade():
op.drop_table("items_in_the_institutional_repository")

0 comments on commit 9a0f74c

Please sign in to comment.