diff --git a/Cargo.toml b/Cargo.toml index f67df7f..35b610c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -173,7 +173,7 @@ tonic-build = { version = "0.12.3" } # Sidekiq # Todo: the default `rss-stats` feature has a dependency that currently can't be satisfied (memchr: ~2.3) -rusty-sidekiq = { version = "0.12.0", default-features = false } +rusty-sidekiq = { version = "0.13.0", default-features = false } # Testing insta = { version = "1.39.0", features = ["toml", "filters"] } diff --git a/src/config/service/worker/sidekiq/default.toml b/src/config/service/worker/sidekiq/default.toml index 360e484..ea49e9e 100644 --- a/src/config/service/worker/sidekiq/default.toml +++ b/src/config/service/worker/sidekiq/default.toml @@ -1,3 +1,6 @@ +[service.sidekiq] +balance-strategy = "round-robin" + [service.sidekiq.periodic] stale-cleanup = "auto-clean-stale" diff --git a/src/config/service/worker/sidekiq/mod.rs b/src/config/service/worker/sidekiq/mod.rs index 2cb2358..5f34e6f 100644 --- a/src/config/service/worker/sidekiq/mod.rs +++ b/src/config/service/worker/sidekiq/mod.rs @@ -1,6 +1,7 @@ use crate::service::worker::sidekiq::app_worker::AppWorkerConfig; use config::{FileFormat, FileSourceString}; use serde_derive::{Deserialize, Serialize}; +use std::collections::BTreeMap; use strum_macros::{EnumString, IntoStaticStr}; use url::Url; use validator::Validate; @@ -24,6 +25,17 @@ pub struct SidekiqServiceConfig { #[serde(default = "SidekiqServiceConfig::default_num_workers")] pub num_workers: u32, + /// The strategy for balancing the priority of fetching queues' jobs from Redis. Defaults + /// to [`BalanceStrategy::RoundRobin`]. + /// + /// The Redis API used to fetch jobs ([brpop](https://redis.io/docs/latest/commands/brpop/)) + /// checks queues for jobs in the order the queues are provided. This means that if the first + /// queue in the list provided to [`Processor::new`] always has an item, the other queues + /// will never have their jobs run. To mitigate this, a [`BalanceStrategy`] is provided + /// (configurable in this field) to ensure that no queue is starved indefinitely. + #[serde(default)] + pub balance_strategy: BalanceStrategy, + /// The names of the worker queues to handle. // Todo: Allow overriding this via CLI args? #[serde(default)] @@ -41,6 +53,12 @@ pub struct SidekiqServiceConfig { #[serde(default)] #[validate(nested)] pub app_worker: AppWorkerConfig, + + /// Queue-specific configurations. The queues specified in this field do not need to match + /// the list of queues listed in the `queues` field. + #[serde(default)] + #[validate(nested)] + pub queue_config: BTreeMap, } impl SidekiqServiceConfig { @@ -112,6 +130,59 @@ pub struct ConnectionPool { pub max_connections: Option, } +#[derive( + Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize, EnumString, IntoStaticStr, +)] +#[serde(rename_all = "kebab-case")] +#[strum(serialize_all = "kebab-case")] +#[non_exhaustive] +pub enum BalanceStrategy { + /// Rotate the list of queues by 1 every time jobs are fetched from Redis. This allows each + /// queue in the list to have an equal opportunity to have its jobs run. + #[default] + RoundRobin, + /// Do not modify the list of queues. Warning: This can lead to queue starvation! For example, + /// if the first queue in the list provided to [`Processor::new`] is heavily used and always + /// has a job available to run, then the jobs in the other queues will never be run. + None, +} + +impl From for sidekiq::BalanceStrategy { + fn from(value: BalanceStrategy) -> Self { + match value { + BalanceStrategy::RoundRobin => sidekiq::BalanceStrategy::RoundRobin, + BalanceStrategy::None => sidekiq::BalanceStrategy::None, + } + } +} + +#[derive(Debug, Default, Validate, Clone, Serialize, Deserialize)] +#[serde(default, rename_all = "kebab-case")] +#[non_exhaustive] +pub struct QueueConfig { + /// Similar to `SidekiqServiceConfig#num_workers`, except allows configuring the number of + /// additional workers to dedicate to a specific queue. If provided, `num_workers` additional + /// workers will be created for this specific queue. + pub num_workers: Option, +} + +impl From for sidekiq::QueueConfig { + fn from(value: QueueConfig) -> Self { + value + .num_workers + .iter() + .fold(Default::default(), |config, num_workers| { + config.num_workers(*num_workers as usize) + }) + } +} + +impl From<&QueueConfig> for sidekiq::QueueConfig { + fn from(value: &QueueConfig) -> Self { + value.clone().into() + } +} + #[cfg(test)] mod deserialize_tests { use super::*; @@ -181,6 +252,39 @@ mod deserialize_tests { stale-cleanup = "auto-clean-stale" "# )] + #[case( + r#" + num-workers = 1 + balance-strategy = "none" + [redis] + uri = "redis://localhost:6379" + [periodic] + stale-cleanup = "auto-clean-stale" + "# + )] + #[case( + r#" + num-workers = 1 + balance-strategy = "round-robin" + [redis] + uri = "redis://localhost:6379" + [periodic] + stale-cleanup = "auto-clean-stale" + "# + )] + #[case( + r#" + num-workers = 1 + [redis] + uri = "redis://localhost:6379" + [periodic] + stale-cleanup = "auto-clean-stale" + [queue-config] + "foo" = { num-workers = 10 } + [queue-config.bar] + num-workers = 100 + "# + )] #[cfg_attr(coverage_nightly, coverage(off))] fn sidekiq(_case: TestCase, #[case] config: &str) { let sidekiq: SidekiqServiceConfig = toml::from_str(config).unwrap(); diff --git a/src/config/service/worker/sidekiq/snapshots/roadster__config__service__worker__sidekiq__deserialize_tests__sidekiq@case_1.snap b/src/config/service/worker/sidekiq/snapshots/roadster__config__service__worker__sidekiq__deserialize_tests__sidekiq@case_1.snap index 99b9de1..677147b 100644 --- a/src/config/service/worker/sidekiq/snapshots/roadster__config__service__worker__sidekiq__deserialize_tests__sidekiq@case_1.snap +++ b/src/config/service/worker/sidekiq/snapshots/roadster__config__service__worker__sidekiq__deserialize_tests__sidekiq@case_1.snap @@ -3,6 +3,7 @@ source: src/config/service/worker/sidekiq/mod.rs expression: sidekiq --- num-workers = 1 +balance-strategy = 'round-robin' queues = [] [redis] @@ -20,3 +21,5 @@ max-retries = 5 timeout = true max-duration = 60 disable-argument-coercion = false + +[queue-config] diff --git a/src/config/service/worker/sidekiq/snapshots/roadster__config__service__worker__sidekiq__deserialize_tests__sidekiq@case_2.snap b/src/config/service/worker/sidekiq/snapshots/roadster__config__service__worker__sidekiq__deserialize_tests__sidekiq@case_2.snap index 99b9de1..677147b 100644 --- a/src/config/service/worker/sidekiq/snapshots/roadster__config__service__worker__sidekiq__deserialize_tests__sidekiq@case_2.snap +++ b/src/config/service/worker/sidekiq/snapshots/roadster__config__service__worker__sidekiq__deserialize_tests__sidekiq@case_2.snap @@ -3,6 +3,7 @@ source: src/config/service/worker/sidekiq/mod.rs expression: sidekiq --- num-workers = 1 +balance-strategy = 'round-robin' queues = [] [redis] @@ -20,3 +21,5 @@ max-retries = 5 timeout = true max-duration = 60 disable-argument-coercion = false + +[queue-config] diff --git a/src/config/service/worker/sidekiq/snapshots/roadster__config__service__worker__sidekiq__deserialize_tests__sidekiq@case_3.snap b/src/config/service/worker/sidekiq/snapshots/roadster__config__service__worker__sidekiq__deserialize_tests__sidekiq@case_3.snap index 9f7e4d0..a18803e 100644 --- a/src/config/service/worker/sidekiq/snapshots/roadster__config__service__worker__sidekiq__deserialize_tests__sidekiq@case_3.snap +++ b/src/config/service/worker/sidekiq/snapshots/roadster__config__service__worker__sidekiq__deserialize_tests__sidekiq@case_3.snap @@ -3,6 +3,7 @@ source: src/config/service/worker/sidekiq/mod.rs expression: sidekiq --- num-workers = 1 +balance-strategy = 'round-robin' queues = ['foo'] [redis] @@ -20,3 +21,5 @@ max-retries = 5 timeout = true max-duration = 60 disable-argument-coercion = false + +[queue-config] diff --git a/src/config/service/worker/sidekiq/snapshots/roadster__config__service__worker__sidekiq__deserialize_tests__sidekiq@case_4.snap b/src/config/service/worker/sidekiq/snapshots/roadster__config__service__worker__sidekiq__deserialize_tests__sidekiq@case_4.snap index c7e4f64..4df51a3 100644 --- a/src/config/service/worker/sidekiq/snapshots/roadster__config__service__worker__sidekiq__deserialize_tests__sidekiq@case_4.snap +++ b/src/config/service/worker/sidekiq/snapshots/roadster__config__service__worker__sidekiq__deserialize_tests__sidekiq@case_4.snap @@ -3,6 +3,7 @@ source: src/config/service/worker/sidekiq/mod.rs expression: sidekiq --- num-workers = 1 +balance-strategy = 'round-robin' queues = [] [redis] @@ -22,3 +23,5 @@ max-retries = 5 timeout = true max-duration = 60 disable-argument-coercion = false + +[queue-config] diff --git a/src/config/service/worker/sidekiq/snapshots/roadster__config__service__worker__sidekiq__deserialize_tests__sidekiq@case_5.snap b/src/config/service/worker/sidekiq/snapshots/roadster__config__service__worker__sidekiq__deserialize_tests__sidekiq@case_5.snap index 7365a0d..d54636b 100644 --- a/src/config/service/worker/sidekiq/snapshots/roadster__config__service__worker__sidekiq__deserialize_tests__sidekiq@case_5.snap +++ b/src/config/service/worker/sidekiq/snapshots/roadster__config__service__worker__sidekiq__deserialize_tests__sidekiq@case_5.snap @@ -3,6 +3,7 @@ source: src/config/service/worker/sidekiq/mod.rs expression: sidekiq --- num-workers = 1 +balance-strategy = 'round-robin' queues = [] [redis] @@ -22,3 +23,5 @@ max-retries = 5 timeout = true max-duration = 60 disable-argument-coercion = false + +[queue-config] diff --git a/src/config/service/worker/sidekiq/snapshots/roadster__config__service__worker__sidekiq__deserialize_tests__sidekiq@case_6.snap b/src/config/service/worker/sidekiq/snapshots/roadster__config__service__worker__sidekiq__deserialize_tests__sidekiq@case_6.snap index 99b9de1..677147b 100644 --- a/src/config/service/worker/sidekiq/snapshots/roadster__config__service__worker__sidekiq__deserialize_tests__sidekiq@case_6.snap +++ b/src/config/service/worker/sidekiq/snapshots/roadster__config__service__worker__sidekiq__deserialize_tests__sidekiq@case_6.snap @@ -3,6 +3,7 @@ source: src/config/service/worker/sidekiq/mod.rs expression: sidekiq --- num-workers = 1 +balance-strategy = 'round-robin' queues = [] [redis] @@ -20,3 +21,5 @@ max-retries = 5 timeout = true max-duration = 60 disable-argument-coercion = false + +[queue-config] diff --git a/src/config/service/worker/sidekiq/snapshots/roadster__config__service__worker__sidekiq__deserialize_tests__sidekiq@case_7.snap b/src/config/service/worker/sidekiq/snapshots/roadster__config__service__worker__sidekiq__deserialize_tests__sidekiq@case_7.snap new file mode 100644 index 0000000..1261bed --- /dev/null +++ b/src/config/service/worker/sidekiq/snapshots/roadster__config__service__worker__sidekiq__deserialize_tests__sidekiq@case_7.snap @@ -0,0 +1,25 @@ +--- +source: src/config/service/worker/sidekiq/mod.rs +expression: sidekiq +--- +num-workers = 1 +balance-strategy = 'none' +queues = [] + +[redis] +uri = 'redis://[Sensitive]' + +[redis.enqueue-pool] + +[redis.fetch-pool] + +[periodic] +stale-cleanup = 'auto-clean-stale' + +[app-worker] +max-retries = 5 +timeout = true +max-duration = 60 +disable-argument-coercion = false + +[queue-config] diff --git a/src/config/service/worker/sidekiq/snapshots/roadster__config__service__worker__sidekiq__deserialize_tests__sidekiq@case_8.snap b/src/config/service/worker/sidekiq/snapshots/roadster__config__service__worker__sidekiq__deserialize_tests__sidekiq@case_8.snap new file mode 100644 index 0000000..677147b --- /dev/null +++ b/src/config/service/worker/sidekiq/snapshots/roadster__config__service__worker__sidekiq__deserialize_tests__sidekiq@case_8.snap @@ -0,0 +1,25 @@ +--- +source: src/config/service/worker/sidekiq/mod.rs +expression: sidekiq +--- +num-workers = 1 +balance-strategy = 'round-robin' +queues = [] + +[redis] +uri = 'redis://[Sensitive]' + +[redis.enqueue-pool] + +[redis.fetch-pool] + +[periodic] +stale-cleanup = 'auto-clean-stale' + +[app-worker] +max-retries = 5 +timeout = true +max-duration = 60 +disable-argument-coercion = false + +[queue-config] diff --git a/src/config/service/worker/sidekiq/snapshots/roadster__config__service__worker__sidekiq__deserialize_tests__sidekiq@case_9.snap b/src/config/service/worker/sidekiq/snapshots/roadster__config__service__worker__sidekiq__deserialize_tests__sidekiq@case_9.snap new file mode 100644 index 0000000..ec075ca --- /dev/null +++ b/src/config/service/worker/sidekiq/snapshots/roadster__config__service__worker__sidekiq__deserialize_tests__sidekiq@case_9.snap @@ -0,0 +1,28 @@ +--- +source: src/config/service/worker/sidekiq/mod.rs +expression: sidekiq +--- +num-workers = 1 +balance-strategy = 'round-robin' +queues = [] + +[redis] +uri = 'redis://[Sensitive]' + +[redis.enqueue-pool] + +[redis.fetch-pool] + +[periodic] +stale-cleanup = 'auto-clean-stale' + +[app-worker] +max-retries = 5 +timeout = true +max-duration = 60 +disable-argument-coercion = false +[queue-config.bar] +num-workers = 100 + +[queue-config.foo] +num-workers = 10 diff --git a/src/config/snapshots/roadster__config__tests__test.snap b/src/config/snapshots/roadster__config__tests__test.snap index d686e74..a7e46e1 100644 --- a/src/config/snapshots/roadster__config__tests__test.snap +++ b/src/config/snapshots/roadster__config__tests__test.snap @@ -1,7 +1,6 @@ --- source: src/config/mod.rs expression: config -snapshot_kind: text --- environment = 'test' @@ -139,6 +138,7 @@ port = 3001 [service.sidekiq] num-workers = 16 +balance-strategy = 'round-robin' queues = [] [service.sidekiq.redis] @@ -156,6 +156,8 @@ max-retries = 25 timeout = true max-duration = 60 disable-argument-coercion = false + +[service.sidekiq.queue-config] [auth.jwt] secret = 'secret-test' diff --git a/src/service/worker/sidekiq/builder.rs b/src/service/worker/sidekiq/builder.rs index 74229eb..ab0364c 100644 --- a/src/service/worker/sidekiq/builder.rs +++ b/src/service/worker/sidekiq/builder.rs @@ -118,21 +118,29 @@ where ); debug!("Sidekiq.rs queues: {queues:?}"); let processor = { - let num_workers = context + let config = context.config().service.sidekiq.custom.clone(); + let num_workers = config.num_workers.to_usize().ok_or_else(|| { + anyhow!( + "Unable to convert num_workers `{}` to usize", + context.config().service.sidekiq.custom.num_workers + ) + })?; + let processor_config: ProcessorConfig = Default::default(); + let processor_config = processor_config + .num_workers(num_workers) + .balance_strategy(config.balance_strategy.into()); + + let processor_config = context .config() .service .sidekiq .custom - .num_workers - .to_usize() - .ok_or_else(|| { - anyhow!( - "Unable to convert num_workers `{}` to usize", - context.config().service.sidekiq.custom.num_workers - ) - })?; - let processor_config: ProcessorConfig = Default::default(); - let processor_config = processor_config.num_workers(num_workers); + .queue_config + .iter() + .fold(processor_config, |processor_config, (queue, config)| { + processor_config.queue_config(queue.clone(), config.into()) + }); + let processor = sidekiq::Processor::new(redis_fetch.clone(), queues.clone()) .with_config(processor_config); Processor::new(processor) diff --git a/src/service/worker/sidekiq/service.rs b/src/service/worker/sidekiq/service.rs index a7e08e2..8a179a1 100644 --- a/src/service/worker/sidekiq/service.rs +++ b/src/service/worker/sidekiq/service.rs @@ -23,14 +23,26 @@ pub(crate) fn enabled(context: &AppContext) -> bool { debug!("Sidekiq is not enabled in the config."); return false; } - if sidekiq_config.custom.num_workers == 0 { + + let dedicated_workers: u64 = context + .config() + .service + .sidekiq + .custom + .queue_config + .values() + .map(|config| config.num_workers.unwrap_or_default() as u64) + .sum(); + if sidekiq_config.custom.num_workers == 0 && dedicated_workers == 0 { debug!("Sidekiq configured with 0 worker tasks."); return false; } - if sidekiq_config.custom.queues.is_empty() { + + if sidekiq_config.custom.queues.is_empty() && dedicated_workers == 0 { debug!("Sidekiq configured with 0 worker queues."); return false; } + if context.redis_fetch().is_none() { debug!("No 'redis-fetch' pool connections available."); return false;