diff --git a/docker/web/celery/worker/run.sh b/docker/web/celery/worker/run.sh
index 79b53d9a1..72e6d0a7e 100755
--- a/docker/web/celery/worker/run.sh
+++ b/docker/web/celery/worker/run.sh
@@ -2,7 +2,8 @@

 set -euo pipefail

-TASK_CONCURRENCY=${CELERYD_CONCURRENCY:-15000}
+TASK_CONCURRENCY=${CELERYD_CONCURRENCY:-5000}
+PREFETCH_MULTIPLIER=${CELERYD_PREFETCH_MULTIPLIER:-2} # Default is 4

 # DEBUG set in .env_docker_compose
 if [ ${DEBUG:-0} = 1 ]; then
@@ -30,6 +31,7 @@ exec celery --no-color -A config.celery_app worker \
     --pool=gevent \
     --loglevel $log_level \
     --concurrency="${TASK_CONCURRENCY}" \
+    --prefetch-multiplier="${PREFETCH_MULTIPLIER}" \
     --without-heartbeat \
     --without-gossip \
     --without-mingle -E -Q "$WORKER_QUEUES"
diff --git a/safe_transaction_service/history/tasks.py b/safe_transaction_service/history/tasks.py
index 9181afd28..ecfea2cf8 100644
--- a/safe_transaction_service/history/tasks.py
+++ b/safe_transaction_service/history/tasks.py
@@ -258,10 +258,10 @@ def process_decoded_internal_txs_task(self) -> Optional[int]:
                     safe_to_process
                 ).not_processed().update(processed=True)
             else:
-                count += 1
                 process_decoded_internal_txs_for_safe_task.delay(
                     safe_to_process, reindex_master_copies=True
                 )
+                count += 1

     if not count:
         logger.info("No Safes to process")
diff --git a/safe_transaction_service/utils/tasks.py b/safe_transaction_service/utils/tasks.py
index df88d1ea9..811dd6ea1 100644
--- a/safe_transaction_service/utils/tasks.py
+++ b/safe_transaction_service/utils/tasks.py
@@ -52,6 +52,13 @@ def shutdown_worker():
         logger.warning("No redis locks to release")


+def get_task_lock_name(task_name: str, lock_name_suffix: Optional[str] = None) -> str:
+    lock_name = f"locks:tasks:{task_name}"
+    if lock_name_suffix:
+        lock_name += f":{lock_name_suffix}"
+    return lock_name
+
+
 @contextlib.contextmanager
 def only_one_running_task(
     task: CeleryTask,
@@ -74,9 +81,7 @@ def only_one_running_task(
     if WORKER_STOPPED:
         raise LockError("Worker is stopping")
     redis = get_redis()
-    lock_name = f"locks:tasks:{task.name}"
-    if lock_name_suffix:
-        lock_name += f":{lock_name_suffix}"
+    lock_name = get_task_lock_name(task.name, lock_name_suffix=lock_name_suffix)
     with redis.lock(lock_name, blocking=False, timeout=lock_timeout) as lock:
         try:
             ACTIVE_LOCKS.add(lock_name)
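
Why the `run.sh` defaults changed: with the gevent pool, the broker reserves up to concurrency × prefetch multiplier messages per worker, so the old defaults could hold 15000 × 4 = 60000 tasks on a single worker, while the new defaults cap that at 5000 × 2 = 10000. A minimal sketch of the same tuning expressed as Celery settings instead of CLI flags (the app name and use of `app.conf` are illustrative assumptions, not code from this diff):

```python
from celery import Celery

app = Celery("config")  # assumed app name, mirroring config.celery_app

# The broker reserves up to worker_concurrency * worker_prefetch_multiplier
# messages per worker: 15000 * 4 = 60000 before this change, 5000 * 2 = 10000
# after it, so long-running tasks tie up far fewer queued messages.
app.conf.worker_concurrency = 5000
app.conf.worker_prefetch_multiplier = 2
```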
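The `history/tasks.py` change moves `count += 1` after the `.delay()` call, so the counter only reflects tasks that were actually handed to the broker; if publishing raises, the count no longer overstates the enqueued work. A self-contained sketch of that ordering rationale, where `enqueue()` is a hypothetical stand-in for the task's `.delay()`:

```python
# Hedged sketch: enqueue() stands in for
# process_decoded_internal_txs_for_safe_task.delay(safe_address).
def enqueue(safe_address: str) -> None:
    """Stand-in for a Celery .delay() call; may raise if the broker is down."""
    ...

count = 0
for safe_to_process in ["0xSafe1", "0xSafe2"]:  # placeholder addresses
    enqueue(safe_to_process)
    count += 1  # incremented only after the task was actually queued
```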
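The new `get_task_lock_name` helper in `utils/tasks.py` centralizes the Redis key format (`locks:tasks:<task_name>[:<suffix>]`), so code other than `only_one_running_task` can derive the same key. A minimal usage sketch, assuming the service's `get_redis()` helper; the import path and the `is_task_lock_held` function are assumptions for illustration:

```python
from typing import Optional

from safe_transaction_service.utils.redis import get_redis  # path assumed
from safe_transaction_service.utils.tasks import get_task_lock_name


def is_task_lock_held(task_name: str, lock_name_suffix: Optional[str] = None) -> bool:
    """Return True if the Redis key backing the task's lock currently exists."""
    lock_name = get_task_lock_name(task_name, lock_name_suffix=lock_name_suffix)
    # redis-py EXISTS returns the number of matching keys (0 or 1 here)
    return bool(get_redis().exists(lock_name))
```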