From c4f486bafb3eb6ea2449a566332357f011a0d393 Mon Sep 17 00:00:00 2001 From: Spencer Ferris <3319370+spencewenski@users.noreply.github.com> Date: Wed, 24 Apr 2024 00:32:40 -0700 Subject: [PATCH] Allow configuring the number of sidekiq worker tasks --- Cargo.toml | 3 ++- src/app.rs | 32 +++++++++++++++++++++++++++----- src/config/app_config.rs | 22 ---------------------- src/config/worker.rs | 35 ++++++++++++++++++++++++++++++++++- 4 files changed, 63 insertions(+), 29 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a8c5d544..253ad8ed 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,7 +45,7 @@ sea-orm-migration = { version = "1.0.0-rc.2", features = ["runtime-tokio-rustls" # Workers # Todo: the default `rss-stats` feature has a dependency that currently can't be satisfied (memchr: ~2.3) -rusty-sidekiq = { version = "0.10.2", default-features = false, optional = true } +rusty-sidekiq = { version = "0.10.3", default-features = false, optional = true } bb8 = { version = "0.8.3", optional = true } num_cpus = { version = "1.16.0", optional = true } @@ -81,6 +81,7 @@ byte-unit = { version = "5.1.4", features = ["serde"] } convert_case = "0.6.0" const_format = "0.2.32" typed-builder = "0.18.1" +num-traits = "0.2.18" [dev-dependencies] cargo-husky = { version = "1.5.0", default-features = false, features = ["user-hooks"] } diff --git a/src/app.rs b/src/app.rs index e76e2806..38e0988d 100644 --- a/src/app.rs +++ b/src/app.rs @@ -8,6 +8,8 @@ use aide::axum::ApiRouter; use aide::openapi::OpenApi; #[cfg(feature = "open-api")] use aide::transform::TransformOpenApi; +#[cfg(feature = "sidekiq")] +use anyhow::anyhow; use async_trait::async_trait; #[cfg(feature = "open-api")] use axum::Extension; @@ -15,12 +17,14 @@ use axum::Router; #[cfg(feature = "cli")] use clap::{Args, Command, FromArgMatches}; use itertools::Itertools; +#[cfg(feature = "sidekiq")] +use num_traits::ToPrimitive; #[cfg(feature = "db-sql")] use sea_orm::{ConnectOptions, Database}; #[cfg(feature = "db-sql")] 
use sea_orm_migration::MigratorTrait; #[cfg(feature = "sidekiq")] -use sidekiq::{periodic, Processor}; +use sidekiq::{periodic, Processor, ProcessorConfig}; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; #[cfg(feature = "sidekiq")] @@ -108,11 +112,15 @@ where #[cfg(feature = "sidekiq")] let redis = { - let redis_config = &config.worker.sidekiq.redis; + let sidekiq_config = &config.worker.sidekiq; + let redis_config = &sidekiq_config.redis; let redis = sidekiq::RedisConnectionManager::new(redis_config.uri.to_string())?; + let max_conns = redis_config + .max_connections + .unwrap_or(sidekiq_config.num_workers + 5); bb8::Pool::builder() .min_idle(redis_config.min_idle) - .max_size(redis_config.max_connections) + .max_size(max_conns) .build(redis) .await? }; @@ -230,8 +238,22 @@ where ); debug!("Sidekiq.rs queues: {queues:?}"); let processor = { - let mut registry = - WorkerRegistry::new(Processor::new(redis, queues.clone()), state.clone()); + let num_workers = context + .config + .worker + .sidekiq + .num_workers + .to_usize() + .ok_or_else(|| { + anyhow!( + "Unable to convert num_workers `{}` to usize", + context.config.worker.sidekiq.num_workers + ) + })?; + let processor_config: ProcessorConfig = Default::default(); + let processor_config = processor_config.num_workers(num_workers); + let processor = Processor::new(redis, queues.clone()).with_config(processor_config); + let mut registry = WorkerRegistry::new(processor, state.clone()); A::workers(&mut registry, &context, &state).await?; registry.remove_stale_periodic_jobs(&context).await?; registry.processor diff --git a/src/config/app_config.rs b/src/config/app_config.rs index 5be344b4..7611b946 100644 --- a/src/config/app_config.rs +++ b/src/config/app_config.rs @@ -188,25 +188,3 @@ impl Database { Duration::from_millis(1000) } } - -#[cfg(feature = "sidekiq")] -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "kebab-case")] -pub struct Redis { - pub uri: Url, - 
#[serde(default = "Redis::default_min_idle")] - pub min_idle: Option<u32>, - #[serde(default = "Redis::default_max_connections")] - pub max_connections: u32, -} - -#[cfg(feature = "sidekiq")] -impl Redis { - fn default_min_idle() -> Option<u32> { - Some(5) - } - - fn default_max_connections() -> u32 { - (num_cpus::get() + 5) as u32 - } -} diff --git a/src/config/worker.rs b/src/config/worker.rs index bfe099f1..4e13ae3c 100644 --- a/src/config/worker.rs +++ b/src/config/worker.rs @@ -1,7 +1,7 @@ -use crate::config::app_config::Redis; use crate::worker::app_worker::AppWorkerConfig; use serde_derive::{Deserialize, Serialize}; use strum_macros::{EnumString, IntoStaticStr}; +use url::Url; #[cfg(feature = "sidekiq")] #[derive(Debug, Clone, Serialize, Deserialize)] @@ -19,6 +19,17 @@ pub struct Sidekiq { // Todo: Make Redis optional for workers? pub redis: Redis, + /// The number of Sidekiq workers that can run at the same time. Adjust as needed based on + /// your workload and resource (cpu/memory/etc) usage. + /// + /// If your workload is largely CPU-bound (computationally expensive), this should probably + /// match your CPU count. This is the default if not provided. + /// + /// If your workload is largely IO-bound (e.g. reading from a DB, making web requests and + /// waiting for responses, etc), this can probably be quite a bit higher than your CPU count. + #[serde(default = "Sidekiq::default_num_workers")] + pub num_workers: u32, + /// The names of the worker queues to handle. // Todo: Allow overriding this via CLI args? 
#[serde(default)] @@ -33,6 +44,12 @@ pub struct Sidekiq { pub worker_config: AppWorkerConfig, } +impl Sidekiq { + fn default_num_workers() -> u32 { + num_cpus::get() as u32 + } +} + #[cfg(feature = "sidekiq")] #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "kebab-case")] @@ -56,3 +73,19 @@ pub enum StaleCleanUpBehavior { AutoCleanAll, AutoCleanStale, } + +#[cfg(feature = "sidekiq")] +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "kebab-case")] +pub struct Redis { + pub uri: Url, + #[serde(default)] + pub min_idle: Option<u32>, + /// The maximum number of Redis connections to allow. If not specified, will default to + /// [worker.sidekiq.num-workers][crate::config::worker::Sidekiq], plus a small amount to + /// allow other things to access Redis as needed, for example, a health check endpoint. + // Todo: Is it okay if this is equal to or smaller than the number of workers, or does each + // worker task consume a connection? + #[serde(default)] + pub max_connections: Option<u32>, +}