
Task queue using python celery + redis #1760

Merged (22 commits, Apr 15, 2024)
60 changes: 46 additions & 14 deletions README.md
@@ -161,6 +161,14 @@ CREATE USER palace with password 'test';
grant all privileges on database circ to palace;
```

### Redis

Redis is used as the broker for Celery. You can run Redis with Docker using the following command:

```sh
docker run -d --name redis -p 6379:6379 redis
```

### Environment variables

#### Database
@@ -187,6 +195,21 @@ To let the application know which Opensearch instance to use, you can set the fo
export PALACE_SEARCH_URL="http://localhost:9200"
```

#### Celery

We use [Celery](https://docs.celeryproject.org/en/stable/) to run background tasks. To configure Celery, you need to
pass a broker URL to the application.

- `PALACE_CELERY_BROKER_URL`: The URL of the broker to use for Celery (**required**). For example:
  ```sh
  export PALACE_CELERY_BROKER_URL="redis://localhost:6379/0"
  ```

We support overriding a number of other Celery settings via environment variables, but in most cases
the defaults should be sufficient. The full list of settings can be found in
[`service/celery/configuration.py`](core/service/celery/configuration.py).
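
The override mechanism described above can be illustrated with a short, self-contained sketch. The function name and the exact prefix-to-setting mapping here are assumptions for illustration only; the real mapping lives in `core/service/celery/configuration.py`:

```python
PREFIX = "PALACE_CELERY_"

def celery_settings_from_env(environ: dict) -> dict:
    """Collect PALACE_CELERY_* variables into lowercase Celery setting names.

    Hypothetical sketch: e.g. PALACE_CELERY_BROKER_URL -> broker_url.
    """
    settings = {}
    for key, value in environ.items():
        if key.startswith(PREFIX):
            # Strip the prefix and lowercase to get the Celery setting name.
            settings[key[len(PREFIX):].lower()] = value
    return settings

settings = celery_settings_from_env(
    {"PALACE_CELERY_BROKER_URL": "redis://localhost:6379/0", "PATH": "/usr/bin"}
)
print(settings)  # {'broker_url': 'redis://localhost:6379/0'}
```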

#### General

- `PALACE_BASE_URL`: The base URL of the application. Used to create absolute links. (optional)
@@ -350,6 +373,12 @@ Check that there is now a web server listening on port `6500`:
curl http://localhost:6500/
```

You can start a Celery worker with:

```sh
poetry run celery -A "core.celery.worker.app" worker --concurrency 1 --pool solo --beat
```

### The Admin Interface

#### Access
@@ -492,23 +521,15 @@ to send the Loan and Hold reminders to the mobile applications.

## Scheduled Jobs

The Palace Manager has a number of background jobs that are scheduled to run at regular intervals. This
includes all the import and reaper jobs, as well as other necessary background tasks, such as maintaining
the search index and feed caches.

### Job Requirements

#### hold_notifications

Requires one of [the Firebase Cloud Messaging credentials environment variables (described above)](#firebase-cloud-messaging)
to be present and non-empty.
In addition, the site-wide `PUSH_NOTIFICATIONS_STATUS` setting must be either `unset` or `true`.

#### loan_notifications

Requires one of [the Firebase Cloud Messaging credentials environment variables (described above)](#firebase-cloud-messaging)
to be present and non-empty.
In addition, the site-wide `PUSH_NOTIFICATIONS_STATUS` setting must be either `unset` or `true`.

Jobs are scheduled via a combination of `cron` and `celery`. All new jobs should use `celery` for scheduling,
and existing jobs are being migrated to `celery` as they are updated.

The `cron` jobs are defined in the `docker/services/simplified_crontab` file. The `celery` jobs are defined
in the `core/celery/tasks/` module.

## Code Style

@@ -664,6 +685,17 @@ Only run the `test_google_analytics_provider` tests with Python 3.8 using docker
tox -e "py38-api-docker" -- tests/api/test_google_analytics_provider.py
```

### Environment Variables

When testing Celery tasks, it can be useful to set the `PALACE_TEST_CELERY_WORKER_SHUTDOWN_TIMEOUT` environment variable
to a value higher than the default of 30 seconds, so that you can set breakpoints in the worker code and debug it.
The value is interpreted as the number of seconds to wait for the worker to shut down before killing it. Setting it
to `none` or the empty string disables the timeout entirely.

```sh
export PALACE_TEST_CELERY_WORKER_SHUTDOWN_TIMEOUT=""
```
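
As an illustration of the interpretation described above (the function name is hypothetical; the actual parsing lives in the test support code), the three cases might be handled like this:

```python
from typing import Optional

def parse_shutdown_timeout(raw: Optional[str]) -> Optional[float]:
    """Interpret the shutdown-timeout variable (illustrative sketch).

    Unset -> the default of 30 seconds; "none" or the empty string ->
    timeouts disabled (None); anything else -> seconds to wait.
    """
    if raw is None:
        return 30.0
    if raw.strip().lower() in ("none", ""):
        return None
    return float(raw)

print(parse_shutdown_timeout(None))   # 30.0 (default)
print(parse_shutdown_timeout(""))     # None (timeouts disabled)
print(parse_shutdown_timeout("120"))  # 120.0
```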

### Coverage Reports

Code coverage is automatically tracked with [`pytest-cov`](https://pypi.org/project/pytest-cov/) when tests are run.
4 changes: 3 additions & 1 deletion api/admin/controller/__init__.py
@@ -42,7 +42,9 @@ def setup_admin_controllers(manager: CirculationManager):
manager.timestamps_controller = TimestampsController(manager)
manager.admin_work_controller = WorkController(manager)
manager.admin_feed_controller = FeedController(manager)
manager.admin_custom_lists_controller = CustomListsController(manager)
manager.admin_custom_lists_controller = CustomListsController(
manager._db, manager.external_search, manager.annotator
)
manager.admin_lanes_controller = LanesController(manager)
manager.admin_dashboard_controller = DashboardController(manager)
manager.admin_patron_controller = PatronController(manager)
2 changes: 2 additions & 0 deletions api/admin/controller/collection_settings.py
@@ -19,6 +19,7 @@
)
from api.circulation import CirculationApiType
from api.integration.registry.license_providers import LicenseProvidersRegistry
from core.celery.tasks.collection_delete import collection_delete
from core.integration.base import HasChildIntegrationConfiguration
from core.integration.registry import IntegrationRegistry
from core.model import (
@@ -169,6 +170,7 @@ def process_delete(self, service_id: int) -> Response | ProblemDetail:

# Flag the collection to be deleted by script in the background.
collection.marked_for_deletion = True
collection_delete.delay(collection.id)
return Response("Deleted", 200)

def process_collection_self_tests(
24 changes: 18 additions & 6 deletions api/admin/controller/custom_lists.py
@@ -9,6 +9,7 @@
from flask_babel import lazy_gettext as _
from flask_pydantic_spec.flask_backend import Context
from pydantic import BaseModel
from sqlalchemy.orm import Session

from api.admin.controller.base import AdminPermissionsControllerMixin
from api.admin.problem_details import (
@@ -21,9 +22,10 @@
MISSING_COLLECTION,
MISSING_CUSTOM_LIST,
)
from api.controller.circulation_manager import CirculationManagerController
from api.problem_details import CANNOT_DELETE_SHARED_LIST
from core.app_server import load_pagination_from_request
from core.celery.tasks.custom_list import update_custom_list
from core.external_search import ExternalSearchIndex
from core.feed.acquisition import OPDSAcquisitionFeed
from core.lane import Lane, WorkList
from core.model import (
@@ -39,12 +41,11 @@
)
from core.problem_details import INVALID_INPUT, METHOD_NOT_ALLOWED
from core.query.customlist import CustomListQueries
from core.util.log import LoggerMixin
from core.util.problem_detail import ProblemDetail, ProblemDetailException


class CustomListsController(
CirculationManagerController, AdminPermissionsControllerMixin
):
class CustomListsController(AdminPermissionsControllerMixin, LoggerMixin):
class CustomListSharePostResponse(BaseModel):
successes: int = 0
failures: int = 0
@@ -60,6 +61,13 @@ class CustomListPostRequest(BaseModel):
auto_update_query: dict | None = None
auto_update_facets: dict | None = None

def __init__(
self, _db: Session, search_engine: ExternalSearchIndex, annotator: Callable
) -> None:
self._db = _db
self.search_engine = search_engine
self.annotator = annotator

def _list_as_json(self, list: CustomList, is_owner=True) -> dict:
"""Transform a CustomList object into a response ready dict"""
collections = []
@@ -235,7 +243,10 @@ def _create_or_update_list(
and list.auto_update_enabled
and list.auto_update_status == CustomList.INIT
):
CustomListQueries.populate_query_pages(self._db, list, max_pages=1)
CustomListQueries.populate_query_pages(
self._db, self.search_engine, list, max_pages=1
)
update_custom_list.delay(list.id)
elif (
not is_new
and list.auto_update_enabled
@@ -249,6 +260,7 @@
prev_query_dict = json.loads(previous_auto_update_query)
if prev_query_dict != auto_update_query:
list.auto_update_status = CustomList.REPOPULATE
update_custom_list.delay(list.id)
except json.JSONDecodeError:
# Do nothing if the previous query was not valid
pass
@@ -348,7 +360,7 @@ def custom_list(self, list_id: int) -> Response | dict | ProblemDetail | None:
worklist = WorkList()
worklist.initialize(library, customlists=[list])

annotator = self.manager.annotator(worklist)
annotator = self.annotator(worklist)
url_fn = self.url_for_custom_list(library, list)
feed = OPDSAcquisitionFeed.from_query(
query, self._db, list.name or "", url, pagination, url_fn, annotator
18 changes: 0 additions & 18 deletions bin/custom_list_update_new_entries

This file was deleted.

Empty file added core/celery/__init__.py
Empty file.
55 changes: 55 additions & 0 deletions core/celery/job.py
@@ -0,0 +1,55 @@
from __future__ import annotations

from abc import ABC, abstractmethod

from sqlalchemy.orm import Session, sessionmaker

from core.celery.session import SessionMixin
from core.util.log import LoggerMixin


class Job(LoggerMixin, SessionMixin, ABC):
"""
Base class for all our Celery jobs.

This class provides a few helper methods for our jobs to use, such as
a logger and a session context manager. This class is and should remain
runnable outside the context of a Celery worker. That way we are able to
test our jobs fully outside the Celery worker.

This class purposefully does not open a SqlAlchemy session for the job,
preferring to let the job open and close the session as needed. This
allows a long-running job to open and close the session as needed rather
than keeping the session open for the entire duration of the job.

Because our default Celery configuration is set up to ack tasks after they
are completed, if a worker dies while processing a task, the task will be
requeued and run again. We need to keep this in mind when writing our jobs
to ensure that they are idempotent and can be run multiple times without
causing any issues.
"""

def __init__(self, session_maker: sessionmaker[Session]):
"""
Initialize the job with a session maker. When running in the context
of a Task, this will come directly from the Task.
"""
self._session_maker = session_maker

@property
def session_maker(self) -> sessionmaker[Session]:
"""
A session maker for the job to use when creating sessions.

This should generally be accessed via the `session` or `transaction`
context managers defined in `SessionMixin`.
"""
return self._session_maker

@abstractmethod
def run(self) -> None:
"""
Implement this method in your subclass to define the work that the job should do.
"""

...
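
To illustrate the idempotency requirement described in the docstring above, here is a small self-contained sketch (toy names, not Palace code): a delete job is safe to re-run after a worker dies mid-task when the mutation is guarded by a state check, so the requeued run becomes a harmless no-op.

```python
class CollectionStore:
    """Toy in-memory store standing in for the database."""

    def __init__(self):
        self.collections = {1: {"name": "demo", "deleted": False}}

    def delete(self, collection_id: int) -> bool:
        """Delete a collection; return True if work was done.

        Idempotent: calling this twice for the same id is safe, because
        the second call sees the record already deleted and does nothing.
        """
        record = self.collections.get(collection_id)
        if record is None or record["deleted"]:
            return False  # already gone: nothing to do
        record["deleted"] = True
        return True

store = CollectionStore()
print(store.delete(1))  # True: first run does the work
print(store.delete(1))  # False: a requeued run is a no-op
```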
40 changes: 40 additions & 0 deletions core/celery/session.py
@@ -0,0 +1,40 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from collections.abc import Generator
from contextlib import contextmanager

from sqlalchemy.orm import Session, sessionmaker


class SessionMixin(ABC):
@property
@abstractmethod
def session_maker(self) -> sessionmaker[Session]:
"""
The session maker to use when creating sessions. Generally this should be accessed
via the `session` or `transaction` context managers defined below.
"""

@contextmanager
def session(self) -> Generator[Session, None, None]:
"""
Starts a session and yields it to the caller. The session is closed
when the context manager exits.

See: https://docs.sqlalchemy.org/en/20/orm/session_basics.html#opening-and-closing-a-session
"""
with self.session_maker() as session:
yield session

@contextmanager
def transaction(self) -> Generator[Session, None, None]:
"""
Start a new transaction and yield a session to the caller. The transaction will be
committed when the context manager exits. If an exception is raised, the transaction
will be rolled back.

See: https://docs.sqlalchemy.org/en/20/orm/session_api.html#sqlalchemy.orm.sessionmaker.begin
"""
with self.session_maker.begin() as session:
yield session
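
The commit-on-success, rollback-on-error behavior of the `transaction` context manager above can be sketched with the standard library's `sqlite3` (an illustration only; the real implementation simply delegates to `sessionmaker.begin()`):

```python
import sqlite3
from contextlib import contextmanager

@contextmanager
def transaction(conn: sqlite3.Connection):
    # Commit when the block exits cleanly; roll back on any exception.
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE items (name TEXT)")

with transaction(conn):
    conn.execute("INSERT INTO items VALUES ('kept')")

try:
    with transaction(conn):
        conn.execute("INSERT INTO items VALUES ('discarded')")
        raise RuntimeError("boom")  # triggers the rollback branch
except RuntimeError:
    pass

print([row[0] for row in conn.execute("SELECT name FROM items")])  # ['kept']
```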
79 changes: 79 additions & 0 deletions core/celery/task.py
@@ -0,0 +1,79 @@
from __future__ import annotations

import celery
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy.pool import NullPool

from core.celery.session import SessionMixin
from core.model import SessionManager
from core.service.container import Services, container_instance
from core.util.log import LoggerMixin


class Task(celery.Task, LoggerMixin, SessionMixin):
"""
Celery task implementation for Palace.

Our Celery app is configured to use this as the Task class implementation. This class
provides some glue to allow tasks to access the database and services from the dependency
injection container.

In order to access this class within a Celery task, you must use the `bind=True` parameter
when defining your task.
See: https://docs.celeryq.dev/en/stable/userguide/tasks.html#bound-tasks

For example:
```
@shared_task(bind=True)
def my_task(task: Task) -> None:
...
```

This class follows the pattern suggested in the Celery documentation:
https://docs.celeryq.dev/en/stable/userguide/tasks.html#custom-task-classes

The `__init__` method is only called once per worker process, so we can safely create a session
maker and services container here and reuse them for the life of the worker process.
See: https://docs.celeryq.dev/en/stable/userguide/tasks.html#instantiation
"""

_session_maker = None

@property
def session_maker(self) -> sessionmaker[Session]:
"""
Get a new session for this worker process.

This should generally be accessed via the `session` or `transaction` context managers
defined in `SessionMixin`.

This is using a `NullPool` connection pool for workers DB connections. This means that DB
connections are opened on demand, so we won't have long-lived connections sitting idle,
which should reduce the load on our PG instances.

A null pool isn't exactly the trade-off I wanted to make here. What I would have liked is a
connection pool that disconnects idle connections after some defined timeout, so we can have
a connection pool, but when the worker is sitting idle the connections will eventually drop.

The `QueuePool` class that is the default SQLAlchemy connection pool unfortunately does not offer
this functionality. If we used `QueuePool`, it would keep a connection open for each worker
process, even if the process was idle for some time.

This isn't ideal in the beginning when we have low worker utilization, but might be okay once
we get all our tasks moved over. So we will need to evaluate what we want to do for connection
pooling as this rolls out.

TODO: Evaluate connection pooling strategy for Celery workers, once we have a better idea of
worker utilization in production.
"""
if self._session_maker is None:
engine = SessionManager.engine(poolclass=NullPool)
maker = sessionmaker(bind=engine)
SessionManager.setup_event_listener(maker)
self._session_maker = maker
return self._session_maker

@property
def services(self) -> Services:
return container_instance()
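
The lazy, once-per-process initialization that `session_maker` uses above (build on first access, then reuse, relying on Celery instantiating the task class once per worker process) follows a common pattern that can be sketched in isolation with hypothetical names:

```python
class ExpensiveResource:
    """Stand-in for something costly to build, like an engine + sessionmaker."""
    created = 0

    def __init__(self):
        ExpensiveResource.created += 1

class WorkerTask:
    def __init__(self):
        # Celery creates the task instance once per worker process, so an
        # instance attribute lives for the whole life of the process.
        self._session_maker = None

    @property
    def session_maker(self) -> ExpensiveResource:
        if self._session_maker is None:  # first access: build and cache
            self._session_maker = ExpensiveResource()
        return self._session_maker

task = WorkerTask()
print(task.session_maker is task.session_maker)  # True: built once, reused
print(ExpensiveResource.created)  # 1
```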
Empty file added core/celery/tasks/__init__.py
Empty file.