Skip to content

Commit

Permalink
Celery queue prototype
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathangreen committed Apr 4, 2024
1 parent 77afeb3 commit 4e2f269
Show file tree
Hide file tree
Showing 46 changed files with 1,154 additions and 558 deletions.
2 changes: 2 additions & 0 deletions api/admin/controller/collection_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
)
from api.circulation import CirculationApiType
from api.integration.registry.license_providers import LicenseProvidersRegistry
from core.celery.tasks.collection_delete import collection_delete
from core.integration.base import HasChildIntegrationConfiguration
from core.integration.registry import IntegrationRegistry
from core.model import (
Expand Down Expand Up @@ -169,6 +170,7 @@ def process_delete(self, service_id: int) -> Response | ProblemDetail:

# Flag the collection to be deleted by script in the background.
collection.marked_for_deletion = True
collection_delete.delay(collection.id)
return Response("Deleted", 200)

def process_collection_self_tests(
Expand Down
55 changes: 22 additions & 33 deletions api/admin/controller/report.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,13 @@
import json
from dataclasses import asdict
from http import HTTPStatus

import flask
from flask import Response
from sqlalchemy.orm import Session

from core.celery.tasks.inventory_reports import generate_inventory_reports
from core.model import Library
from core.model.admin import Admin
from core.model.deferredtask import (
DeferredTaskType,
InventoryReportTaskData,
queue_task,
)
from core.problem_details import INTERNAL_SERVER_ERROR
from core.util.log import LoggerMixin
from core.util.problem_detail import ProblemDetail

Expand All @@ -25,29 +19,24 @@ def __init__(self, db: Session):
def generate_inventory_report(self) -> Response | ProblemDetail:
library: Library = getattr(flask.request, "library")
admin: Admin = getattr(flask.request, "admin")
try:
# these values should never be None
assert admin.email
assert admin.id
assert library.id

data: InventoryReportTaskData = InventoryReportTaskData(
admin_email=admin.email, admin_id=admin.id, library_id=library.id
)
task, is_new = queue_task(
self._db, task_type=DeferredTaskType.INVENTORY_REPORT, data=asdict(data)
)

msg = (
f"An inventory report request was {'already' if not is_new else ''} received at {task.created}. "
f"When processing is complete, the report will be sent to {admin.email}."
)

self.log.info(msg + f" {task}")
http_status = HTTPStatus.ACCEPTED if is_new else HTTPStatus.CONFLICT
return Response(json.dumps(dict(message=msg)), http_status)
except Exception as e:
msg = f"failed to generate inventory report request: {e}"
self.log.error(msg=msg, exc_info=e)
self._db.rollback()
return INTERNAL_SERVER_ERROR.detailed(detail=msg)

# these values should never be None
assert admin.email
assert admin.id
assert library.id

task = generate_inventory_reports.delay(
library_id=library.id, admin_email=admin.email
)

msg = (
f"An inventory report request was received. "
f"When processing is complete, the report will be sent to {admin.email}."
)

self.log.info(msg + f" {task.id}")
return Response(
json.dumps(dict(message=msg)),
status=HTTPStatus.ACCEPTED,
mimetype="application/json",
)
12 changes: 0 additions & 12 deletions bin/delete_old_deferred_tasks

This file was deleted.

12 changes: 0 additions & 12 deletions bin/generate_inventory_reports

This file was deleted.

Empty file added core/celery/__init__.py
Empty file.
28 changes: 28 additions & 0 deletions core/celery/job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from collections.abc import Generator
from contextlib import contextmanager

from sqlalchemy.orm import Session, sessionmaker

from core.util.log import LoggerMixin


class Job(LoggerMixin, ABC):
def __init__(self, session_maker: sessionmaker[Session]):
self._session_maker = session_maker

@contextmanager
def session(self) -> Generator[Session, None, None]:
with self._session_maker() as session:
yield session

@contextmanager
def transaction(self) -> Generator[Session, None, None]:
with self._session_maker.begin() as session:
yield session

@abstractmethod
def run(self) -> None:
...
26 changes: 26 additions & 0 deletions core/celery/task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from __future__ import annotations

import celery
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy.pool import NullPool

from core.model import SessionManager
from core.service.container import Services, container_instance
from core.util.log import LoggerMixin


class Task(celery.Task, LoggerMixin):
_session_maker = None

@property
def session_maker(self) -> sessionmaker[Session]:
if self._session_maker is None:
engine = SessionManager.engine(poolclass=NullPool)
maker = sessionmaker(bind=engine)
SessionManager.setup_event_listener(maker)
self._session_maker = maker

Check warning on line 21 in core/celery/task.py

View check run for this annotation

Codecov / codecov/patch

core/celery/task.py#L18-L21

Added lines #L18 - L21 were not covered by tests
return self._session_maker

@property
def services(self) -> Services:
return container_instance()

Check warning on line 26 in core/celery/task.py

View check run for this annotation

Codecov / codecov/patch

core/celery/task.py#L26

Added line #L26 was not covered by tests
Empty file added core/celery/tasks/__init__.py
Empty file.
44 changes: 44 additions & 0 deletions core/celery/tasks/collection_delete.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from __future__ import annotations

from celery import shared_task
from sqlalchemy import select
from sqlalchemy.orm import Session, sessionmaker

from core.celery.job import Job
from core.celery.task import Task
from core.model import Collection


class CollectionDeleteJob(Job):
def __init__(self, session_maker: sessionmaker[Session], collection_id: int):
super().__init__(session_maker)
self.collection_id = collection_id

@staticmethod
def collection(session: Session, collection_id: int) -> Collection | None:
return (
session.execute(select(Collection).where(Collection.id == collection_id))
.scalars()
.one_or_none()
)

@staticmethod
def collection_name(collection: Collection) -> str:
return f"{collection.name}/{collection.protocol} ({collection.id})"

def run(self) -> None:
with self.transaction() as session:
collection = self.collection(session, self.collection_id)
if collection is None:
self.log.error(
f"Collection with id {self.collection_id} not found. Unable to delete."
)
return

self.log.info(f"Deleting collection {self.collection_name(collection)}")
collection.delete()


@shared_task(key="high", bind=True)
def collection_delete(task: Task, collection_id: int) -> None:
CollectionDeleteJob(task.session_maker, collection_id).run()
Loading

0 comments on commit 4e2f269

Please sign in to comment.