From c69b48a7d2ff157f1d4678b3a192f8ab62024363 Mon Sep 17 00:00:00 2001 From: Kai Schlamp Date: Tue, 27 Feb 2024 22:46:10 +0000 Subject: [PATCH] Allow multiple worker processes --- compose/docker-compose.dev.yml | 4 +- compose/docker-compose.prod.yml | 2 +- notebooks/radis_api.ipynb | 44 +++++++++---------- .../core/management/commands/celery_worker.py | 13 ++++-- 4 files changed, 34 insertions(+), 29 deletions(-) diff --git a/compose/docker-compose.dev.yml b/compose/docker-compose.dev.yml index 301349bf..1a659457 100644 --- a/compose/docker-compose.dev.yml +++ b/compose/docker-compose.dev.yml @@ -41,7 +41,7 @@ services: worker_default: <<: *default-app command: | - ./manage.py celery_worker -Q default_queue --autoreload + ./manage.py celery_worker -c 1 -Q default_queue --autoreload profiles: - full - extra @@ -49,7 +49,7 @@ services: worker_llm: <<: *default-app command: | - ./manage.py celery_worker -Q llm_queue --autoreload + ./manage.py celery_worker -c 1 -Q llm_queue --autoreload profiles: - full - extra diff --git a/compose/docker-compose.prod.yml b/compose/docker-compose.prod.yml index 860a0715..a54b7436 100644 --- a/compose/docker-compose.prod.yml +++ b/compose/docker-compose.prod.yml @@ -42,7 +42,7 @@ services: worker_llm: <<: *default-app - command: ./manage.py celery_worker -Q llm_queue + command: ./manage.py celery_worker -c 1 -Q llm_queue deploy: replicas: 1 diff --git a/notebooks/radis_api.ipynb b/notebooks/radis_api.ipynb index 96a598b1..5efffdaa 100644 --- a/notebooks/radis_api.ipynb +++ b/notebooks/radis_api.ipynb @@ -2,7 +2,7 @@ "cells": [ { "cell_type": "code", - "execution_count": 25, + "execution_count": 4, "metadata": {}, "outputs": [], "source": [ @@ -19,7 +19,7 @@ }, { "cell_type": "code", - "execution_count": 26, + "execution_count": 5, "metadata": {}, "outputs": [ { @@ -32,7 +32,7 @@ { "data": { "text/plain": [ - "{'id': 108,\n", + "{'id': 102,\n", " 'document_id': 'gepacs_3dfidii5858-6633i4-ii398841',\n", " 'language': 'en',\n", " 'pacs_aet': 'gepacs',\n", @@ -49,8 +49,8 @@ " 'accession_number': '345348389',\n", " 'series_instance_uid': '34343-676556-3343',\n", " 'sop_instance_uid': '35858-384834-3843'},\n", - " 'created_at': '2024-02-27T23:07:02.255160+01:00',\n", - " 'updated_at': '2024-02-27T23:07:02.255167+01:00',\n", + " 'created_at': '2024-02-27T23:20:34.337535+01:00',\n", + " 'updated_at': '2024-02-27T23:20:34.337542+01:00',\n", " 'groups': [2]}" ] }, @@ -91,13 +91,13 @@ }, { "cell_type": "code", - "execution_count": 27, + "execution_count": 6, "metadata": {}, "outputs": [ { "data": { "text/plain": [ - "{'id': 108,\n", + "{'id': 102,\n", " 'document_id': 'gepacs_3dfidii5858-6633i4-ii398841',\n", " 'language': 'en',\n", " 'pacs_aet': 'gepacs',\n", @@ -114,12 +114,12 @@ " 'accession_number': '345348389',\n", " 'series_instance_uid': '34343-676556-3343',\n", " 'sop_instance_uid': '35858-384834-3843'},\n", - " 'created_at': '2024-02-27T23:07:02.255160+01:00',\n", - " 'updated_at': '2024-02-27T23:07:02.691615+01:00',\n", + " 'created_at': '2024-02-27T23:20:34.337535+01:00',\n", + " 'updated_at': '2024-02-27T23:20:34.780964+01:00',\n", " 'groups': [2]}" ] }, - "execution_count": 27, + "execution_count": 6, "metadata": {}, "output_type": "execute_result" } @@ -159,13 +159,13 @@ }, { "cell_type": "code", - "execution_count": 28, + "execution_count": 7, "metadata": {}, "outputs": [ { "data": { "text/plain": [ - "{'id': 108,\n", + "{'id': 102,\n", " 'document_id': 'gepacs_3dfidii5858-6633i4-ii398841',\n", " 'language': 'en',\n", " 'pacs_aet': 'gepacs',\n", @@ -182,8 +182,8 @@ " 'sop_instance_uid': '35858-384834-3843',\n", " 'study_instance_uid': '34343-34343-34343',\n", " 'series_instance_uid': '34343-676556-3343'},\n", - " 'created_at': '2024-02-27T23:07:02.255160+01:00',\n", - " 'updated_at': '2024-02-27T23:07:02.691615+01:00',\n", + " 'created_at': '2024-02-27T23:20:34.337535+01:00',\n", + " 'updated_at': '2024-02-27T23:20:34.780964+01:00',\n", " 'groups': [2],\n", " 'documents': {'vespa': {'pathId': '/document/v1/report/report/docid/gepacs_3dfidii5858-6633i4-ii398841',\n", " 'id': 'id:report:report::gepacs_3dfidii5858-6633i4-ii398841',\n", @@ -201,7 +201,7 @@ " 'study_datetime': 965858400}}}}" ] }, - "execution_count": 28, + "execution_count": 7, "metadata": {}, "output_type": "execute_result" } @@ -218,7 +218,7 @@ }, { "cell_type": "code", - "execution_count": 29, + "execution_count": 8, "metadata": {}, "outputs": [ { @@ -238,13 +238,13 @@ }, { "cell_type": "code", - "execution_count": 30, + "execution_count": 9, "metadata": {}, "outputs": [ { "data": { "text/plain": [ - "{'id': 109,\n", + "{'id': 103,\n", " 'document_id': 'gepacs_3dfidii5858-6633i4-ii398841',\n", " 'language': 'en',\n", " 'pacs_aet': 'gepacs',\n", @@ -261,12 +261,12 @@ " 'accession_number': '345348389',\n", " 'series_instance_uid': '34343-676556-3343',\n", " 'sop_instance_uid': '35858-384834-3843'},\n", - " 'created_at': '2024-02-27T23:07:04.039672+01:00',\n", - " 'updated_at': '2024-02-27T23:07:04.039679+01:00',\n", + " 'created_at': '2024-02-27T23:20:36.656856+01:00',\n", + " 'updated_at': '2024-02-27T23:20:36.656865+01:00',\n", " 'groups': [2]}" ] }, - "execution_count": 30, + "execution_count": 9, "metadata": {}, "output_type": "execute_result" } @@ -306,7 +306,7 @@ }, { "cell_type": "code", - "execution_count": 31, + "execution_count": 10, "metadata": {}, "outputs": [ { diff --git a/radis/core/management/commands/celery_worker.py b/radis/core/management/commands/celery_worker.py index 5fb76c43..df03ef21 100644 --- a/radis/core/management/commands/celery_worker.py +++ b/radis/core/management/commands/celery_worker.py @@ -36,16 +36,21 @@ def add_arguments(self, parser): parser.add_argument( "-c", "--concurrency", - default=1, - help="Number of child processes processing the queue.", + type=int, + default=0, + help="Number of child processes processing the queue (defaults to number of CPUs).", ) def run_server(self, **options): queue = options["queue"] loglevel = options["loglevel"] - concurrency = options["concurrency"] hostname = f"worker_{queue}_{socket.gethostname()}" - cmd = f"celery -A radis worker -Q {queue} -l {loglevel} -c {concurrency} -n {hostname}" + + cmd = f"celery -A radis worker -Q {queue} -l {loglevel} -n {hostname}" + + concurrency = options["concurrency"] + if concurrency >= 1: + cmd += f" -c {concurrency}" self.worker_process = subprocess.Popen(shlex.split(cmd)) self.worker_process.wait()