Commit
Allow configuring the number of sidekiq worker tasks
spencewenski committed Apr 24, 2024
1 parent 721ad12 commit c4f486b
Showing 4 changed files with 63 additions and 29 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
@@ -45,7 +45,7 @@ sea-orm-migration = { version = "1.0.0-rc.2", features = ["runtime-tokio-rustls"

# Workers
# Todo: the default `rss-stats` feature has a dependency that currently can't be satisfied (memchr: ~2.3)
rusty-sidekiq = { version = "0.10.2", default-features = false, optional = true }
rusty-sidekiq = { version = "0.10.3", default-features = false, optional = true }
bb8 = { version = "0.8.3", optional = true }
num_cpus = { version = "1.16.0", optional = true }

@@ -81,6 +81,7 @@ byte-unit = { version = "5.1.4", features = ["serde"] }
convert_case = "0.6.0"
const_format = "0.2.32"
typed-builder = "0.18.1"
num-traits = "0.2.18"

[dev-dependencies]
cargo-husky = { version = "1.5.0", default-features = false, features = ["user-hooks"] }
32 changes: 27 additions & 5 deletions src/app.rs
@@ -8,19 +8,23 @@ use aide::axum::ApiRouter;
use aide::openapi::OpenApi;
#[cfg(feature = "open-api")]
use aide::transform::TransformOpenApi;
#[cfg(feature = "sidekiq")]
use anyhow::anyhow;
use async_trait::async_trait;
#[cfg(feature = "open-api")]
use axum::Extension;
use axum::Router;
#[cfg(feature = "cli")]
use clap::{Args, Command, FromArgMatches};
use itertools::Itertools;
#[cfg(feature = "sidekiq")]
use num_traits::ToPrimitive;
#[cfg(feature = "db-sql")]
use sea_orm::{ConnectOptions, Database};
#[cfg(feature = "db-sql")]
use sea_orm_migration::MigratorTrait;
#[cfg(feature = "sidekiq")]
use sidekiq::{periodic, Processor};
use sidekiq::{periodic, Processor, ProcessorConfig};
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
#[cfg(feature = "sidekiq")]
@@ -108,11 +112,15 @@ where

#[cfg(feature = "sidekiq")]
let redis = {
let redis_config = &config.worker.sidekiq.redis;
let sidekiq_config = &config.worker.sidekiq;
let redis_config = &sidekiq_config.redis;
let redis = sidekiq::RedisConnectionManager::new(redis_config.uri.to_string())?;
let max_conns = redis_config
.max_connections
.unwrap_or(sidekiq_config.num_workers + 5);
bb8::Pool::builder()
.min_idle(redis_config.min_idle)
.max_size(redis_config.max_connections)
.max_size(max_conns)
.build(redis)
.await?
};
@@ -230,8 +238,22 @@ where
);
debug!("Sidekiq.rs queues: {queues:?}");
let processor = {
let mut registry =
WorkerRegistry::new(Processor::new(redis, queues.clone()), state.clone());
let num_workers = context
.config
.worker
.sidekiq
.num_workers
.to_usize()
.ok_or_else(|| {
anyhow!(
"Unable to convert num_workers `{}` to usize",
context.config.worker.sidekiq.num_workers
)
})?;
let processor_config: ProcessorConfig = Default::default();
let processor_config = processor_config.num_workers(num_workers);
let processor = Processor::new(redis, queues.clone()).with_config(processor_config);
let mut registry = WorkerRegistry::new(processor, state.clone());
A::workers(&mut registry, &context, &state).await?;
registry.remove_stale_periodic_jobs(&context).await?;
registry.processor
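
Note: for context, a minimal sketch (not part of this commit) of how the pieces added to src/app.rs fit together: a bb8 Redis pool sized from the worker count, and a Processor configured through ProcessorConfig::num_workers. The helper name, queue name, and URI parameter are illustrative only; the sidekiq and bb8 calls are the ones used in the diff above.

    use sidekiq::{Processor, ProcessorConfig, RedisConnectionManager};

    // Hypothetical helper mirroring the logic added in src/app.rs: build a
    // sidekiq Processor that runs `num_workers` concurrent worker tasks.
    async fn build_processor(redis_uri: &str, num_workers: usize) -> anyhow::Result<Processor> {
        let manager = RedisConnectionManager::new(redis_uri.to_string())?;
        // Pool size defaults to the worker count plus a small buffer, as in the diff.
        let redis = bb8::Pool::builder()
            .max_size(num_workers as u32 + 5)
            .build(manager)
            .await?;

        let queues = vec!["default".to_string()];
        let config = ProcessorConfig::default().num_workers(num_workers);
        Ok(Processor::new(redis, queues).with_config(config))
    }
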
22 changes: 0 additions & 22 deletions src/config/app_config.rs
@@ -188,25 +188,3 @@ impl Database {
Duration::from_millis(1000)
}
}

#[cfg(feature = "sidekiq")]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct Redis {
pub uri: Url,
#[serde(default = "Redis::default_min_idle")]
pub min_idle: Option<u32>,
#[serde(default = "Redis::default_max_connections")]
pub max_connections: u32,
}

#[cfg(feature = "sidekiq")]
impl Redis {
fn default_min_idle() -> Option<u32> {
Some(5)
}

fn default_max_connections() -> u32 {
(num_cpus::get() + 5) as u32
}
}
35 changes: 34 additions & 1 deletion src/config/worker.rs
@@ -1,7 +1,7 @@
use crate::config::app_config::Redis;
use crate::worker::app_worker::AppWorkerConfig;
use serde_derive::{Deserialize, Serialize};
use strum_macros::{EnumString, IntoStaticStr};
use url::Url;

#[cfg(feature = "sidekiq")]
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -19,6 +19,17 @@ pub struct Sidekiq {
// Todo: Make Redis optional for workers?
pub redis: Redis,

/// The number of Sidekiq workers that can run at the same time. Adjust as needed based on
/// your workload and resource (cpu/memory/etc) usage.
///
/// If your workload is largely CPU-bound (computationally expensive), this should probably
/// match your CPU count. This is the default if not provided.
///
/// If your workload is largely IO-bound (e.g. reading from a DB, making web requests and
/// waiting for responses, etc), this can probably be quite a bit higher than your CPU count.
#[serde(default = "Sidekiq::default_num_workers")]
pub num_workers: u32,

/// The names of the worker queues to handle.
// Todo: Allow overriding this via CLI args?
#[serde(default)]
@@ -33,6 +44,12 @@ pub struct Sidekiq {
pub worker_config: AppWorkerConfig,
}

impl Sidekiq {
fn default_num_workers() -> u32 {
num_cpus::get() as u32
}
}

#[cfg(feature = "sidekiq")]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
@@ -56,3 +73,19 @@ pub enum StaleCleanUpBehavior {
AutoCleanAll,
AutoCleanStale,
}

#[cfg(feature = "sidekiq")]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct Redis {
pub uri: Url,
#[serde(default)]
pub min_idle: Option<u32>,
/// The maximum number of Redis connections to allow. If not specified, will default to
/// [worker.sidekiq.num-workers][crate::config::worker::Sidekiq], plus a small amount to
/// allow other things to access Redis as needed, for example, a health check endpoint.
// Todo: Is it okay if this is equal to or smaller than the number of workers, or does each
// worker task consume a connection?
#[serde(default)]
pub max_connections: Option<u32>,
}
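
Note: a rough illustration (not part of the commit) of the fallback described in the max-connections doc comment, mirroring the pool-size computation added to src/app.rs: an explicit max-connections value wins, otherwise the pool is sized from num-workers plus a small buffer for other Redis users such as a health check. The helper name is hypothetical.

    // Mirrors src/app.rs: redis_config.max_connections.unwrap_or(sidekiq_config.num_workers + 5)
    fn effective_pool_size(max_connections: Option<u32>, num_workers: u32) -> u32 {
        max_connections.unwrap_or(num_workers + 5)
    }

    // Example: with defaults on an 8-core machine, num_workers = 8 and the pool size is 13.
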
