diff --git a/Cargo.toml b/Cargo.toml index 085df3d7..e886722e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -80,6 +80,7 @@ chrono = { version = "0.4.35", features = ["serde"] } byte-unit = { version = "5.1.4", features = ["serde"] } convert_case = "0.6.0" const_format = "0.2.32" +typed-builder = "0.18.1" [dev-dependencies] cargo-husky = { version = "1.5.0", default-features = false, features = ["user-hooks"] } diff --git a/README.md b/README.md index 38682c73..c8ecea4b 100644 --- a/README.md +++ b/README.md @@ -115,10 +115,17 @@ locally in a standalone docker container. git clone https://github.com/roadster-rs/standalone_sidekiq_dashboard.git cd standalone_sidekiq_dashboard docker build -t standalone-sidekiq . +# Linux docker commands # Development docker run --network=host standalone-sidekiq # Test docker run --network=host -e REDIS_URL='redis://localhost:6380' standalone-sidekiq + +# Mac docker commands -- todo: see if there's a command that will work on both mac and linux +# Development +docker run -p 9292:9292 -e REDIS_URL=redis://host.docker.internal:6379 standalone-sidekiq +# Test +docker run -p 9292:9292 -e REDIS_URL=redis://host.docker.internal:6380 standalone-sidekiq ``` ## Redis Insights diff --git a/src/app.rs b/src/app.rs index ebfd4f26..6cc42c9d 100644 --- a/src/app.rs +++ b/src/app.rs @@ -39,7 +39,9 @@ use crate::initializer::default::default_initializers; use crate::initializer::Initializer; use crate::tracing::init_tracing; #[cfg(feature = "sidekiq")] -use crate::worker::queue_names; +use crate::worker::queues; +#[cfg(feature = "sidekiq")] +use crate::worker::registry::WorkerRegistry; // todo: this method is getting unweildy, we should break it up pub async fn start( @@ -210,20 +212,27 @@ where .config .worker .sidekiq - .queue_names + .queues .clone() .into_iter() .chain(A::worker_queues(&context, &state)) .collect(); - let queue_names = queue_names(&custom_queue_names); + let queues = queues(&custom_queue_names); info!( "Creating Sidekiq.rs (rusty-sidekiq) processor with {} queues", - queue_names.len() + queues.len() ); - debug!("Sidekiq.rs queues: {queue_names:?}"); - let mut processor = Processor::new(redis, queue_names); - A::workers(&mut processor, &context, &state); + debug!("Sidekiq.rs queues: {queues:?}"); + let processor = { + let mut registry = WorkerRegistry { + processor: Processor::new(redis, queues.clone()), + state: state.clone(), + }; + A::workers(&mut registry, &context, &state); + registry.processor + }; let token = processor.get_cancellation_token(); + (processor, token.clone(), token.drop_guard()) }; @@ -363,13 +372,12 @@ pub trait App: Send + Sync { /// instance. This can reduce the risk of copy/paste errors and typos. #[cfg(feature = "sidekiq")] fn worker_queues(_context: &AppContext, _state: &Self::State) -> Vec { - vec![] + Default::default() } #[cfg(feature = "sidekiq")] - fn workers(_processor: &mut Processor, _context: &AppContext, _state: &Self::State) {} + fn workers(_registry: &mut WorkerRegistry, _context: &AppContext, _state: &Self::State) {} - #[instrument(skip_all)] async fn serve( router: Router, shutdown_signal: F, @@ -408,7 +416,6 @@ pub trait App: Send + Sync { } } -#[instrument(skip_all)] async fn graceful_shutdown_signal(cancellation_token: CancellationToken, app_shutdown_signal: F) where F: Future + Send + 'static, diff --git a/src/config/app_config.rs b/src/config/app_config.rs index 8e5822ec..5be344b4 100644 --- a/src/config/app_config.rs +++ b/src/config/app_config.rs @@ -13,6 +13,8 @@ use url::Url; use crate::config::environment::{Environment, ENVIRONMENT_ENV_VAR_NAME}; use crate::config::initializer::Initializer; use crate::config::middleware::Middleware; +#[cfg(feature = "sidekiq")] +use crate::config::worker::Worker; use crate::util::serde_util::{default_true, UriOrString}; #[derive(Debug, Clone, Serialize, Deserialize)] @@ -52,6 +54,8 @@ impl AppConfig { let environment_str: &str = environment.into(); let config: AppConfig = Config::builder() + // Todo: allow other file formats? + // Todo: allow splitting config into multiple files? .add_source(config::File::with_name("config/default.toml")) .add_source(config::File::with_name(&format!( "config/{environment_str}.toml" @@ -185,25 +189,6 @@ impl Database { } } -#[cfg(feature = "sidekiq")] -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "kebab-case")] -pub struct Worker { - // Todo: Make Redis optional for workers? - #[cfg(feature = "sidekiq")] - pub sidekiq: Sidekiq, -} - -#[cfg(feature = "sidekiq")] -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "kebab-case")] -pub struct Sidekiq { - // Todo: Make Redis optional for workers? - pub redis: Redis, - #[serde(default)] - pub queue_names: Vec, -} - #[cfg(feature = "sidekiq")] #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "kebab-case")] diff --git a/src/config/mod.rs b/src/config/mod.rs index 95335545..0cf0f777 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -2,3 +2,5 @@ pub mod app_config; pub mod environment; pub mod initializer; pub mod middleware; +#[cfg(feature = "sidekiq")] +pub mod worker; diff --git a/src/config/worker.rs b/src/config/worker.rs new file mode 100644 index 00000000..490662f2 --- /dev/null +++ b/src/config/worker.rs @@ -0,0 +1,30 @@ +use crate::config::app_config::Redis; +use crate::worker::app_worker::AppWorkerConfig; +use serde_derive::{Deserialize, Serialize}; + +#[cfg(feature = "sidekiq")] +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "kebab-case")] +pub struct Worker { + // Todo: Make Redis optional for workers? + #[cfg(feature = "sidekiq")] + pub sidekiq: Sidekiq, +} + +#[cfg(feature = "sidekiq")] +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "kebab-case")] +pub struct Sidekiq { + // Todo: Make Redis optional for workers? + pub redis: Redis, + + /// The names of the worker queues to handle. + // Todo: Allow overriding this via CLI args? + #[serde(default)] + pub queues: Vec, + + /// The default app worker config. Values can be overridden on a per-worker basis by + /// implementing the corresponding [crate::worker::app_worker::AppWorker] methods. + #[serde(default)] + pub worker_config: AppWorkerConfig, +} diff --git a/src/worker.rs b/src/worker.rs deleted file mode 100644 index 47e22726..00000000 --- a/src/worker.rs +++ /dev/null @@ -1,16 +0,0 @@ -use itertools::Itertools; -use lazy_static::lazy_static; - -lazy_static! { - pub static ref DEFAULT_QUEUE_NAMES: Vec = - ["default"].iter().map(|s| s.to_string()).collect(); -} - -pub fn queue_names(custom_queue_names: &Vec) -> Vec { - DEFAULT_QUEUE_NAMES - .iter() - .chain(custom_queue_names) - .unique() - .map(|s| s.to_owned()) - .collect() -} diff --git a/src/worker/app_worker.rs b/src/worker/app_worker.rs new file mode 100644 index 00000000..30de2b33 --- /dev/null +++ b/src/worker/app_worker.rs @@ -0,0 +1,156 @@ +use crate::app::App; +use crate::app_context::AppContext; +use async_trait::async_trait; +use serde_derive::{Deserialize, Serialize}; +use serde_with::{serde_as, skip_serializing_none}; +use sidekiq::Worker; +use std::sync::Arc; +use std::time::Duration; +use typed_builder::TypedBuilder; + +/// Additional configuration options that can be configured via the app's configuration files. +/// The options can also be overridden on a per-worker basis by implementing the corresponding +/// method in the [AppWorker] trait. +#[serde_as] +#[skip_serializing_none] +#[derive(Debug, Clone, Serialize, Deserialize, TypedBuilder)] +#[serde(default, rename_all = "kebab-case")] +pub struct AppWorkerConfig { + /// The maximum number of times a job should be retried on failure. + pub max_retries: usize, + /// True if Roadster should enforce a timeout on the app's workers. The default duration of + /// the timeout can be configured with the `max-duration` option. + pub timeout: bool, + /// The maximum duration workers should run for. The timeout is only enforced if `timeout` + /// is `true`. + #[serde_as(as = "serde_with::DurationSeconds")] + pub max_duration: Duration, + /// See + pub disable_argument_coercion: bool, +} + +impl Default for AppWorkerConfig { + fn default() -> Self { + AppWorkerConfig::builder() + .max_retries(5) + .timeout(true) + .max_duration(Duration::from_secs(60)) + .disable_argument_coercion(false) + .build() + } +} + +#[async_trait] +pub trait AppWorker: Worker +where + Self: Sized, + A: App, + Args: Send + Sync + serde::Serialize + 'static, +{ + /// Build a new instance of the [worker][Self]. + fn build(state: &A::State) -> Self; + + /// Enqueue the worker into its Sidekiq queue. This is a helper method around [Worker::perform_async] + /// so the caller can simply provide the [state][App::State] instead of needing to access the + /// [sidekiq::RedisPool] from inside the [state][App::State]. + async fn enqueue(state: &A::State, args: Args) -> anyhow::Result<()> { + let context: Arc = state.clone().into(); + Self::perform_async(&context.redis, args).await?; + Ok(()) + } + + /// Provide the [AppWorkerConfig] for [Self]. The default implementation populates the + /// [AppWorkerConfig] using the values from the corresponding methods on [Self], e.g., + /// [Self::max_retries]. + fn config(&self, state: &A::State) -> AppWorkerConfig { + AppWorkerConfig::builder() + .max_retries(AppWorker::max_retries(self, state)) + .timeout(self.timeout(state)) + .max_duration(self.max_duration(state)) + .disable_argument_coercion(AppWorker::disable_argument_coercion(self, state)) + .build() + } + + /// See [AppWorkerConfig::max_retries]. + /// + /// The default implementation uses the value from the app's config file. + fn max_retries(&self, state: &A::State) -> usize { + let context: Arc = state.clone().into(); + context.config.worker.sidekiq.worker_config.max_retries + } + + /// See [AppWorkerConfig::timeout]. + /// + /// The default implementation uses the value from the app's config file. + fn timeout(&self, state: &A::State) -> bool { + let context: Arc = state.clone().into(); + context.config.worker.sidekiq.worker_config.timeout + } + + /// See [AppWorkerConfig::max_duration]. + /// + /// The default implementation uses the value from the app's config file. + fn max_duration(&self, state: &A::State) -> Duration { + let context: Arc = state.clone().into(); + context.config.worker.sidekiq.worker_config.max_duration + } + + /// See [AppWorkerConfig::disable_argument_coercion]. + /// + /// The default implementation uses the value from the app's config file. + fn disable_argument_coercion(&self, state: &A::State) -> bool { + let context: Arc = state.clone().into(); + context + .config + .worker + .sidekiq + .worker_config + .disable_argument_coercion + } +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::from_str; + + #[derive(Debug, Deserialize, Serialize)] + struct Wrapper { + inner: T, + } + + #[test] + fn deserialize_config_override_max_retries() { + let max_retries = 1234; + let value: Wrapper = from_str(&format!( + r#"{{"inner": {{"max-retries": {max_retries} }} }}"# + )) + .unwrap(); + assert_eq!(value.inner.max_retries, max_retries); + } + + #[test] + fn deserialize_config_override_timeout() { + let value: Wrapper = + from_str(r#"{"inner": {"timeout": false } }"#).unwrap(); + assert!(!value.inner.timeout); + } + + #[test] + fn deserialize_config_override_max_duration() { + let max_duration = Duration::from_secs(1234); + let value: Wrapper = from_str(&format!( + r#"{{"inner": {{"max-duration": {} }} }}"#, + max_duration.as_secs() + )) + .unwrap(); + assert_eq!(value.inner.max_duration, max_duration); + } + + #[test] + fn deserialize_config_override_disable_argument_coercion() { + let value: Wrapper = + from_str(r#"{"inner": {"disable-argument-coercion": true } }"#).unwrap(); + assert!(value.inner.disable_argument_coercion); + } +} diff --git a/src/worker/mod.rs b/src/worker/mod.rs new file mode 100644 index 00000000..8f3b1dc4 --- /dev/null +++ b/src/worker/mod.rs @@ -0,0 +1,144 @@ +pub mod app_worker; +pub mod registry; + +use crate::app::App; + +use crate::worker::app_worker::AppWorkerConfig; +use app_worker::AppWorker; +use async_trait::async_trait; +use itertools::Itertools; +use lazy_static::lazy_static; +use serde::Serialize; + +use sidekiq::{RedisPool, Worker, WorkerOpts}; +use std::marker::PhantomData; +use std::sync::Arc; +use std::time::Duration; +use tracing::{error, instrument}; + +lazy_static! { + // Todo: We don't need to specifically provide the `default` queue. However, if no queues + // are provided, Sidekiq.rs will error out. If no queues are provided, we should probably + // just not initialize the Sidekiq.rs processor instead of providing a `default` queue, + // which may not be what the consumer wants. + pub static ref DEFAULT_QUEUE_NAMES: Vec = + ["default"].iter().map(|s| s.to_string()).collect(); +} + +pub fn queues(custom_queue_names: &Vec) -> Vec { + DEFAULT_QUEUE_NAMES + .iter() + .chain(custom_queue_names) + .unique() + .map(|s| s.to_owned()) + .collect() +} + +/// Worker used by Roadster to wrap the consuming app's workers to add additional behavior. For +/// example, [RoadsterWorker] is by default configured to automatically abort the app's worker +/// when it exceeds a certain timeout. +pub(crate) struct RoadsterWorker +where + A: App, + Args: Send + Sync + Serialize + 'static, + W: AppWorker, +{ + inner: W, + inner_config: AppWorkerConfig, + _args: PhantomData, + _app: PhantomData, +} + +impl RoadsterWorker +where + A: App, + Args: Send + Sync + Serialize, + W: AppWorker, +{ + pub(crate) fn new(inner: W, state: Arc) -> Self { + let config = inner.config(&state); + Self { + inner, + inner_config: config, + _args: PhantomData, + _app: PhantomData, + } + } +} + +#[async_trait] +impl Worker for RoadsterWorker +where + A: App, + Args: Send + Sync + Serialize, + W: AppWorker, +{ + fn disable_argument_coercion(&self) -> bool { + self.inner_config.disable_argument_coercion + } + + fn opts() -> WorkerOpts + where + Self: Sized, + { + // This method not implemented because `RoadsterWorker` should not be enqueued directly, + // and this method is only used when enqueuing. Instead, Sidekiq.rs will use the + // `W::opts` implementation directly. + unimplemented!() + } + + fn max_retries(&self) -> usize { + self.inner_config.max_retries + } + + fn class_name() -> String + where + Self: Sized, + { + // This method is implemented because it's used both when registering the worker, and + // when enqueuing a job. We forward the implementation to `W::classname` because that's + // what Sidekiq.rs uses specifically. If we attempt to override this, our impl will be used + // when registering the worker, but not when enqueuing a job, so the worker will not pick + // up the jobs. + W::class_name() + } + + async fn perform_async(_redis: &RedisPool, _args: Args) -> sidekiq::Result<()> + where + Self: Sized, + Args: Send + Sync + Serialize + 'static, + { + // This method not implemented because `RoadsterWorker` should not be enqueued directly. + unimplemented!() + } + + async fn perform_in(_redis: &RedisPool, _duration: Duration, _args: Args) -> sidekiq::Result<()> + where + Self: Sized, + Args: Send + Sync + Serialize + 'static, + { + // This method not implemented because `RoadsterWorker` should not be enqueued directly. + unimplemented!() + } + + #[instrument(skip_all)] + async fn perform(&self, args: Args) -> sidekiq::Result<()> { + let inner = self.inner.perform(args); + + if self.inner_config.timeout { + tokio::time::timeout(self.inner_config.max_duration, inner) + .await + .map_err(|err| { + error!( + worker = %W::class_name(), + max_duration = %self.inner_config.max_duration.as_secs(), + %err, + "Worker timed out" + ); + sidekiq::Error::Any(Box::new(err)) + })? + } else { + inner.await + } + } +} diff --git a/src/worker/registry.rs b/src/worker/registry.rs new file mode 100644 index 00000000..b0bf01bf --- /dev/null +++ b/src/worker/registry.rs @@ -0,0 +1,36 @@ +use crate::app::App; +use crate::worker::app_worker::AppWorker; +use crate::worker::RoadsterWorker; +use serde::Serialize; +use sidekiq::Processor; +use std::sync::Arc; +use tracing::debug; + +/// Custom wrapper around [Processor] to help with registering [workers][AppWorker] that are +/// wrapped by [RoadsterWorker]. +pub struct WorkerRegistry +where + A: App + ?Sized, +{ + pub(crate) processor: Processor, + pub(crate) state: Arc, +} + +impl WorkerRegistry +where + A: App + 'static, +{ + /// Register a [worker][AppWorker] to handle Sidekiq.rs jobs. + /// + /// The worker will be wrapped by a [RoadsterWorker], which provides some common behavior, such + /// as enforcing a timeout/max duration of worker jobs. + pub fn register_app_worker(&mut self, worker: W) + where + Args: Sync + Send + Serialize + for<'de> serde::Deserialize<'de> + 'static, + W: AppWorker + 'static, + { + debug!("Registering worker: `{}`", W::class_name()); + let roadster_worker = RoadsterWorker::new(worker, self.state.clone()); + self.processor.register(roadster_worker); + } +}