Skip to content

Commit

Permalink
feat: Support sidekiq balance strategy and dedicated queues
Browse files Browse the repository at this point in the history
Add support for the `BalanceStrategy` config from
`rusty-sidekiq` and default to `BalanceStrategy#RoundRobin` to ensure
all queues have a chance to have their jobs run by default.

Also add support for dedicated queues via the
`service.sidekiq.queue-config` field. Queues listed in this field will
run with a dedicated set of workers (equal to the `num-workers`
sub-field for each queue).
  • Loading branch information
spencewenski committed Jan 13, 2025
1 parent 87020ff commit 2e175e0
Show file tree
Hide file tree
Showing 15 changed files with 240 additions and 15 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ tonic-build = { version = "0.12.3" }

# Sidekiq
# Todo: the default `rss-stats` feature has a dependency that currently can't be satisfied (memchr: ~2.3)
rusty-sidekiq = { version = "0.12.0", default-features = false }
rusty-sidekiq = { version = "0.13.0", default-features = false }

# Testing
insta = { version = "1.39.0", features = ["toml", "filters"] }
Expand Down
3 changes: 3 additions & 0 deletions src/config/service/worker/sidekiq/default.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
[service.sidekiq]
balance-strategy = "round-robin"

[service.sidekiq.periodic]
stale-cleanup = "auto-clean-stale"

Expand Down
104 changes: 104 additions & 0 deletions src/config/service/worker/sidekiq/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::service::worker::sidekiq::app_worker::AppWorkerConfig;
use config::{FileFormat, FileSourceString};
use serde_derive::{Deserialize, Serialize};
use std::collections::BTreeMap;
use strum_macros::{EnumString, IntoStaticStr};
use url::Url;
use validator::Validate;
Expand All @@ -24,6 +25,17 @@ pub struct SidekiqServiceConfig {
#[serde(default = "SidekiqServiceConfig::default_num_workers")]
pub num_workers: u32,

/// The strategy for balancing the priority of fetching queues' jobs from Redis. Defaults
/// to [`BalanceStrategy::RoundRobin`].
///
/// The Redis API used to fetch jobs ([brpop](https://redis.io/docs/latest/commands/brpop/))
/// checks queues for jobs in the order the queues are provided. This means that if the first
/// queue in the list provided to [`Processor::new`] always has an item, the other queues
/// will never have their jobs run. To mitigate this, a [`BalanceStrategy`] is provided
/// (configurable in this field) to ensure that no queue is starved indefinitely.
#[serde(default)]
pub balance_strategy: BalanceStrategy,

/// The names of the worker queues to handle.
// Todo: Allow overriding this via CLI args?
#[serde(default)]
Expand All @@ -41,6 +53,12 @@ pub struct SidekiqServiceConfig {
#[serde(default)]
#[validate(nested)]
pub app_worker: AppWorkerConfig,

/// Queue-specific configurations. The queues specified in this field do not need to match
/// the list of queues listed in the `queues` field.
#[serde(default)]
#[validate(nested)]
pub queue_config: BTreeMap<String, QueueConfig>,
}

impl SidekiqServiceConfig {
Expand Down Expand Up @@ -112,6 +130,59 @@ pub struct ConnectionPool {
pub max_connections: Option<u32>,
}

#[derive(
Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize, EnumString, IntoStaticStr,
)]
#[serde(rename_all = "kebab-case")]
#[strum(serialize_all = "kebab-case")]
#[non_exhaustive]
pub enum BalanceStrategy {
/// Rotate the list of queues by 1 every time jobs are fetched from Redis. This allows each
/// queue in the list to have an equal opportunity to have its jobs run.
#[default]
RoundRobin,
/// Do not modify the list of queues. Warning: This can lead to queue starvation! For example,
/// if the first queue in the list provided to [`Processor::new`] is heavily used and always
/// has a job available to run, then the jobs in the other queues will never be run.
None,
}

