
Task queue using python celery + redis #1760

Merged
merged 22 commits into main from feature/message-queue on Apr 15, 2024

Conversation

jonathangreen
Member

@jonathangreen jonathangreen commented Apr 3, 2024

Description

Prototype task queue for CM using Celery + Redis. It implements two tasks:

  • delete a collection in the background when collection delete is selected through the admin interface
  • populate custom lists, both via the scheduled maintenance task and by triggering a list to populate when changes are made in the admin UI

🚨 This is a work in progress; it can't be merged until more work is done:
I believe all the pre-requisite work is now in place to deploy out to tpp-dev.

  • Celery workers running in scripts container
    • Daemonized
    • Logging
  • See if we are able to configure a worker to preferentially choose tasks from the high-priority queue, or if we need separate workers to prioritize like this (see the sketch after this list)
  • Documentation added to README
    • Redis
    • New env vars
    • Celery
  • Tested multiple CM + single redis configuration
  • Hosting playbook updated
    • Deploy redis
    • New env vars set
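
For the queue-priority item above, here is a minimal sketch of the single-worker option. The queue names and the queue_order_strategy transport option are assumptions to verify against our kombu version, not this PR's actual configuration:

from celery import Celery
from kombu import Queue

app = Celery("palace", broker="redis://localhost:6379")
app.conf.task_queues = (Queue("high"), Queue("default"))
# kombu's Redis transport can consume queues in a fixed order rather than
# round-robin, so "high" would be drained before "default" (assumption
# worth verifying against our kombu version):
app.conf.broker_transport_options = {"queue_order_strategy": "priority"}

The worker would then be started with -Q high,default so it listens on both queues; if that ordering doesn't hold up under load, the fallback is dedicated workers per queue.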

This one is fairly large, so it spans several Jira tickets:

Motivation and Context

Some useful commands if you want to try running this locally:

# start redis
docker run --name redis -d -p 6379:6379 redis
# set up env vars (assuming you have PG and OS containers already running and env vars set)
export PALACE_CELERY_BROKER_URL="redis://localhost:6379"
# start CM
python app.py
# start celery worker
celery -A "core.celery.worker.app" worker --concurrency 1 --pool solo

Then you should be able to add a collection in the CM and delete it, and see the worker delete it in the background.

How Has This Been Tested?

  • Tested locally so far
  • Tested with unit tests

Checklist

  • I have updated the documentation accordingly.
  • All new and existing tests passed.

@jonathangreen jonathangreen force-pushed the feature/message-queue branch from 6c0fc5d to 84e2933 Compare April 3, 2024 17:13

codecov bot commented Apr 3, 2024

Codecov Report

Attention: Patch coverage is 91.57088% with 22 lines in your changes missing coverage. Please review.

Project coverage is 90.28%. Comparing base (6b22d7f) to head (b2dda94).

Files                              Patch %   Lines
core/celery/worker.py              75.00%    7 Missing ⚠️
core/service/celery/celery.py      73.07%    7 Missing ⚠️
core/celery/tasks/custom_list.py   90.62%    3 Missing and 3 partials ⚠️
core/service/logging/log.py        66.66%    2 Missing ⚠️
Additional details and impacted files
@@           Coverage Diff            @@
##             main    #1760    +/-   ##
========================================
  Coverage   90.27%   90.28%            
========================================
  Files         247      256     +9     
  Lines       28343    28596   +253     
  Branches     6466     6505    +39     
========================================
+ Hits        25586    25817   +231     
- Misses       1823     1842    +19     
- Partials      934      937     +3     
Flag        Coverage Δ
Api         75.59% <54.02%> (-0.21%) ⬇️
Core        59.72% <85.05%> (+0.23%) ⬆️
migration   24.00% <17.62%> (-0.13%) ⬇️

Flags with carried forward coverage won't be shown.


@jonathangreen jonathangreen force-pushed the feature/message-queue branch 2 times, most recently from 52a7119 to 4e2f269 Compare April 4, 2024 23:33
@jonathangreen jonathangreen changed the base branch from main to bugfix/logging April 4, 2024 23:34
@jonathangreen jonathangreen force-pushed the feature/message-queue branch from 4e2f269 to 3b4f420 Compare April 4, 2024 23:56
Base automatically changed from bugfix/logging to main April 5, 2024 13:36
@jonathangreen jonathangreen force-pushed the feature/message-queue branch 2 times, most recently from 5953d6d to de36be0 Compare April 5, 2024 13:55
from core.util.log import LoggerMixin


class Job(LoggerMixin, ABC):
Member Author

The idea here is that this class is what will replace Script for anything being run by a Celery worker. The Job class can be run and tested outside of the Celery context, and will be called from the Celery task.
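
As a sketch of that pattern (hypothetical shapes; the Job constructor signature and the task.session_maker attribute are assumptions, not this PR's actual code), the Celery task stays a thin wrapper and the Job holds the testable logic:

from celery import shared_task

# Hypothetical sketch of the Job/task split: the Job is plain Python that
# can be constructed and run in a unit test; the Celery task is only glue.
class CollectionDeleteJob(Job):
    def __init__(self, session_maker, collection_id: int) -> None:
        super().__init__(session_maker)  # assumed constructor shape
        self.collection_id = collection_id

    def run(self) -> None:
        ...  # the actual delete logic, testable without a worker

@shared_task(bind=True)
def collection_delete(task, collection_id: int) -> None:
    # task.session_maker is an assumed attribute provided by our Task class.
    CollectionDeleteJob(task.session_maker, collection_id).run()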

@contextmanager
def transaction(self) -> Generator[Session, None, None]:
    with self._session_maker.begin() as session:
        yield session
Member Author

Instead of opening a long-lived session over the course of an entire Job the way Script operates, I'm exposing a sessionmaker, so we are able to open and close DB connections or transactions as the job demands. This should let us have much more targeted transactions when we are converting to Job.
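
For illustration, a hypothetical Job.run using this context manager might look like this (the query and the self.collection_id attribute are assumed, not code from this PR):

from sqlalchemy import select

def run(self) -> None:
    # One unit of work per short-lived transaction.
    with self.transaction() as session:
        collection = session.execute(
            select(Collection).where(Collection.id == self.collection_id)
        ).scalar_one()
        ...  # work with the collection
    # sessionmaker.begin() commits on clean exit (rolls back on error),
    # so nothing is held open between units of work.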

from core.model import Collection


class CollectionDeleteJob(Job):
Member Author

Simple example Job.

if task_file.stem == "__init__":
    continue
module = f"core.celery.tasks.{task_file.stem}"
importlib.import_module(module)
Member Author

This does some programmatic imports to make sure that all our tasks are imported when starting the worker processes. This assumes that all our tasks will live in the core.celery.tasks package.
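
Filled out, the discovery loop presumably looks something like this (a sketch reconstructed around the excerpt above; the tasks_path computation is an assumption):

import importlib
from pathlib import Path

# Import every module in the core.celery.tasks package so that all
# @shared_task definitions are registered before the worker starts.
tasks_path = Path(__file__).parent / "tasks"
for task_file in tasks_path.glob("*.py"):
    if task_file.stem == "__init__":
        continue
    module = f"core.celery.tasks.{task_file.stem}"
    importlib.import_module(module)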

celery_app: Celery,
celery_worker: WorkController,
celery_pydantic_config: CeleryConfiguration,
) -> Generator[CeleryFixture, None, None]:
Member Author

This is the main fixture for Celery; it can be imported to do a complete end-to-end run of a task inside a test. The task will be run in a worker thread.
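
A test using it might look roughly like this (the fixture argument name and the collection_delete task from the earlier sketch are assumptions for illustration):

def test_collection_delete_end_to_end(celery: CeleryFixture) -> None:
    # Queue a real task; the embedded worker thread picks it up.
    result = collection_delete.delay(collection_id=42)
    result.wait(timeout=30)  # block until the worker finishes
    assert result.successful()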

@jonathangreen jonathangreen force-pushed the feature/message-queue branch from 24f8b31 to a7c73f9 Compare April 5, 2024 15:20
@jonathangreen jonathangreen requested a review from a team April 5, 2024 15:28
@dbernstein
Contributor

@jonathangreen : this is very slick. The comments in the PR and in the code are very helpful. I like how the handling of the task queue is completely transparent. I'll be able to abandon the deferredtasks table for the inventory reports more or less immediately and with little effort.

@jonathangreen jonathangreen force-pushed the feature/message-queue branch 2 times, most recently from 112e64e to 32e9885 Compare April 10, 2024 16:07
@jonathangreen jonathangreen force-pushed the feature/message-queue branch from 059410f to d4c2205 Compare April 11, 2024 16:43
@jonathangreen jonathangreen marked this pull request as ready for review April 11, 2024 19:25
@jonathangreen jonathangreen requested review from dbernstein and a team April 11, 2024 19:25
@jonathangreen
Member Author

I think all the work is in place to be able to roll this one out, so all it needs is a final review.

@jonathangreen jonathangreen force-pushed the feature/message-queue branch from d4c2205 to b2dda94 Compare April 11, 2024 23:43
@tdilauro
Contributor

@jonathangreen I know that @dbernstein is reviewing this one, but I wanted to advocate for a couple extra bits of documentation:

  • It would be useful to add some details on how we would manually kick off tasks that we are currently able to start by hopping onto a scripts server and running from the command line.
  • Some guidance on (or pointers to) developing and debugging celery tasks.

@jonathangreen
Member Author

> It would be useful to add some details on how we would manually kick off tasks that we are currently able to start by hopping onto a scripts server and running from the command line.

This facility doesn't exist, but it's a good idea. I added a ticket for it here: PP-1153, so we can work on adding that. I don't think having that is a blocker for this going in, given the couple of tasks that this PR converts to use Celery, but let me know if you disagree.

> Some guidance on (or pointers to) developing and debugging celery tasks.

I think this is something we are going to work out over time. I'm sure we'll know a lot more after moving all the scripts over to use Celery. The most valuable source of information I've found is the Celery documentation: https://docs.celeryq.dev/en/stable/index.html, but I already have that linked in the README.

I did try to write some detailed inline comments about the Palace-specific Celery parts, with links out to the Celery documentation:

class Job(LoggerMixin, SessionMixin, ABC):
    """
    Base class for all our Celery jobs.

    This class provides a few helper methods for our jobs to use, such as
    a logger and a session context manager. This class is and should remain
    runnable outside the context of a Celery worker. That way we are able to
    test our jobs fully outside the Celery worker.

    This class purposefully does not open a SqlAlchemy session for the job,
    preferring to let the job open and close the session as needed. This
    allows a long-running job to open and close the session as needed rather
    than keeping the session open for the entire duration of the job.

    Because our default Celery configuration is set up to ack tasks after they
    are completed, if a worker dies while processing a task, the task will be
    requeued and run again. We need to keep this in mind when writing our jobs
    to ensure that they are idempotent and can be run multiple times without
    causing any issues.
    """

class Task(celery.Task, LoggerMixin, SessionMixin):
    """
    Celery task implementation for Palace.

    Our Celery app is configured to use this as the Task class implementation.
    This class provides some glue to allow tasks to access the database and
    services from the dependency injection container.

    In order to access this class within a Celery task, you must use the
    `bind=True` parameter when defining your task.
    See: https://docs.celeryq.dev/en/stable/userguide/tasks.html#bound-tasks

    For example:
    ```
    @shared_task(bind=True)
    def my_task(task: Task) -> None:
        ...
    ```

    This class follows the pattern suggested in the Celery documentation:
    https://docs.celeryq.dev/en/stable/userguide/tasks.html#custom-task-classes

    The `__init__` method is only called once per worker process, so we can
    safely create a session maker and services container here and reuse them
    for the life of the worker process.
    See: https://docs.celeryq.dev/en/stable/userguide/tasks.html#instantiation
    """

Is there anything more specific you would like to see here?

@tdilauro
Contributor

No, I didn't have anything specific in mind, other than achieving the things mentioned in those bullet points. I was mainly thinking of any insights that you've gleaned as you've done this work, especially those that would help/save time for others (like me 😅) coming along and working on, trying to test, and debugging these tasks. Also, any gotchas that surprised you or weren't straightforward from the official docs. Thanks!

@jonathangreen
Member Author

jonathangreen commented Apr 12, 2024

Despite my initial reluctance, I've been impressed with Celery so far. The docs are really good, and I haven't hit a lot of rough edges yet. The biggest issue I had was getting the worker to run on my Mac, but the celery -A "core.celery.worker.app" worker --concurrency 1 --pool solo command I put in the docs gets a worker running locally.

The workers don't automatically reload on code changes; that might be something nice to look at for development in the future. I saw a couple of things on Google about using the watch package to accomplish this, but I haven't tried it yet. I've just been restarting the worker when I need it to pick up changes.
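
One option that came up in that searching (untested, and it uses the watchdog package's watchmedo command rather than watch) would be wrapping the worker like this:

# Untested: restart the worker whenever a .py file changes.
pip install watchdog
watchmedo auto-restart --directory=./ --patterns="*.py" --recursive -- \
    celery -A "core.celery.worker.app" worker --concurrency 1 --pool solo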

That hasn't been too bad, since the Job class is easily testable. I've just been doing the conversion of the existing code via Job, getting tests running, then doing a bit of testing with the worker. The two scripts I converted were fairly simple ones, though. I'm sure we'll learn a lot converting one of the importers, since it's a much more complicated operation.

Contributor

@dbernstein dbernstein left a comment

@jonathangreen: very impressive work. Including an example of a script conversion to the celery approach (i.e. the custom lists update script) is very helpful. It is encouraging to see that the process is fairly straightforward.

The PR looks good. I have no specific changes to request.

However, I am wondering about one thing: what happens if the celery task queue processing slows down to the point where more tasks are going onto the queue than are coming off? In the case of the scheduled custom list updates, should the custom list update tasks not complete in the hour before the task scheduler runs again, will we see duplicate tasks going onto the queue?

I don't see a problem with this as long as we can eventually vacate the queue. The autoscaling of the celery workers that you've set up should take care of this. But I'm guessing we'll need an alert to tell us if the task queues are getting backed up. Tracking the size of the queues and having some visibility into their composition (broken down by task type) might be useful. You may have already called these wants/needs out - I may have overlooked it or forgotten - but I put them here in case you haven't.

@jonathangreen
Member Author

> However, I am wondering about one thing: what happens if the celery task queue processing slows down to the point where more tasks are going onto the queue than are coming off? In the case of the scheduled custom list updates, should the custom list update tasks not complete in the hour before the task scheduler runs again, will we see duplicate tasks going onto the queue?

Yes, we will see duplicate tasks going into the queue in this case. This will likely pose a problem for larger scripts, so as we convert those we will have to be thinking about this case. The celery docs have a suggestion for using locks for this purpose: https://docs.celeryq.dev/en/stable/tutorials/task-cookbook.html
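
As a rough sketch of how that cookbook pattern could translate to Redis (hypothetical names, not code from this PR; the cookbook itself locks via a cache backend rather than Redis directly):

from contextlib import contextmanager
from typing import Generator

from celery import shared_task
from redis import Redis

redis_client = Redis.from_url("redis://localhost:6379")  # assumed URL

@contextmanager
def queue_lock(name: str, timeout: int = 60 * 60) -> Generator[bool, None, None]:
    # SET NX with an expiry: only one holder at a time, and a crashed
    # worker can't hold the lock forever.
    acquired = redis_client.set(name, "locked", nx=True, ex=timeout)
    try:
        yield bool(acquired)
    finally:
        if acquired:
            redis_client.delete(name)

@shared_task(bind=True)
def update_custom_list(task, list_id: int) -> None:
    with queue_lock(f"update-custom-list::{list_id}") as acquired:
        if not acquired:
            return  # a duplicate task is already running; drop this one
        ...  # do the actual update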

The custom list updates process reasonably fast as is, so I thought it was a good first candidate to convert without having to immediately deal with this possibility.

> I don't see a problem with this as long as we can eventually vacate the queue. The autoscaling of the celery workers that you've set up should take care of this. But I'm guessing we'll need an alert to tell us if the task queues are getting backed up. Tracking the size of the queues and having some visibility into their composition (broken down by task type) might be useful. You may have already called these wants/needs out - I may have overlooked it or forgotten - but I put them here in case you haven't.

Agreed, we will need some visibility into the queues. I've got an epic going with additional work for task queues: PP-107. That is where I've been trying to capture the future work, since this PR is just the beginning of getting things converted. I think PP-1150 in that epic will cover adding the monitoring we need. Any input / details on those additional tickets is appreciated though. I'm hoping to be able to pull PP-1150 into the next sprint to work on.
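
In the meantime, a quick way to eyeball queue depth (assuming the Redis broker's behavior of storing each queue as a Redis list named after the queue, and assuming our queue names):

from redis import Redis

# Each Celery queue on the Redis broker is a Redis list, so queue depth
# is just LLEN on the queue name.
client = Redis.from_url("redis://localhost:6379")
for queue in ("celery", "high"):
    print(queue, client.llen(queue))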

@jonathangreen jonathangreen merged commit 1522135 into main Apr 15, 2024
24 checks passed
@jonathangreen jonathangreen deleted the feature/message-queue branch April 15, 2024 19:00
@jonathangreen jonathangreen added the feature New feature label Apr 22, 2024