Skip to content

Commit

Permalink
Allow configuration for Celery Prefetch Multiplier
Browse files Browse the repository at this point in the history
- The default is now `2` (it was `4`)
- Clean some code in tasks
  • Loading branch information
Uxio0 committed Oct 30, 2024
1 parent 441d165 commit 155bacc
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 5 deletions.
4 changes: 3 additions & 1 deletion docker/web/celery/worker/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

set -euo pipefail

TASK_CONCURRENCY=${CELERYD_CONCURRENCY:-15000}
TASK_CONCURRENCY=${CELERYD_CONCURRENCY:-5000}
PREFETCH_MULTIPLIER=${CELERYD_PREFETCH_MULTIPLIER:-2} # Default is 4

# DEBUG set in .env_docker_compose
if [ ${DEBUG:-0} = 1 ]; then
Expand Down Expand Up @@ -30,6 +31,7 @@ exec celery --no-color -A config.celery_app worker \
--pool=gevent \
--loglevel $log_level \
--concurrency="${TASK_CONCURRENCY}" \
--prefetch-multiplier="${PREFETCH_MULTIPLIER}" \
--without-heartbeat \
--without-gossip \
--without-mingle -E -Q "$WORKER_QUEUES"
2 changes: 1 addition & 1 deletion safe_transaction_service/history/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,10 +258,10 @@ def process_decoded_internal_txs_task(self) -> Optional[int]:
safe_to_process
).not_processed().update(processed=True)
else:
count += 1
process_decoded_internal_txs_for_safe_task.delay(
safe_to_process, reindex_master_copies=True
)
count += 1

if not count:
logger.info("No Safes to process")
Expand Down
11 changes: 8 additions & 3 deletions safe_transaction_service/utils/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ def shutdown_worker():
logger.warning("No redis locks to release")


def get_task_lock_name(task_name: str, lock_name_suffix: Optional[str] = None) -> str:
    """Build the Redis key used to lock a Celery task.

    :param task_name: Name of the Celery task
    :param lock_name_suffix: Optional suffix appended to the lock name (e.g. to
        scope the lock to a specific argument)
    :return: Lock name in the form ``locks:tasks:<task_name>[:<suffix>]``
    """
    parts = ["locks:tasks", task_name]
    if lock_name_suffix:
        parts.append(lock_name_suffix)
    return ":".join(parts)


@contextlib.contextmanager
def only_one_running_task(
task: CeleryTask,
Expand All @@ -74,9 +81,7 @@ def only_one_running_task(
if WORKER_STOPPED:
raise LockError("Worker is stopping")
redis = get_redis()
lock_name = f"locks:tasks:{task.name}"
if lock_name_suffix:
lock_name += f":{lock_name_suffix}"
lock_name = get_task_lock_name(task.name, lock_name_suffix=lock_name_suffix)
with redis.lock(lock_name, blocking=False, timeout=lock_timeout) as lock:
try:
ACTIVE_LOCKS.add(lock_name)
Expand Down

0 comments on commit 155bacc

Please sign in to comment.