impl From<BalanceStrategy> for sidekiq::BalanceStrategy {
fn from(value: BalanceStrategy) -> Self {
match value {
BalanceStrategy::RoundRobin => sidekiq::BalanceStrategy::RoundRobin,
BalanceStrategy::None => sidekiq::BalanceStrategy::None,
}
}
}

#[derive(Debug, Default, Validate, Clone, Serialize, Deserialize)]
#[serde(default, rename_all = "kebab-case")]
#[non_exhaustive]
pub struct QueueConfig {
/// Similar to `SidekiqServiceConfig#num_workers`, except allows configuring the number of
/// additional workers to dedicate to a specific queue. If provided, `num_workers` additional
/// workers will be created for this specific queue.
pub num_workers: Option<u32>,
}

impl From<QueueConfig> for sidekiq::QueueConfig {
fn from(value: QueueConfig) -> Self {
value
.num_workers
.iter()
.fold(Default::default(), |config, num_workers| {
config.num_workers(*num_workers as usize)
})
}
}

impl From<&QueueConfig> for sidekiq::QueueConfig {
fn from(value: &QueueConfig) -> Self {
value.clone().into()
}
}

#[cfg(test)]
mod deserialize_tests {
use super::*;
Expand Down Expand Up @@ -181,6 +252,39 @@ mod deserialize_tests {
stale-cleanup = "auto-clean-stale"
"#
)]
#[case(
r#"
num-workers = 1
balance-strategy = "none"
[redis]
uri = "redis://localhost:6379"
[periodic]
stale-cleanup = "auto-clean-stale"
"#
)]
#[case(
r#"
num-workers = 1
balance-strategy = "round-robin"
[redis]
uri = "redis://localhost:6379"
[periodic]
stale-cleanup = "auto-clean-stale"
"#
)]
#[case(
r#"
num-workers = 1
[redis]
uri = "redis://localhost:6379"
[periodic]
stale-cleanup = "auto-clean-stale"
[queue-config]
"foo" = { num-workers = 10 }
[queue-config.bar]
num-workers = 100
"#
)]
#[cfg_attr(coverage_nightly, coverage(off))]
fn sidekiq(_case: TestCase, #[case] config: &str) {
let sidekiq: SidekiqServiceConfig = toml::from_str(config).unwrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ source: src/config/service/worker/sidekiq/mod.rs
expression: sidekiq
---
num-workers = 1
balance-strategy = 'round-robin'
queues = []

[redis]
Expand All @@ -20,3 +21,5 @@ max-retries = 5
timeout = true
max-duration = 60
disable-argument-coercion = false

[queue-config]
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ source: src/config/service/worker/sidekiq/mod.rs
expression: sidekiq
---
num-workers = 1
balance-strategy = 'round-robin'
queues = []

[redis]
Expand All @@ -20,3 +21,5 @@ max-retries = 5
timeout = true
max-duration = 60
disable-argument-coercion = false

[queue-config]
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ source: src/config/service/worker/sidekiq/mod.rs
expression: sidekiq
---
num-workers = 1
balance-strategy = 'round-robin'
queues = ['foo']

[redis]
Expand All @@ -20,3 +21,5 @@ max-retries = 5
timeout = true
max-duration = 60
disable-argument-coercion = false

[queue-config]
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ source: src/config/service/worker/sidekiq/mod.rs
expression: sidekiq
---
num-workers = 1
balance-strategy = 'round-robin'
queues = []

[redis]
Expand All @@ -22,3 +23,5 @@ max-retries = 5
timeout = true
max-duration = 60
disable-argument-coercion = false

[queue-config]
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ source: src/config/service/worker/sidekiq/mod.rs
expression: sidekiq
---
num-workers = 1
balance-strategy = 'round-robin'
queues = []

[redis]
Expand All @@ -22,3 +23,5 @@ max-retries = 5
timeout = true
max-duration = 60
disable-argument-coercion = false

[queue-config]
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ source: src/config/service/worker/sidekiq/mod.rs
expression: sidekiq
---
num-workers = 1
balance-strategy = 'round-robin'
queues = []

[redis]
Expand All @@ -20,3 +21,5 @@ max-retries = 5
timeout = true
max-duration = 60
disable-argument-coercion = false

[queue-config]
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
---
source: src/config/service/worker/sidekiq/mod.rs
expression: sidekiq
---
num-workers = 1
balance-strategy = 'none'
queues = []

[redis]
uri = 'redis://[Sensitive]'

[redis.enqueue-pool]

[redis.fetch-pool]

[periodic]
stale-cleanup = 'auto-clean-stale'

[app-worker]
max-retries = 5
timeout = true
max-duration = 60
disable-argument-coercion = false

[queue-config]
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
---
source: src/config/service/worker/sidekiq/mod.rs
expression: sidekiq
---
num-workers = 1
balance-strategy = 'round-robin'
queues = []

[redis]
uri = 'redis://[Sensitive]'

[redis.enqueue-pool]

[redis.fetch-pool]

[periodic]
stale-cleanup = 'auto-clean-stale'

[app-worker]
max-retries = 5
timeout = true
max-duration = 60
disable-argument-coercion = false

[queue-config]
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
---
source: src/config/service/worker/sidekiq/mod.rs
expression: sidekiq
---
num-workers = 1
balance-strategy = 'round-robin'
queues = []

[redis]
uri = 'redis://[Sensitive]'

[redis.enqueue-pool]

[redis.fetch-pool]

[periodic]
stale-cleanup = 'auto-clean-stale'

[app-worker]
max-retries = 5
timeout = true
max-duration = 60
disable-argument-coercion = false
[queue-config.bar]
num-workers = 100

[queue-config.foo]
num-workers = 10
4 changes: 3 additions & 1 deletion src/config/snapshots/roadster__config__tests__test.snap
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
---
source: src/config/mod.rs
expression: config
snapshot_kind: text
---
environment = 'test'

Expand Down Expand Up @@ -139,6 +138,7 @@ port = 3001

[service.sidekiq]
num-workers = 16
balance-strategy = 'round-robin'
queues = []

[service.sidekiq.redis]
Expand All @@ -156,6 +156,8 @@ max-retries = 25
timeout = true
max-duration = 60
disable-argument-coercion = false

[service.sidekiq.queue-config]
[auth.jwt]
secret = 'secret-test'

Expand Down
30 changes: 19 additions & 11 deletions src/service/worker/sidekiq/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,21 +118,29 @@ where
);
debug!("Sidekiq.rs queues: {queues:?}");
let processor = {
let num_workers = context
let config = context.config().service.sidekiq.custom.clone();
let num_workers = config.num_workers.to_usize().ok_or_else(|| {
anyhow!(
"Unable to convert num_workers `{}` to usize",
context.config().service.sidekiq.custom.num_workers
)
})?;
let processor_config: ProcessorConfig = Default::default();
let processor_config = processor_config
.num_workers(num_workers)
.balance_strategy(config.balance_strategy.into());

let processor_config = context
.config()
.service
.sidekiq
.custom
.num_workers
.to_usize()
.ok_or_else(|| {
anyhow!(
"Unable to convert num_workers `{}` to usize",
context.config().service.sidekiq.custom.num_workers
)
})?;
let processor_config: ProcessorConfig = Default::default();
let processor_config = processor_config.num_workers(num_workers);
.queue_config
.iter()
.fold(processor_config, |processor_config, (queue, config)| {
processor_config.queue_config(queue.clone(), config.into())
});

let processor = sidekiq::Processor::new(redis_fetch.clone(), queues.clone())
.with_config(processor_config);
Processor::new(processor)
Expand Down
Loading

0 comments on commit 2e175e0

Please sign in to comment.