Skip to content

Commit

Permalink
Add RoadsterWorker to provide common behaviors for workers
Browse files Browse the repository at this point in the history
- `RoadsterWorker` will auto-timeout workers after a certain
  configurable duration
- Also add `AppWorker`, and require consumers to implement it in order
  to register a worker. The `AppWorker` provides the configuration
  options, and also a convenience method for enqueuing jobs.
- Add `WorkerRegistry` to allow consumers to register workers that and
  wrap them with `RoadsterWorker`
  • Loading branch information
spencewenski committed Apr 12, 2024
1 parent 2107a62 commit a876641
Show file tree
Hide file tree
Showing 10 changed files with 398 additions and 46 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ chrono = { version = "0.4.35", features = ["serde"] }
byte-unit = { version = "5.1.4", features = ["serde"] }
convert_case = "0.6.0"
const_format = "0.2.32"
typed-builder = "0.18.1"

[dev-dependencies]
cargo-husky = { version = "1.5.0", default-features = false, features = ["user-hooks"] }
Expand Down
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,17 @@ locally in a standalone docker container.
git clone https://github.com/roadster-rs/standalone_sidekiq_dashboard.git
cd standalone_sidekiq_dashboard
docker build -t standalone-sidekiq .
# Linux docker commands
# Development
docker run --network=host standalone-sidekiq
# Test
docker run --network=host -e REDIS_URL='redis://localhost:6380' standalone-sidekiq
# Mac docker commands -- todo: see if there's a command that will work on both mac and linux
# Development
docker run -p 9292:9292 -e REDIS_URL=redis://host.docker.internal:6379 standalone-sidekiq
# Test
docker run -p 9292:9292 -e REDIS_URL=redis://host.docker.internal:6380 standalone-sidekiq
```

## Redis Insights
Expand Down
29 changes: 18 additions & 11 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ use crate::initializer::default::default_initializers;
use crate::initializer::Initializer;
use crate::tracing::init_tracing;
#[cfg(feature = "sidekiq")]
use crate::worker::queue_names;
use crate::worker::queues;
#[cfg(feature = "sidekiq")]
use crate::worker::registry::WorkerRegistry;

// todo: this method is getting unweildy, we should break it up
pub async fn start<A>(
Expand Down Expand Up @@ -210,20 +212,27 @@ where
.config
.worker
.sidekiq
.queue_names
.queues
.clone()
.into_iter()
.chain(A::worker_queues(&context, &state))
.collect();
let queue_names = queue_names(&custom_queue_names);
let queues = queues(&custom_queue_names);
info!(
"Creating Sidekiq.rs (rusty-sidekiq) processor with {} queues",
queue_names.len()
queues.len()
);
debug!("Sidekiq.rs queues: {queue_names:?}");
let mut processor = Processor::new(redis, queue_names);
A::workers(&mut processor, &context, &state);
debug!("Sidekiq.rs queues: {queues:?}");
let processor = {
let mut registry = WorkerRegistry {
processor: Processor::new(redis, queues.clone()),
state: state.clone(),
};
A::workers(&mut registry, &context, &state);
registry.processor
};
let token = processor.get_cancellation_token();

(processor, token.clone(), token.drop_guard())
};

Expand Down Expand Up @@ -363,13 +372,12 @@ pub trait App: Send + Sync {
/// instance. This can reduce the risk of copy/paste errors and typos.
#[cfg(feature = "sidekiq")]
fn worker_queues(_context: &AppContext, _state: &Self::State) -> Vec<String> {
vec![]
Default::default()
}

#[cfg(feature = "sidekiq")]
fn workers(_processor: &mut Processor, _context: &AppContext, _state: &Self::State) {}
fn workers(_registry: &mut WorkerRegistry<Self>, _context: &AppContext, _state: &Self::State) {}

#[instrument(skip_all)]
async fn serve<F>(
router: Router,
shutdown_signal: F,
Expand Down Expand Up @@ -408,7 +416,6 @@ pub trait App: Send + Sync {
}
}

#[instrument(skip_all)]
async fn graceful_shutdown_signal<F>(cancellation_token: CancellationToken, app_shutdown_signal: F)
where
F: Future<Output = ()> + Send + 'static,
Expand Down
23 changes: 4 additions & 19 deletions src/config/app_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ use url::Url;
use crate::config::environment::{Environment, ENVIRONMENT_ENV_VAR_NAME};
use crate::config::initializer::Initializer;
use crate::config::middleware::Middleware;
#[cfg(feature = "sidekiq")]
use crate::config::worker::Worker;
use crate::util::serde_util::{default_true, UriOrString};

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -52,6 +54,8 @@ impl AppConfig {
let environment_str: &str = environment.into();

let config: AppConfig = Config::builder()
// Todo: allow other file formats?
// Todo: allow splitting config into multiple files?
.add_source(config::File::with_name("config/default.toml"))
.add_source(config::File::with_name(&format!(
"config/{environment_str}.toml"
Expand Down Expand Up @@ -185,25 +189,6 @@ impl Database {
}
}

#[cfg(feature = "sidekiq")]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct Worker {
// Todo: Make Redis optional for workers?
#[cfg(feature = "sidekiq")]
pub sidekiq: Sidekiq,
}

#[cfg(feature = "sidekiq")]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct Sidekiq {
// Todo: Make Redis optional for workers?
pub redis: Redis,
#[serde(default)]
pub queue_names: Vec<String>,
}

#[cfg(feature = "sidekiq")]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
Expand Down
2 changes: 2 additions & 0 deletions src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@ pub mod app_config;
pub mod environment;
pub mod initializer;
pub mod middleware;
#[cfg(feature = "sidekiq")]
pub mod worker;
30 changes: 30 additions & 0 deletions src/config/worker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use crate::config::app_config::Redis;
use crate::worker::app_worker::AppWorkerConfig;
use serde_derive::{Deserialize, Serialize};

#[cfg(feature = "sidekiq")]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct Worker {
// Todo: Make Redis optional for workers?
#[cfg(feature = "sidekiq")]
pub sidekiq: Sidekiq,
}

#[cfg(feature = "sidekiq")]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct Sidekiq {
// Todo: Make Redis optional for workers?
pub redis: Redis,

/// The names of the worker queues to handle.
// Todo: Allow overriding this via CLI args?
#[serde(default)]
pub queues: Vec<String>,

/// The default app worker config. Values can be overridden on a per-worker basis by
/// implementing the corresponding [crate::worker::app_worker::AppWorker] methods.
#[serde(default)]
pub worker_config: AppWorkerConfig,
}
16 changes: 0 additions & 16 deletions src/worker.rs

This file was deleted.

156 changes: 156 additions & 0 deletions src/worker/app_worker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
use crate::app::App;
use crate::app_context::AppContext;
use async_trait::async_trait;
use serde_derive::{Deserialize, Serialize};
use serde_with::{serde_as, skip_serializing_none};
use sidekiq::Worker;
use std::sync::Arc;
use std::time::Duration;
use typed_builder::TypedBuilder;

/// Additional configuration options that can be configured via the app's configuration files.
/// The options can also be overridden on a per-worker basis by implementing the corresponding
/// method in the [AppWorker] trait.
#[serde_as]
#[skip_serializing_none]
#[derive(Debug, Clone, Serialize, Deserialize, TypedBuilder)]
#[serde(default, rename_all = "kebab-case")]
pub struct AppWorkerConfig {
/// The maximum number of times a job should be retried on failure.
pub max_retries: usize,
/// True if Roadster should enforce a timeout on the app's workers. The default duration of
/// the timeout can be configured with the `max-duration` option.
pub timeout: bool,
/// The maximum duration workers should run for. The timeout is only enforced if `timeout`
/// is `true`.
#[serde_as(as = "serde_with::DurationSeconds")]
pub max_duration: Duration,
/// See <https://docs.rs/rusty-sidekiq/latest/sidekiq/trait.Worker.html#method.disable_argument_coercion>
pub disable_argument_coercion: bool,
}

impl Default for AppWorkerConfig {
fn default() -> Self {
AppWorkerConfig::builder()
.max_retries(5)
.timeout(true)
.max_duration(Duration::from_secs(60))
.disable_argument_coercion(false)
.build()
}
}

#[async_trait]
pub trait AppWorker<A, Args>: Worker<Args>
where
Self: Sized,
A: App,
Args: Send + Sync + serde::Serialize + 'static,
{
/// Build a new instance of the [worker][Self].
fn build(state: &A::State) -> Self;

/// Enqueue the worker into its Sidekiq queue. This is a helper method around [Worker::perform_async]
/// so the caller can simply provide the [state][App::State] instead of needing to access the
/// [sidekiq::RedisPool] from inside the [state][App::State].
async fn enqueue(state: &A::State, args: Args) -> anyhow::Result<()> {
let context: Arc<AppContext> = state.clone().into();
Self::perform_async(&context.redis, args).await?;
Ok(())
}

/// Provide the [AppWorkerConfig] for [Self]. The default implementation populates the
/// [AppWorkerConfig] using the values from the corresponding methods on [Self], e.g.,
/// [Self::max_retries].
fn config(&self, state: &A::State) -> AppWorkerConfig {
AppWorkerConfig::builder()
.max_retries(AppWorker::max_retries(self, state))
.timeout(self.timeout(state))
.max_duration(self.max_duration(state))
.disable_argument_coercion(AppWorker::disable_argument_coercion(self, state))
.build()
}

/// See [AppWorkerConfig::max_retries].
///
/// The default implementation uses the value from the app's config file.
fn max_retries(&self, state: &A::State) -> usize {
let context: Arc<AppContext> = state.clone().into();
context.config.worker.sidekiq.worker_config.max_retries
}

/// See [AppWorkerConfig::timeout].
///
/// The default implementation uses the value from the app's config file.
fn timeout(&self, state: &A::State) -> bool {
let context: Arc<AppContext> = state.clone().into();
context.config.worker.sidekiq.worker_config.timeout
}

/// See [AppWorkerConfig::max_duration].
///
/// The default implementation uses the value from the app's config file.
fn max_duration(&self, state: &A::State) -> Duration {
let context: Arc<AppContext> = state.clone().into();
context.config.worker.sidekiq.worker_config.max_duration
}

/// See [AppWorkerConfig::disable_argument_coercion].
///
/// The default implementation uses the value from the app's config file.
fn disable_argument_coercion(&self, state: &A::State) -> bool {
let context: Arc<AppContext> = state.clone().into();
context
.config
.worker
.sidekiq
.worker_config
.disable_argument_coercion
}
}

#[cfg(test)]
mod tests {
use super::*;
use serde_json::from_str;

#[derive(Debug, Deserialize, Serialize)]
struct Wrapper<T> {
inner: T,
}

#[test]
fn deserialize_config_override_max_retries() {
let max_retries = 1234;
let value: Wrapper<AppWorkerConfig> = from_str(&format!(
r#"{{"inner": {{"max-retries": {max_retries} }} }}"#
))
.unwrap();
assert_eq!(value.inner.max_retries, max_retries);
}

#[test]
fn deserialize_config_override_timeout() {
let value: Wrapper<AppWorkerConfig> =
from_str(r#"{"inner": {"timeout": false } }"#).unwrap();
assert!(!value.inner.timeout);
}

#[test]
fn deserialize_config_override_max_duration() {
let max_duration = Duration::from_secs(1234);
let value: Wrapper<AppWorkerConfig> = from_str(&format!(
r#"{{"inner": {{"max-duration": {} }} }}"#,
max_duration.as_secs()
))
.unwrap();
assert_eq!(value.inner.max_duration, max_duration);
}

#[test]
fn deserialize_config_override_disable_argument_coercion() {
let value: Wrapper<AppWorkerConfig> =
from_str(r#"{"inner": {"disable-argument-coercion": true } }"#).unwrap();
assert!(value.inner.disable_argument_coercion);
}
}
Loading

0 comments on commit a876641

Please sign in to comment.