Skip to content

Commit

Permalink
Refactor Consumer Module: Introduce JobRunner Trait for Flexible Job …
Browse files Browse the repository at this point in the history
…Handling (#51)
  • Loading branch information
Kab1r authored Feb 18, 2024
1 parent cdf15d3 commit 248c34e
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 6 deletions.
101 changes: 96 additions & 5 deletions src/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,88 @@ const STATUS_RUNNING: usize = 0;
const STATUS_QUIET: usize = 1;
const STATUS_TERMINATING: usize = 2;

type JobRunner<E> = dyn Fn(Job) -> Result<(), E> + Send + Sync;
type BoxedJobRunner<E> = Box<JobRunner<E>>;
/// Implementations of this trait can be registered to run jobs in a `Consumer`.
///
/// # Example
///
/// Create a worker with all default options, register a single handler (for the `foo` job
/// type), connect to the Faktory server, and start accepting jobs.
/// The handler is a struct that implements `JobRunner`.
///
/// ```no_run
/// use faktory::{ConsumerBuilder, JobRunner, Job};
/// use std::io;
///
/// struct MyHandler {
/// config: String,
/// }
/// impl JobRunner for MyHandler {
/// type Error = io::Error;
/// fn run(&self, job: Job) -> Result<(), Self::Error> {
/// println!("config: {}", self.config);
/// println!("job: {:?}", job);
/// Ok(())
/// }
/// }
///
/// let mut c = ConsumerBuilder::default();
/// let handler = MyHandler {
/// config: "bar".to_string(),
/// };
/// c.register_runner("foo", handler);
/// let mut c = c.connect(None).unwrap();
/// if let Err(e) = c.run(&["default"]) {
/// println!("worker failed: {}", e);
/// }
/// ```
pub trait JobRunner: Send + Sync {
/// The error type that the handler may return.
type Error;
/// A handler function that runs a job.
fn run(&self, job: Job) -> Result<(), Self::Error>;
}
type BoxedJobRunner<E> = Box<dyn JobRunner<Error = E>>;
// Implements JobRunner for a closure that takes a Job and returns a Result<(), E>
impl<E, F> JobRunner for Box<F>
where
F: Fn(Job) -> Result<(), E> + Send + Sync,
{
type Error = E;
fn run(&self, job: Job) -> Result<(), E> {
self(job)
}
}

// Additional Blanket Implementations
impl<'a, E, F> JobRunner for &'a F
where
F: Fn(Job) -> Result<(), E> + Send + Sync,
{
type Error = E;
fn run(&self, job: Job) -> Result<(), E> {
self(job)
}
}
impl<'a, E, F> JobRunner for &'a mut F
where
F: Fn(Job) -> Result<(), E> + Send + Sync,
{
type Error = E;
fn run(&self, job: Job) -> Result<(), E> {
(self as &F)(job)
}
}
#[repr(transparent)]
struct Closure<F>(F);
impl<E, F> JobRunner for Closure<F>
where
F: Fn(Job) -> Result<(), E> + Send + Sync,
{
type Error = E;
fn run(&self, job: Job) -> Result<(), E> {
(self.0)(job)
}
}

/// `Consumer` is used to run a worker that processes jobs provided by Faktory.
///
Expand Down Expand Up @@ -193,10 +273,21 @@ impl<E> ConsumerBuilder<E> {
pub fn register<K, H>(&mut self, kind: K, handler: H) -> &mut Self
where
K: Into<String>,
// Annoyingly, can't just use the JobRunner<E> type alias here.
H: Fn(Job) -> Result<(), E> + Send + Sync + 'static,
{
self.callbacks.insert(kind.into(), Box::new(handler));
self.register_runner(kind, Closure(handler))
}

/// Register a handler for the given job type (`kind`).
///
/// Whenever a job whose type matches `kind` is fetched from the Faktory, the given handler
/// object is called with that job as its argument.
pub fn register_runner<K, H>(&mut self, kind: K, runner: H) -> &mut Self
where
K: Into<String>,
H: JobRunner<Error = E> + 'static,
{
self.callbacks.insert(kind.into(), Box::new(runner));
self
}

Expand Down Expand Up @@ -264,7 +355,7 @@ where
{
fn run_job(&mut self, job: Job) -> Result<(), Failed<E>> {
match self.callbacks.get(&job.kind) {
Some(callback) => callback(job).map_err(Failed::Application),
Some(callback) => callback.run(job).map_err(Failed::Application),
None => {
// cannot execute job, since no handler exists
Err(Failed::BadJobType(job.kind))
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ mod consumer;
mod producer;
mod proto;

pub use crate::consumer::{Consumer, ConsumerBuilder};
pub use crate::consumer::{Consumer, ConsumerBuilder, JobRunner};
pub use crate::error::Error;
pub use crate::producer::Producer;
pub use crate::proto::{Client, Job, JobBuilder, Reconnect};
Expand Down

0 comments on commit 248c34e

Please sign in to comment.