Skip to content

Commit

Permalink
Dags: migration to SQLAlchemy
Browse files Browse the repository at this point in the history
  • Loading branch information
ErnestaP committed Jul 31, 2024
1 parent 965d48b commit eaf68ac
Show file tree
Hide file tree
Showing 9 changed files with 201 additions and 102 deletions.
15 changes: 15 additions & 0 deletions dags/common/models/library/library_cern_publication_records.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from sqlalchemy import Column, DateTime, Float, Integer, func
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class LibraryCernPublicationRecords(Base):
__tablename__ = "library_cern_publication_records"

year = Column(Integer, primary_key=True)
publications_total_count = Column(Float)
conference_proceedings_count = Column(Float)
non_journal_proceedings_count = Column(Float)
created_at = Column(DateTime, default=func.now())
updated_at = Column(DateTime, default=func.now(), onupdate=func.now())
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from sqlalchemy import Column, DateTime, Float, Integer, func
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class LibraryNewItemsInTheInstitutionalRepository(Base):
__tablename__ = "items_in_the_institutional_repository"

year = Column(Integer, primary_key=True)
inspire_arxiv_records = Column(Float)
inspire_curators_records = Column(Float)
created_at = Column(DateTime, default=func.now())
updated_at = Column(DateTime, default=func.now(), onupdate=func.now())
17 changes: 17 additions & 0 deletions dags/common/models/open_access/oa_golden_open_access.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from sqlalchemy import Column, DateTime, Float, Integer, func
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class OAGoldenOpenAccess(Base):
__tablename__ = "oa_golden_open_access"

year = Column(Integer, primary_key=True)
cern_read_and_publish = Column(Float)
cern_individual_apcs = Column(Float)
scoap3 = Column(Float)
other = Column(Float)
other_collective_models = Column(Float)
created_at = Column(DateTime, default=func.now())
updated_at = Column(DateTime, default=func.now(), onupdate=func.now())
16 changes: 16 additions & 0 deletions dags/common/models/open_access/open_access.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from sqlalchemy import Column, DateTime, Float, Integer, func
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class OAOpenAccess(Base):
__tablename__ = "oa_open_access"

year = Column(Integer, primary_key=True)
closed_access = Column(Float)
bronze_open_access = Column(Float)
green_open_access = Column(Float)
gold_open_access = Column(Float)
created_at = Column(DateTime, default=func.now())
updated_at = Column(DateTime, default=func.now(), onupdate=func.now())
31 changes: 31 additions & 0 deletions dags/common/operators/sqlalchemy_operator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from airflow.decorators import task
from airflow.hooks.postgres_hook import PostgresHook
from executor_config import kubernetes_executor_config
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import sessionmaker


def get_session(conn_id: str):
hook = PostgresHook(postgres_conn_id=conn_id)
engine = hook.get_sqlalchemy_engine()
return sessionmaker(bind=engine)()


def sqlalchemy_task(conn_id: str):
def decorator(func):
@task(executor_config=kubernetes_executor_config)
def wrapper(*args, **kwargs):
session = get_session(conn_id)
try:
result = func(*args, session=session, **kwargs)
session.commit()
return result
except SQLAlchemyError as e:
session.rollback()
raise e
finally:
session.close()

return wrapper

return decorator
59 changes: 30 additions & 29 deletions dags/library/cern_publication_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@

import pendulum
from airflow.decorators import dag, task
from airflow.providers.postgres.operators.postgres import PostgresOperator
from common.models.library.library_cern_publication_records import (
LibraryCernPublicationRecords,
)
from common.operators.sqlalchemy_operator import sqlalchemy_task
from common.utils import get_total_results_count, request_again_if_failed
from executor_config import kubernetes_executor_config
from library.utils import get_url
from sqlalchemy.sql import func


