Skip to content

Commit

Permalink
Allow multiple worker processes
Browse files Browse the repository at this point in the history
  • Loading branch information
medihack committed Feb 27, 2024
1 parent e5264fd commit c69b48a
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 29 deletions.
4 changes: 2 additions & 2 deletions compose/docker-compose.dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,15 @@ services:
worker_default:
<<: *default-app
command: |
./manage.py celery_worker -Q default_queue --autoreload
./manage.py celery_worker -c 1 -Q default_queue --autoreload
profiles:
- full
- extra

worker_llm:
<<: *default-app
command: |
./manage.py celery_worker -Q llm_queue --autoreload
./manage.py celery_worker -c 1 -Q llm_queue --autoreload
profiles:
- full
- extra
Expand Down
2 changes: 1 addition & 1 deletion compose/docker-compose.prod.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ services:

worker_llm:
<<: *default-app
command: ./manage.py celery_worker -Q llm_queue
command: ./manage.py celery_worker -c 1 -Q llm_queue
deploy:
replicas: 1

Expand Down
44 changes: 22 additions & 22 deletions notebooks/radis_api.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"cells": [
{
"cell_type": "code",
"execution_count": 25,
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
Expand All @@ -19,7 +19,7 @@
},
{
"cell_type": "code",
"execution_count": 26,
"execution_count": 5,
"metadata": {},
"outputs": [
{
Expand All @@ -32,7 +32,7 @@
{
"data": {
"text/plain": [
"{'id': 108,\n",
"{'id': 102,\n",
" 'document_id': 'gepacs_3dfidii5858-6633i4-ii398841',\n",
" 'language': 'en',\n",
" 'pacs_aet': 'gepacs',\n",
Expand All @@ -49,8 +49,8 @@
" 'accession_number': '345348389',\n",
" 'series_instance_uid': '34343-676556-3343',\n",
" 'sop_instance_uid': '35858-384834-3843'},\n",
" 'created_at': '2024-02-27T23:07:02.255160+01:00',\n",
" 'updated_at': '2024-02-27T23:07:02.255167+01:00',\n",
" 'created_at': '2024-02-27T23:20:34.337535+01:00',\n",
" 'updated_at': '2024-02-27T23:20:34.337542+01:00',\n",
" 'groups': [2]}"
]
},
Expand Down Expand Up @@ -91,13 +91,13 @@
},
{
"cell_type": "code",
"execution_count": 27,
"execution_count": 6,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'id': 108,\n",
"{'id': 102,\n",
" 'document_id': 'gepacs_3dfidii5858-6633i4-ii398841',\n",
" 'language': 'en',\n",
" 'pacs_aet': 'gepacs',\n",
Expand All @@ -114,12 +114,12 @@
" 'accession_number': '345348389',\n",
" 'series_instance_uid': '34343-676556-3343',\n",
" 'sop_instance_uid': '35858-384834-3843'},\n",
" 'created_at': '2024-02-27T23:07:02.255160+01:00',\n",
" 'updated_at': '2024-02-27T23:07:02.691615+01:00',\n",
" 'created_at': '2024-02-27T23:20:34.337535+01:00',\n",
" 'updated_at': '2024-02-27T23:20:34.780964+01:00',\n",
" 'groups': [2]}"
]
},
"execution_count": 27,
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
Expand Down Expand Up @@ -159,13 +159,13 @@
},
{
"cell_type": "code",
"execution_count": 28,
"execution_count": 7,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'id': 108,\n",
"{'id': 102,\n",
" 'document_id': 'gepacs_3dfidii5858-6633i4-ii398841',\n",
" 'language': 'en',\n",
" 'pacs_aet': 'gepacs',\n",
Expand All @@ -182,8 +182,8 @@
" 'sop_instance_uid': '35858-384834-3843',\n",
" 'study_instance_uid': '34343-34343-34343',\n",
" 'series_instance_uid': '34343-676556-3343'},\n",
" 'created_at': '2024-02-27T23:07:02.255160+01:00',\n",
" 'updated_at': '2024-02-27T23:07:02.691615+01:00',\n",
" 'created_at': '2024-02-27T23:20:34.337535+01:00',\n",
" 'updated_at': '2024-02-27T23:20:34.780964+01:00',\n",
" 'groups': [2],\n",
" 'documents': {'vespa': {'pathId': '/document/v1/report/report/docid/gepacs_3dfidii5858-6633i4-ii398841',\n",
" 'id': 'id:report:report::gepacs_3dfidii5858-6633i4-ii398841',\n",
Expand All @@ -201,7 +201,7 @@
" 'study_datetime': 965858400}}}}"
]
},
"execution_count": 28,
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
Expand All @@ -218,7 +218,7 @@
},
{
"cell_type": "code",
"execution_count": 29,
"execution_count": 8,
"metadata": {},
"outputs": [
{
Expand All @@ -238,13 +238,13 @@
},
{
"cell_type": "code",
"execution_count": 30,
"execution_count": 9,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'id': 109,\n",
"{'id': 103,\n",
" 'document_id': 'gepacs_3dfidii5858-6633i4-ii398841',\n",
" 'language': 'en',\n",
" 'pacs_aet': 'gepacs',\n",
Expand All @@ -261,12 +261,12 @@
" 'accession_number': '345348389',\n",
" 'series_instance_uid': '34343-676556-3343',\n",
" 'sop_instance_uid': '35858-384834-3843'},\n",
" 'created_at': '2024-02-27T23:07:04.039672+01:00',\n",
" 'updated_at': '2024-02-27T23:07:04.039679+01:00',\n",
" 'created_at': '2024-02-27T23:20:36.656856+01:00',\n",
" 'updated_at': '2024-02-27T23:20:36.656865+01:00',\n",
" 'groups': [2]}"
]
},
"execution_count": 30,
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
Expand Down Expand Up @@ -306,7 +306,7 @@
},
{
"cell_type": "code",
"execution_count": 31,
"execution_count": 10,
"metadata": {},
"outputs": [
{
Expand Down
13 changes: 9 additions & 4 deletions radis/core/management/commands/celery_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,21 @@ def add_arguments(self, parser):
parser.add_argument(
"-c",
"--concurrency",
default=1,
help="Number of child processes processing the queue.",
type=int,
default=0,
help="Number of child processes processing the queue (defaults to number of CPUs).",
)

def run_server(self, **options):
queue = options["queue"]
loglevel = options["loglevel"]
concurrency = options["concurrency"]
hostname = f"worker_{queue}_{socket.gethostname()}"
cmd = f"celery -A radis worker -Q {queue} -l {loglevel} -c {concurrency} -n {hostname}"

cmd = f"celery -A radis worker -Q {queue} -l {loglevel} -n {hostname}"

concurrency = options["concurrency"]
if concurrency >= 1:
cmd += f" -c {concurrency}"

self.worker_process = subprocess.Popen(shlex.split(cmd))
self.worker_process.wait()
Expand Down

0 comments on commit c69b48a

Please sign in to comment.