From 857810a610cb4d252228d22ba27c2862f0691f13 Mon Sep 17 00:00:00 2001 From: "jacovdbergh@gmail.com" Date: Fri, 11 Oct 2024 14:09:28 +0200 Subject: [PATCH] add num workers config for sidekiq queue --- Cargo.toml | 2 +- src/bgworker/mod.rs | 1 + src/bgworker/skq.rs | 12 +++++++----- src/config.rs | 7 +++++-- 4 files changed, 14 insertions(+), 8 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 708762a94..7535a16fc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -134,7 +134,7 @@ sqlx = { version = "0.7", default-features = false, features = [ ulid = { version = "1", optional = true } # bg_redis: redis workers -rusty-sidekiq = { version = "0.8.2", default-features = false, optional = true } +rusty-sidekiq = { version = "0.11.0", default-features = false, optional = true } bb8 = { version = "0.8.1", optional = true } [workspace.dependencies] diff --git a/src/bgworker/mod.rs b/src/bgworker/mod.rs index 8a2727f89..93eb39e6c 100644 --- a/src/bgworker/mod.rs +++ b/src/bgworker/mod.rs @@ -268,6 +268,7 @@ pub async fn converge(queue: &Queue, config: &QueueConfig) -> Result<()> { dangerously_flush, uri: _, queues: _, + num_workers: _, }) => { if *dangerously_flush { queue.clear().await?; diff --git a/src/bgworker/skq.rs b/src/bgworker/skq.rs index 248137303..63c4d5abf 100644 --- a/src/bgworker/skq.rs +++ b/src/bgworker/skq.rs @@ -1,11 +1,10 @@ use std::{marker::PhantomData, sync::Arc}; -use async_trait::async_trait; -use bb8::Pool; -use sidekiq::{Processor, RedisConnectionManager}; - use super::{BackgroundWorker, Queue}; use crate::{config::RedisQueueConfig, Result}; +use async_trait::async_trait; +use bb8::Pool; +use sidekiq::{Processor, ProcessorConfig, RedisConnectionManager}; pub type RedisPool = Pool; pub struct SidekiqBackgroundWorker { @@ -99,7 +98,10 @@ pub async fn create_provider(qcfg: &RedisQueueConfig) -> Result { let queues = get_queues(&qcfg.queues); Ok(Queue::Redis( redis.clone(), - Arc::new(tokio::sync::Mutex::new(Processor::new(redis, queues))), + Arc::new(tokio::sync::Mutex::new( + Processor::new(redis, queues) + .with_config(ProcessorConfig::default().num_workers(qcfg.num_workers as usize)), + )), )) } diff --git a/src/config.rs b/src/config.rs index 110f20f70..81e489275 100644 --- a/src/config.rs +++ b/src/config.rs @@ -237,6 +237,9 @@ pub struct RedisQueueConfig { /// Custom queue names declaration. Useful to model priority queues. /// First queue in list is more important. pub queues: Option>, + + #[serde(default = "num_workers")] + pub num_workers: u32, } #[derive(Debug, Clone, Deserialize, Serialize)] @@ -264,7 +267,7 @@ pub struct PostgresQueueConfig { #[serde(default = "pgq_poll_interval")] pub poll_interval_sec: u32, - #[serde(default = "pgq_num_workers")] + #[serde(default = "num_workers")] pub num_workers: u32, } @@ -288,7 +291,7 @@ fn pgq_poll_interval() -> u32 { 1 } -fn pgq_num_workers() -> u32 { +fn num_workers() -> u32 { 2 }