Skip to content

Commit

Permalink
Enable registering periodic workers
Browse files Browse the repository at this point in the history
  • Loading branch information
spencewenski committed Apr 14, 2024
1 parent a84a2b1 commit bca26d0
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 4 deletions.
10 changes: 8 additions & 2 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ where
processor: Processor::new(redis, queues.clone()),
state: state.clone(),
};
A::workers(&mut registry, &context, &state);
A::workers(&mut registry, &context, &state).await?;
registry.processor
};
let token = processor.get_cancellation_token();
Expand Down Expand Up @@ -376,7 +376,13 @@ pub trait App: Send + Sync {
}

#[cfg(feature = "sidekiq")]
fn workers(_registry: &mut WorkerRegistry<Self>, _context: &AppContext, _state: &Self::State) {}
async fn workers(
_registry: &mut WorkerRegistry<Self>,
_context: &AppContext,
_state: &Self::State,
) -> anyhow::Result<()> {
Ok(())
}

async fn serve<F>(
router: Router,
Expand Down
2 changes: 1 addition & 1 deletion src/config/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,6 @@ pub struct Sidekiq {

/// The default app worker config. Values can be overridden on a per-worker basis by
/// implementing the corresponding [crate::worker::app_worker::AppWorker] methods.
#[serde(default)]
#[serde(default, flatten)]
pub worker_config: AppWorkerConfig,
}
28 changes: 27 additions & 1 deletion src/worker/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::app::App;
use crate::worker::app_worker::AppWorker;
use crate::worker::RoadsterWorker;
use serde::Serialize;
use sidekiq::Processor;
use sidekiq::{periodic, Processor};
use std::sync::Arc;
use tracing::debug;

Expand Down Expand Up @@ -33,4 +33,30 @@ where
let roadster_worker = RoadsterWorker::new(worker, self.state.clone());
self.processor.register(roadster_worker);
}

/// Register a periodic [worker][AppWorker] that will run with the provided args. The cadence
/// of the periodic worker, the worker's queue name, and other attributes are specified using
/// the [builder][periodic::Builder]. However, to help ensure type-safety the args are provided
/// to this method instead of the [builder][periodic::Builder].
///
/// The worker will be wrapped by a [RoadsterWorker], which provides some common behavior, such
/// as enforcing a timeout/max duration of worker jobs.
pub async fn register_periodic_app_worker<Args, W>(
&mut self,
builder: periodic::Builder,
worker: W,
args: Args,
) -> anyhow::Result<()>
where
Args: Sync + Send + Serialize + for<'de> serde::Deserialize<'de> + 'static,
W: AppWorker<A, Args> + 'static,
{
debug!("Registering periodic worker: `{}`", W::class_name());
let roadster_worker = RoadsterWorker::new(worker, self.state.clone());
builder
.args(args)?
.register(&mut self.processor, roadster_worker)
.await?;
Ok(())
}
}

0 comments on commit bca26d0

Please sign in to comment.