@dag(
Expand All @@ -31,7 +35,7 @@ def fetch_data_task(key, **kwargs):
@task(multiple_outputs=True, executor_config=kubernetes_executor_config)
def join(values, **kwargs):
results = reduce(lambda a, b: {**a, **b}, values)
results["years"] = kwargs["params"].get("year")
results["year"] = kwargs["params"].get("year")
return results

results = fetch_data_task.expand(
Expand All @@ -43,35 +47,32 @@ def join(values, **kwargs):
)
unpacked_results = join(results)

PostgresOperator(
task_id="populate_library_cern_publication_records_table",
postgres_conn_id="superset",
sql="""
INSERT INTO library_cern_publication_records (year,
publications_total_count, conference_proceedings_count,
non_journal_proceedings_count, created_at, updated_at)
VALUES (%(years)s, %(publications_total_count)s,
%(conference_proceedings_count)s, %(non_journal_proceedings_count)s,
CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT (year)
DO UPDATE SET
publications_total_count = EXCLUDED.publications_total_count,
conference_proceedings_count = EXCLUDED.conference_proceedings_count,
non_journal_proceedings_count = EXCLUDED.non_journal_proceedings_count,
updated_at = CURRENT_TIMESTAMP;
""",
parameters={
"years": unpacked_results["years"],
"publications_total_count": unpacked_results["publications_total_count"],
"conference_proceedings_count": unpacked_results[
@sqlalchemy_task(conn_id="superset_qa")
def populate_cern_publication_records(results, session, **kwargs):
record = (
session.query(LibraryCernPublicationRecords)
.filter_by(year=results["year"])
.first()
)
if record:
record.publications_total_count = results["publications_total_count"]
record.conference_proceedings_count = results[
"conference_proceedings_count"
],
"non_journal_proceedings_count": unpacked_results[
]
record.non_journal_proceedings_count = results[
"non_journal_proceedings_count"
],
},
executor_config=kubernetes_executor_config,
)
]
record.updated_at = func.now()
else:
new_record = LibraryCernPublicationRecords(
year=results["year"],
publications_total_count=results["publications_total_count"],
conference_proceedings_count=results["conference_proceedings_count"],
non_journal_proceedings_count=results["non_journal_proceedings_count"],
)
session.add(new_record)

populate_cern_publication_records(unpacked_results)


library_cern_publication_records = library_cern_publication_records_dag()
43 changes: 24 additions & 19 deletions dags/library/new_items_in_the_institutional_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@
import pendulum
from airflow.decorators import dag, task
from airflow.providers.http.hooks.http import HttpHook
from airflow.providers.postgres.operators.postgres import PostgresOperator
from common.models.library.library_new_items_in_the_institutional_repository import (
LibraryNewItemsInTheInstitutionalRepository,
)
from common.operators.sqlalchemy_operator import sqlalchemy_task
from common.utils import get_total_results_count
from executor_config import kubernetes_executor_config
from sqlalchemy.sql import func
from tenacity import retry_if_exception_type, stop_after_attempt


Expand Down Expand Up @@ -56,25 +60,26 @@ def join_and_add_year(counts, **kwargs):

results = join_and_add_year(counts)

populate_items_in_the_institutional_repository = PostgresOperator(
task_id="populate_items_in_the_institutional_repository",
postgres_conn_id="superset_qa",
sql="""
INSERT INTO items_in_the_institutional_repository (year,
inspire_arxiv_records, inspire_curators_records, created_at, updated_at)
VALUES (%(year)s, %(inspire_arxiv_records)s, %(inspire_curators_records)s,
CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT (year)
DO UPDATE SET
inspire_arxiv_records = EXCLUDED.inspire_arxiv_records,
inspire_curators_records = EXCLUDED.inspire_curators_records,
updated_at = CURRENT_TIMESTAMP;
""",
parameters=results,
executor_config=kubernetes_executor_config,
)
@sqlalchemy_task(conn_id="superset_qa")
def populate_new_items_in_the_institutional_repository(results, session, **kwargs):
record = (
session.query(LibraryNewItemsInTheInstitutionalRepository)
.filter_by(year=results["year"])
.first()
)
if record:
record.inspire_arxiv_records = results["inspire_arxiv_records"]
record.inspire_curators_records = results["inspire_curators_records"]
record.updated_at = func.now()
else:
new_record = LibraryNewItemsInTheInstitutionalRepository(
year=results["year"],
inspire_arxiv_records=results["inspire_arxiv_records"],
inspire_curators_records=results["inspire_curators_records"],
)
session.add(new_record)

counts >> results >> populate_items_in_the_institutional_repository
populate_new_items_in_the_institutional_repository(results)


Library_new_items_in_the_institutional_repository = (
Expand Down
49 changes: 26 additions & 23 deletions dags/open_access/gold_open_access_mechanisms.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
import pendulum
from airflow.decorators import dag, task
from airflow.providers.http.hooks.http import HttpHook
from airflow.providers.postgres.operators.postgres import PostgresOperator
from common.models.open_access.oa_golden_open_access import OAGoldenOpenAccess
from common.operators.sqlalchemy_operator import sqlalchemy_task
from common.utils import get_total_results_count
from executor_config import kubernetes_executor_config
from sqlalchemy.sql import func
from tenacity import retry_if_exception_type, stop_after_attempt


Expand Down Expand Up @@ -68,29 +70,30 @@ def join_and_add_year(counts, **kwargs):

results = join_and_add_year(counts)

populate_golden_open_access = PostgresOperator(
task_id="populate_golden_open_access",
postgres_conn_id="superset",
sql="""
INSERT INTO oa_golden_open_access (year, cern_read_and_publish, cern_individual_apcs,
scoap3, other, other_collective_models, created_at, updated_at)
VALUES (%(year)s, %(cern_read_and_publish)s, %(cern_individual_apcs)s,
%(scoap3)s, %(other)s, %(other_collective_models)s,
CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT (year)
DO UPDATE SET
cern_read_and_publish = EXCLUDED.cern_read_and_publish,
cern_individual_apcs = EXCLUDED.cern_individual_apcs,
scoap3 = EXCLUDED.scoap3,
other = EXCLUDED.other,
other_collective_models = EXCLUDED.other_collective_models,
updated_at = CURRENT_TIMESTAMP;
""",
parameters=results,
executor_config=kubernetes_executor_config,
)
@sqlalchemy_task(conn_id="superset_qa")
def populate_golden_open_access(results, session, **kwargs):
record = (
session.query(OAGoldenOpenAccess).filter_by(year=results["year"]).first()
)
if record:
record.cern_read_and_publish = results["cern_read_and_publish"]
record.cern_individual_apcs = results["cern_individual_apcs"]
record.scoap3 = results["scoap3"]
record.other = results["other"]
record.other_collective_models = results["other_collective_models"]
record.updated_at = func.now()
else:
new_record = OAGoldenOpenAccess(
year=results["year"],
cern_read_and_publish=results["cern_read_and_publish"],
cern_individual_apcs=results["cern_individual_apcs"],
scoap3=results["scoap3"],
other=results["other"],
other_collective_models=results["other_collective_models"],
)
session.add(new_record)

counts >> results >> populate_golden_open_access
populate_golden_open_access(results)


OA_gold_open_access_mechanisms = oa_gold_open_access_mechanisms()
59 changes: 28 additions & 31 deletions dags/open_access/open_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
import open_access.utils as utils
import pendulum
from airflow.decorators import dag, task
from airflow.providers.postgres.operators.postgres import PostgresOperator
from common.models.open_access.open_access import OAOpenAccess
from common.operators.sqlalchemy_operator import sqlalchemy_task
from common.utils import get_total_results_count, request_again_if_failed
from executor_config import kubernetes_executor_config
from sqlalchemy.sql import func


@dag(
Expand Down Expand Up @@ -36,44 +38,39 @@ def fetch_data_task(query, **kwargs):
@task(multiple_outputs=True, executor_config=kubernetes_executor_config)
def join(values, **kwargs):
results = reduce(lambda a, b: {**a, **b}, values)
results["years"] = kwargs["params"].get("year")
results["year"] = kwargs["params"].get("year")
return results

results = fetch_data_task.expand(
query=[
{"closed": constants.CLOSED_ACCESS},
{"bronze": constants.BRONZE_ACCESS},
{"green": constants.GREEN_ACCESS},
{"gold": constants.GOLD_ACCESS},
{"closed_access": constants.CLOSED_ACCESS},
{"bronze_open_access": constants.BRONZE_ACCESS},
{"green_open_access": constants.GREEN_ACCESS},
{"gold_open_access": constants.GOLD_ACCESS},
],
)
unpacked_results = join(results)

PostgresOperator(
task_id="populate_open_access_table",
postgres_conn_id="superset",
sql="""
INSERT INTO oa_open_access (year, closed_access, bronze_open_access,
green_open_access, gold_open_access, created_at, updated_at)
VALUES (%(years)s, %(closed)s, %(bronze)s, %(green)s, %(gold)s,
CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT (year)
DO UPDATE SET
closed_access = EXCLUDED.closed_access,
bronze_open_access = EXCLUDED.bronze_open_access,
green_open_access = EXCLUDED.green_open_access,
gold_open_access = EXCLUDED.gold_open_access,
updated_at = CURRENT_TIMESTAMP;
""",
parameters={
"years": unpacked_results["years"],
"closed": unpacked_results["closed"],
"bronze": unpacked_results["bronze"],
"green": unpacked_results["green"],
"gold": unpacked_results["gold"],
},
executor_config=kubernetes_executor_config,
)
@sqlalchemy_task(conn_id="superset_qa")
def populate_open_access(results, session, **kwargs):
record = session.query(OAOpenAccess).filter_by(year=results["year"]).first()
if record:
record.closed_access = results["closed_access"]
record.bronze_open_access = results["bronze_open_access"]
record.green_open_access = results["green_open_access"]
record.gold_open_access = results["gold_open_access"]
record.updated_at = func.now()
else:
new_record = OAOpenAccess(
year=results["year"],
closed_access=results["closed_access"],
bronze_open_access=results["bronze_open_access"],
green_open_access=results["green_open_access"],
gold_open_access=results["gold_open_access"],
)
session.add(new_record)

populate_open_access(unpacked_results)


OA_dag = oa_dag()

0 comments on commit eaf68ac

Please sign in to comment.