Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Consumer Module: Introduce JobRunner Trait for Flexible Job Handling #51

Merged
merged 8 commits into from
Feb 18, 2024
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ If you want to **accept** jobs from Faktory, use `Consumer`.
use faktory::ConsumerBuilder;
use std::io;
let mut c = ConsumerBuilder::default();
c.register("foobar", |job| -> io::Result<()> {
c.register("foobar", |job: Job| -> io::Result<()> {
Kab1r marked this conversation as resolved.
Show resolved Hide resolved
println!("{:?}", job);
Ok(())
});
Expand Down
64 changes: 55 additions & 9 deletions src/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,53 @@ const STATUS_RUNNING: usize = 0;
const STATUS_QUIET: usize = 1;
const STATUS_TERMINATING: usize = 2;

type JobRunner<E> = dyn Fn(Job) -> Result<(), E> + Send + Sync;
type BoxedJobRunner<E> = Box<JobRunner<E>>;
/// Implementors of this trait can be registered to run jobs in a `Consumer`.
///
/// # Example
///
/// Create a worker with all default options, register a single handler (for the `foo` job
/// type), connect to the Faktory server, and start accepting jobs.
/// The handler is a struct that implements `JobRunner`.
///
/// ```no_run
/// use faktory::{ConsumerBuilder, JobRunner, Job};
/// use std::io;
///
/// struct MyHandler {
/// config: String,
/// }
/// impl JobRunner<io::Error> for MyHandler {
/// fn run(&self, job: Job) -> Result<(), io::Error> {
/// println!("config: {}", self.config);
/// println!("job: {:?}", job);
/// Ok(())
/// }
/// }
///
/// let mut c = ConsumerBuilder::default();
/// let handler = MyHandler {
/// config: "bar".to_string(),
/// };
/// c.register("foo", handler);
/// let mut c = c.connect(None).unwrap();
/// if let Err(e) = c.run(&["default"]) {
/// println!("worker failed: {}", e);
/// }
/// ```
pub trait JobRunner<E>: Send + Sync {
Kab1r marked this conversation as resolved.
Show resolved Hide resolved
/// A handler function that runs a job.
fn run(&self, job: Job) -> Result<(), E>;
}
// Implements JobRunner for a closure that takes a Job and returns a Result<(), E>
impl<E, F> JobRunner<E> for F
jonhoo marked this conversation as resolved.
Show resolved Hide resolved
where
F: Fn(Job) -> Result<(), E> + Send + Sync,
{
fn run(&self, job: Job) -> Result<(), E> {
self(job)
}
}
Kab1r marked this conversation as resolved.
Show resolved Hide resolved
type BoxedJobRunner<E> = Box<dyn JobRunner<E>>;

/// `Consumer` is used to run a worker that processes jobs provided by Faktory.
///
Expand Down Expand Up @@ -95,10 +140,10 @@ type BoxedJobRunner<E> = Box<JobRunner<E>>;
/// type), connect to the Faktory server, and start accepting jobs.
///
/// ```no_run
/// use faktory::ConsumerBuilder;
/// use faktory::{ConsumerBuilder, Job};
/// use std::io;
/// let mut c = ConsumerBuilder::default();
/// c.register("foobar", |job| -> io::Result<()> {
/// c.register("foobar", |job: Job| -> io::Result<()> {
/// println!("{:?}", job);
/// Ok(())
/// });
Expand Down Expand Up @@ -185,15 +230,16 @@ impl<E> ConsumerBuilder<E> {
self
}

/// Register a handler function for the given job type (`kind`).
/// Register a handler type for the given job type (`kind`).
///
/// Whenever a job whose type matches `kind` is fetched from the Faktory, the given handler
/// function is called with that job as its argument.
/// is called with that job as its argument.
///
/// Often you will want to use a closure as the handler.
pub fn register<K, H>(&mut self, kind: K, handler: H) -> &mut Self
where
K: Into<String>,
// Annoyingly, can't just use the JobRunner<E> type alias here.
H: Fn(Job) -> Result<(), E> + Send + Sync + 'static,
H: JobRunner<E> + 'static,
{
self.callbacks.insert(kind.into(), Box::new(handler));
self
Expand Down Expand Up @@ -265,7 +311,7 @@ where
{
fn run_job(&mut self, job: Job) -> Result<(), Failed<E>> {
match self.callbacks.get(&job.kind) {
Some(callback) => callback(job).map_err(Failed::Application),
Some(callback) => callback.run(job).map_err(Failed::Application),
None => {
// cannot execute job, since no handler exists
Err(Failed::BadJobType(job.kind))
Expand Down
6 changes: 3 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@
//! If you want to **accept** jobs from Faktory, use `Consumer`.
//!
//! ```no_run
//! use faktory::ConsumerBuilder;
//! use faktory::{ConsumerBuilder, Job};
//! use std::io;
//! let mut c = ConsumerBuilder::default();
//! c.register("foobar", |job| -> io::Result<()> {
//! c.register("foobar", |job: Job| -> io::Result<()> {
//! println!("{:?}", job);
//! Ok(())
//! });
Expand Down Expand Up @@ -73,7 +73,7 @@ mod tls;
#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
pub use tls::TlsStream;

pub use crate::consumer::{Consumer, ConsumerBuilder};
pub use crate::consumer::{Consumer, ConsumerBuilder, JobRunner};
pub use crate::error::Error;
pub use crate::producer::Producer;
pub use crate::proto::Reconnect;
Expand Down
4 changes: 2 additions & 2 deletions tests/real/community.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,11 @@ fn test_jobs_created_with_builder() {
// prepare a producer ("client" in Faktory terms) and consumer ("worker"):
let mut producer = Producer::connect(None).unwrap();
let mut consumer = ConsumerBuilder::default();
consumer.register("rebuild_index", move |job| -> io::Result<_> {
consumer.register("rebuild_index", move |job: Job| -> io::Result<_> {
assert!(job.args().is_empty());
Ok(eprintln!("{:?}", job))
});
consumer.register("register_order", move |job| -> io::Result<_> {
consumer.register("register_order", move |job: Job| -> io::Result<_> {
assert!(job.args().len() != 0);
Ok(eprintln!("{:?}", job))
});
Expand Down
8 changes: 4 additions & 4 deletions tests/real/enterprise.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ fn ent_expiring_job() {
// prepare a producer ("client" in Faktory terms) and consumer ("worker"):
let mut producer = Producer::connect(Some(&url)).unwrap();
let mut consumer = ConsumerBuilder::default();
consumer.register("AnExpiringJob", move |job| -> io::Result<_> {
consumer.register("AnExpiringJob", move |job: Job| -> io::Result<_> {
Ok(eprintln!("{:?}", job))
});
let mut consumer = consumer.connect(Some(&url)).unwrap();
Expand Down Expand Up @@ -84,7 +84,7 @@ fn ent_unique_job() {
// prepare producer and consumer:
let mut producer = Producer::connect(Some(&url)).unwrap();
let mut consumer = ConsumerBuilder::default();
consumer.register(job_type, |job| -> io::Result<_> {
consumer.register(job_type, |job: Job| -> io::Result<_> {
Ok(eprintln!("{:?}", job))
});
let mut consumer = consumer.connect(Some(&url)).unwrap();
Expand Down Expand Up @@ -197,7 +197,7 @@ fn ent_unique_job_until_success() {
// to work hard:
let mut producer_a = Producer::connect(Some(&url1)).unwrap();
let mut consumer_a = ConsumerBuilder::default();
consumer_a.register(job_type, |job| -> io::Result<_> {
consumer_a.register(job_type, |job: Job| -> io::Result<_> {
let args = job.args().to_owned();
let mut args = args.iter();
let diffuculty_level = args
Expand Down Expand Up @@ -276,7 +276,7 @@ fn ent_unique_job_until_start() {
let handle = thread::spawn(move || {
let mut producer_a = Producer::connect(Some(&url1)).unwrap();
let mut consumer_a = ConsumerBuilder::default();
consumer_a.register(job_type, |job| -> io::Result<_> {
consumer_a.register(job_type, |job: Job| -> io::Result<_> {
let args = job.args().to_owned();
let mut args = args.iter();
let diffuculty_level = args
Expand Down
Loading