From ad54edc64c37a815200b2d59bf090a32ab91ed29 Mon Sep 17 00:00:00 2001 From: Spencer Ferris <3319370+spencewenski@users.noreply.github.com> Date: Sun, 14 Apr 2024 00:19:12 -0700 Subject: [PATCH] Enable registering periodic workers --- src/app.rs | 10 ++++++++-- src/config/worker.rs | 2 +- src/worker/registry.rs | 28 +++++++++++++++++++++++++++- 3 files changed, 36 insertions(+), 4 deletions(-) diff --git a/src/app.rs b/src/app.rs index 6cc42c9d..e6a21823 100644 --- a/src/app.rs +++ b/src/app.rs @@ -228,7 +228,7 @@ where processor: Processor::new(redis, queues.clone()), state: state.clone(), }; - A::workers(&mut registry, &context, &state); + A::workers(&mut registry, &context, &state).await?; registry.processor }; let token = processor.get_cancellation_token(); @@ -376,7 +376,13 @@ pub trait App: Send + Sync { } #[cfg(feature = "sidekiq")] - fn workers(_registry: &mut WorkerRegistry, _context: &AppContext, _state: &Self::State) {} + async fn workers( + _registry: &mut WorkerRegistry, + _context: &AppContext, + _state: &Self::State, + ) -> anyhow::Result<()> { + Ok(()) + } async fn serve( router: Router, diff --git a/src/config/worker.rs b/src/config/worker.rs index 490662f2..d22546d4 100644 --- a/src/config/worker.rs +++ b/src/config/worker.rs @@ -25,6 +25,6 @@ pub struct Sidekiq { /// The default app worker config. Values can be overridden on a per-worker basis by /// implementing the corresponding [crate::worker::app_worker::AppWorker] methods. - #[serde(default)] + #[serde(default, flatten)] pub worker_config: AppWorkerConfig, } diff --git a/src/worker/registry.rs b/src/worker/registry.rs index b0bf01bf..4fc0c39b 100644 --- a/src/worker/registry.rs +++ b/src/worker/registry.rs @@ -2,7 +2,7 @@ use crate::app::App; use crate::worker::app_worker::AppWorker; use crate::worker::RoadsterWorker; use serde::Serialize; -use sidekiq::Processor; +use sidekiq::{periodic, Processor}; use std::sync::Arc; use tracing::debug; @@ -33,4 +33,30 @@ where let roadster_worker = RoadsterWorker::new(worker, self.state.clone()); self.processor.register(roadster_worker); } + + /// Register a periodic [worker][AppWorker] that will run with the provided args. The cadence + /// of the periodic worker, the worker's queue name, and other attributes are specified using + /// the [builder][periodic::Builder]. However, to help ensure type-safety the args are provided + /// to this method instead of the [builder][periodic::Builder]. + /// + /// The worker will be wrapped by a [RoadsterWorker], which provides some common behavior, such + /// as enforcing a timeout/max duration of worker jobs. + pub async fn register_periodic_app_worker( + &mut self, + builder: periodic::Builder, + worker: W, + args: Args, + ) -> anyhow::Result<()> + where + Args: Sync + Send + Serialize + for<'de> serde::Deserialize<'de> + 'static, + W: AppWorker + 'static, + { + debug!("Registering periodic worker: `{}`", W::class_name()); + let roadster_worker = RoadsterWorker::new(worker, self.state.clone()); + builder + .args(args)? + .register(&mut self.processor, roadster_worker) + .await?; + Ok(()) + } }