Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Consumer Module: Introduce JobRunner Trait for Flexible Job Handling #51

Merged
merged 8 commits into from
Feb 18, 2024
90 changes: 85 additions & 5 deletions src/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,77 @@
const STATUS_QUIET: usize = 1;
const STATUS_TERMINATING: usize = 2;

type JobRunner<E> = dyn Fn(Job) -> Result<(), E> + Send + Sync;
type BoxedJobRunner<E> = Box<JobRunner<E>>;
/// Implementors of this trait can be registered to run jobs in a `Consumer`.
///
/// # Example
///
/// Create a worker with all default options, register a single handler (for the `foo` job
/// type), connect to the Faktory server, and start accepting jobs.
/// The handler is a struct that implements `JobRunner`.
///
/// ```no_run
/// use faktory::{ConsumerBuilder, JobRunner, Job};
/// use std::io;
///
/// struct MyHandler {
/// config: String,
/// }
/// impl JobRunner for MyHandler {
/// type Error = io::Error;
/// fn run(&self, job: Job) -> Result<(), Self::Error> {
/// println!("config: {}", self.config);
/// println!("job: {:?}", job);
/// Ok(())
/// }
/// }
///
/// let mut c = ConsumerBuilder::default();
/// let handler = MyHandler {
/// config: "bar".to_string(),
/// };
/// c.register_runner("foo", handler);
/// let mut c = c.connect(None).unwrap();
/// if let Err(e) = c.run(&["default"]) {
/// println!("worker failed: {}", e);
/// }
/// ```
pub trait JobRunner: Send + Sync {
/// The error type that the handler may return.
type Error;
/// A handler function that runs a job.
fn run(&self, job: Job) -> Result<(), Self::Error>;
}
type BoxedJobRunner<E> = Box<dyn JobRunner<Error = E>>;
jonhoo marked this conversation as resolved.
Show resolved Hide resolved
// Implements JobRunner for a closure that takes a Job and returns a Result<(), E>
impl<E, F> JobRunner for Box<F>
where
F: Fn(Job) -> Result<(), E> + Send + Sync,
{
type Error = E;
fn run(&self, job: Job) -> Result<(), E> {
self(job)
}
}

// Additional Blanket Implementation
impl<'a, E, F> JobRunner for &'a F
where
F: Fn(Job) -> Result<(), E> + Send + Sync,
{
type Error = E;
fn run(&self, job: Job) -> Result<(), E> {
self(job)
}

Check warning on line 75 in src/consumer/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/consumer/mod.rs#L73-L75

Added lines #L73 - L75 were not covered by tests
}
impl<'a, E, F> JobRunner for &'a mut F
where
F: Fn(Job) -> Result<(), E> + Send + Sync,
{
type Error = E;
fn run(&self, job: Job) -> Result<(), E> {
(self as &F)(job)
}

Check warning on line 84 in src/consumer/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/consumer/mod.rs#L82-L84

Added lines #L82 - L84 were not covered by tests
}

/// `Consumer` is used to run a worker that processes jobs provided by Faktory.
///
Expand Down Expand Up @@ -192,10 +261,21 @@
pub fn register<K, H>(&mut self, kind: K, handler: H) -> &mut Self
where
K: Into<String>,
// Annoyingly, can't just use the JobRunner<E> type alias here.
H: Fn(Job) -> Result<(), E> + Send + Sync + 'static,
{
self.callbacks.insert(kind.into(), Box::new(handler));
self.register_runner(kind, Box::new(handler))
Kab1r marked this conversation as resolved.
Show resolved Hide resolved
}

/// Register a handler for the given job type (`kind`).
///
/// Whenever a job whose type matches `kind` is fetched from the Faktory, the given handler
/// object is called with that job as its argument.
pub fn register_runner<K, H>(&mut self, kind: K, runner: H) -> &mut Self
Kab1r marked this conversation as resolved.
Show resolved Hide resolved
where
K: Into<String>,
H: JobRunner<Error = E> + 'static,
{
self.callbacks.insert(kind.into(), Box::new(runner));
self
}

Expand Down Expand Up @@ -265,7 +345,7 @@
{
fn run_job(&mut self, job: Job) -> Result<(), Failed<E>> {
match self.callbacks.get(&job.kind) {
Some(callback) => callback(job).map_err(Failed::Application),
Some(callback) => callback.run(job).map_err(Failed::Application),
None => {
// cannot execute job, since no handler exists
Err(Failed::BadJobType(job.kind))
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ mod tls;
#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
pub use tls::TlsStream;

pub use crate::consumer::{Consumer, ConsumerBuilder};
pub use crate::consumer::{Consumer, ConsumerBuilder, JobRunner};
pub use crate::error::Error;
pub use crate::producer::Producer;
pub use crate::proto::Reconnect;
Expand Down
Loading