Task queue using python celery + redis #1760
Conversation
Codecov Report
Attention: Patch coverage is

Additional details and impacted files:

@@           Coverage Diff            @@
##             main    #1760    +/-   ##
========================================
  Coverage   90.27%   90.28%
========================================
  Files         247      256       +9
  Lines       28343    28596     +253
  Branches     6466     6505      +39
========================================
+ Hits        25586    25817     +231
- Misses       1823     1842      +19
- Partials      934      937       +3

Flags with carried forward coverage won't be shown. View full report in Codecov by Sentry.
core/celery/job.py (outdated):

from core.util.log import LoggerMixin


class Job(LoggerMixin, ABC):
The idea here is that this class is what will replace Script for anything being run by a Celery worker. The Job class can be run and tested outside of the Celery context, and will be called from the Celery task.
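To make the pattern concrete, here is a minimal sketch of the shape this implies. Only the base classes and the LoggerMixin import appear in the diff above, so the constructor and run() signature here are assumptions:

from abc import ABC, abstractmethod

from sqlalchemy.orm import sessionmaker

from core.util.log import LoggerMixin


class Job(LoggerMixin, ABC):
    """A unit of work that a Celery task delegates to.

    It depends only on an ordinary sessionmaker, so it can be
    constructed and run directly in tests, with no Celery machinery.
    """

    def __init__(self, session_maker: sessionmaker) -> None:
        self._session_maker = session_maker

    @abstractmethod
    def run(self) -> None:
        """Do the actual work; invoked by the Celery task wrapper."""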
core/celery/job.py (outdated):

    @contextmanager
    def transaction(self) -> Generator[Session, None, None]:
        with self._session_maker.begin() as session:
            yield session
Instead of opening a long-lived session over the course of an entire Job, the way Script operates, I'm exposing a sessionmaker, so we are able to open and close DB connections or transactions as the job demands. This should let us have much more targeted transactions as we convert things to Job.
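As a hypothetical illustration of the difference (ExampleJob and its queries are made up, not part of this PR):

from core.model import Collection


class ExampleJob(Job):
    def run(self) -> None:
        # Each with-block is its own short transaction, committed (or
        # rolled back) as soon as the block exits.
        with self.transaction() as session:
            count = session.query(Collection).count()

        # Later work opens a fresh transaction instead of holding one
        # session open for the whole life of the job, as Script does.
        with self.transaction() as session:
            ...  # more work in a separate, equally short transaction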
from core.model import Collection


class CollectionDeleteJob(Job):
A simple example Job.
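Only the class name and import are visible in the excerpt above. One plausible body, assuming LoggerMixin provides self.log and the constructor takes the collection's database ID, might be:

class CollectionDeleteJob(Job):
    def __init__(self, session_maker: sessionmaker, collection_id: int) -> None:
        super().__init__(session_maker)
        self.collection_id = collection_id

    def run(self) -> None:
        # A single targeted transaction is all this job needs.
        with self.transaction() as session:
            collection = session.get(Collection, self.collection_id)
            if collection is None:
                self.log.warning(f"Collection {self.collection_id} not found.")
                return
            session.delete(collection)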
if task_file.stem == "__init__":
    continue
module = f"core.celery.tasks.{task_file.stem}"
importlib.import_module(module)
This does some programmatic imports to make sure that all our tasks are imported when starting the worker processes. This assumes that all our tasks will live in the core.celery.tasks package.
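So, for example, a hypothetical module at core/celery/tasks/example.py would be picked up automatically by this loop. The shared_task decorator is standard Celery usage, not necessarily what this PR uses:

from celery import shared_task


@shared_task
def example_task(collection_id: int) -> None:
    # Importing this module is enough to register the task with the
    # worker, which is why the loop above walks the whole package.
    ...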
    celery_app: Celery,
    celery_worker: WorkController,
    celery_pydantic_config: CeleryConfiguration,
) -> Generator[CeleryFixture, None, None]:
This is the main fixture for Celery. It can be imported to do a complete end-to-end run of a task inside a test; the task will be run in a worker thread.
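A hypothetical test built on the fixture might look like this; the import paths, argument name, and task are all assumptions:

from core.celery.tasks.example import example_task  # hypothetical module
from tests.fixtures.celery import CeleryFixture  # hypothetical import path


def test_example_task(celery: CeleryFixture) -> None:
    # .delay() hands the task to the broker; the worker thread started
    # by the fixture picks it up and runs it end to end.
    result = example_task.delay(42)
    result.get(timeout=30)  # blocks until the worker finishes; raises on failure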
@jonathangreen: this is very slick. The comments in the PR and in the code are very helpful. I like how the handling of the task queue is completely transparent. I'll be able to abandon the deferredtasks table for the inventory reports more or less immediately and with little effort.
I think all the work is in place to be able to roll this one out, so all it needs is a final review.
@jonathangreen I know that @dbernstein is reviewing this one, but I wanted to advocate for a couple extra bits of documentation:
This facility doesn't exist, but it's a good idea. I added a ticket for it here, PP-1153, so we can work on adding that. I don't think having that is a blocker for this going in, given the couple of tasks that this PR converts to use Celery, but let me know if you disagree.
I think this is something we are going to work out over time. I'm sure we'll know a lot more after moving all the scripts over to use Celery. The most valuable source of information I've found is the Celery documentation (https://docs.celeryq.dev/en/stable/index.html), but I already have that linked in the README. I did try to write some detailed inline comments about the Palace-specific Celery parts, with links out to the Celery documentation:

circulation/core/celery/job.py, lines 11 to 30 in b2dda94
circulation/core/celery/task.py, lines 13 to 38 in b2dda94

Is there anything more specific you would like to see here?
No, I didn't have anything specific in mind, other than achieving the things mentioned in those bullet points. I was mainly thinking of any insights you've gleaned as you've done this work, especially those that would help or save time for others (like me 😅) coming along to work on, test, and debug these tasks. Also, any gotchas that surprised you or weren't straightforward from the official docs. Thanks!
Despite my initial reluctance, I've been impressed with Celery so far. The docs are really good, and I haven't hit a lot of rough edges yet. The biggest issue I had was getting the worker to run on my mac. The workers don't automatically reload on code changes; that might be something nice to look at for development in the future, and I saw a couple of things in Google about ways to set that up. That hasn't been too bad so far, though.
@jonathangreen: very impressive work. Including an example of a script conversion to the Celery approach (i.e., the custom lists update script) is very helpful. It is encouraging to see that the process is fairly straightforward.
The PR looks good. I have no specific changes to request.
However I am wondering about one thing: What happens if the celery task queue processing slows down to the point where more tasks are going on than are coming off? In the case of the scheduled custom list updates, should the custom list update tasks not complete in the hour before the task scheduler runs again, will we see duplicate tasks going on the queue?
I don't see a problem with this as long as we can eventually vacate the queue. The autoscaling of the celery workers that you've set up should take care of this. But I'm guessing we'll need an alert to tell us if the task queues are getting backed up. Tracking the size of the queues and having some visibility into their composition (broken down by task type) might be useful. You may have already called these wants/needs out (I may have overlooked it or forgotten), but I put them here in case you haven't.
Yes, we will see duplicate tasks going into the queue in this case. This will likely pose a problem for larger scripts, so as we convert those we will have to be thinking about this case. The Celery docs have a suggestion for using locks for this purpose (https://docs.celeryq.dev/en/stable/tutorials/task-cookbook.html). The custom list updates process reasonably fast as is, so I thought it was a good first candidate to convert without having to immediately deal with this possibility.
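For reference, that cookbook recipe amounts to an atomic "acquire the lock only if nobody holds it" around the task body. A rough adaptation using a plain redis client; the client setup, key name, and the task itself are assumptions, not code from this PR:

import redis
from celery import shared_task

redis_client = redis.Redis()
LOCK_EXPIRE = 60 * 60  # let the lock expire in case a worker dies mid-task


@shared_task
def update_custom_list(list_id: int) -> None:
    lock_id = f"update-custom-list-{list_id}"
    # SET with nx=True is atomic: only one worker can acquire the lock.
    acquired = redis_client.set(lock_id, "locked", nx=True, ex=LOCK_EXPIRE)
    if not acquired:
        return  # a previous run is still in flight; skip instead of duplicating
    try:
        ...  # do the actual list update
    finally:
        redis_client.delete(lock_id)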
Agreed, we will need some visibility into the queues. I've got an epic going with additional work for task queues: PP-107. That is where I've been trying to capture the future work, since this PR is just the beginning of getting things converted. I think PP-1150 in that epic will cover adding the monitoring we need. Any input or details on those additional tasks are appreciated, though. I'm hoping to be able to pull PP-1150 into the next sprint to work on.
Description
Prototype task queue for CM using Celery + Redis. It implements two tasks.
🚨 This is a work in progress, it can't be merged until more work is done. I believe all the pre-requisite work is now in place to deploy out to tpp-dev. One open question is whether these tasks should go on the high queue or if we need separate workers to prioritize like this. This one is fairly large so it spans several jira tickets:
Motivation and Context
Some useful commands if you want to try running this locally:
Then you should be able to add a collection in the CM and delete it, and see the worker delete it in the background.
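For anyone who prefers to skip the admin UI, the task could presumably also be queued from a Python shell; the module path and task name here are guesses based on the tasks package described above:

from core.celery.tasks.collection import collection_delete  # hypothetical path

# Returns immediately; the worker picks the task up and deletes the
# collection in the background.
collection_delete.delay(42)  # 42 = the collection's database ID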
How Has This Been Tested?
Checklist