Skip to content

Commit

Permalink
feat(consumer): Change JobRunner to a trait
Browse files Browse the repository at this point in the history
  • Loading branch information
Kab1r committed Jan 31, 2024
1 parent df60d07 commit bf4423b
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 19 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ If you want to **accept** jobs from Faktory, use `Consumer`.
use faktory::ConsumerBuilder;
use std::io;
let mut c = ConsumerBuilder::default();
c.register("foobar", |job| -> io::Result<()> {
c.register("foobar", |job: Job| -> io::Result<()> {
println!("{:?}", job);
Ok(())
});
Expand Down
64 changes: 55 additions & 9 deletions src/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,53 @@ const STATUS_RUNNING: usize = 0;
const STATUS_QUIET: usize = 1;
const STATUS_TERMINATING: usize = 2;

type JobRunner<E> = dyn Fn(Job) -> Result<(), E> + Send + Sync;
type BoxedJobRunner<E> = Box<JobRunner<E>>;
/// Implementors of this trait can be registered to run jobs in a `Consumer`.
///
/// # Example
///
/// Create a worker with all default options, register a single handler (for the `foo` job
/// type), connect to the Faktory server, and start accepting jobs.
/// The handler is a struct that implements `JobRunner`.
///
/// ```no_run
/// use faktory::{ConsumerBuilder, JobRunner, Job};
/// use std::io;
///
/// struct MyHandler {
/// config: String,
/// }
/// impl JobRunner<io::Error> for MyHandler {
/// fn run(&self, job: Job) -> Result<(), io::Error> {
/// println!("config: {}", self.config);
/// println!("job: {:?}", job);
/// Ok(())
/// }
/// }
///
/// let mut c = ConsumerBuilder::default();
/// let handler = MyHandler {
/// config: "bar".to_string(),
/// };
/// c.register("foo", handler);
/// let mut c = c.connect(None).unwrap();
/// if let Err(e) = c.run(&["default"]) {
/// println!("worker failed: {}", e);
/// }
/// ```
pub trait JobRunner<E>: Send + Sync {
/// A handler function that runs a job.
fn run(&self, job: Job) -> Result<(), E>;
}
// Implements JobRunner for a closure that takes a Job and returns a Result<(), E>
impl<E, F> JobRunner<E> for F
where
F: Fn(Job) -> Result<(), E> + Send + Sync,
{
fn run(&self, job: Job) -> Result<(), E> {
self(job)
}
}
type BoxedJobRunner<E> = Box<dyn JobRunner<E>>;

/// `Consumer` is used to run a worker that processes jobs provided by Faktory.
///
Expand Down Expand Up @@ -95,10 +140,10 @@ type BoxedJobRunner<E> = Box<JobRunner<E>>;
/// type), connect to the Faktory server, and start accepting jobs.
///
/// ```no_run
/// use faktory::ConsumerBuilder;
/// use faktory::{ConsumerBuilder, Job};
/// use std::io;
/// let mut c = ConsumerBuilder::default();
/// c.register("foobar", |job| -> io::Result<()> {
/// c.register("foobar", |job: Job| -> io::Result<()> {
/// println!("{:?}", job);
/// Ok(())
/// });
Expand Down Expand Up @@ -185,15 +230,16 @@ impl<E> ConsumerBuilder<E> {
self
}

/// Register a handler function for the given job type (`kind`).
/// Register a handler type for the given job type (`kind`).
///
/// Whenever a job whose type matches `kind` is fetched from the Faktory, the given handler
/// function is called with that job as its argument.
/// is called with that job as its argument.
///
/// Often you will want to use a closure as the handler.
pub fn register<K, H>(&mut self, kind: K, handler: H) -> &mut Self
where
K: Into<String>,
// Annoyingly, can't just use the JobRunner<E> type alias here.
H: Fn(Job) -> Result<(), E> + Send + Sync + 'static,
H: JobRunner<E> + 'static,
{
self.callbacks.insert(kind.into(), Box::new(handler));
self
Expand Down Expand Up @@ -265,7 +311,7 @@ where
{
fn run_job(&mut self, job: Job) -> Result<(), Failed<E>> {
match self.callbacks.get(&job.kind) {
Some(callback) => callback(job).map_err(Failed::Application),
Some(callback) => callback.run(job).map_err(Failed::Application),
None => {
// cannot execute job, since no handler exists
Err(Failed::BadJobType(job.kind))
Expand Down
6 changes: 3 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@
//! If you want to **accept** jobs from Faktory, use `Consumer`.
//!
//! ```no_run
//! use faktory::ConsumerBuilder;
//! use faktory::{ConsumerBuilder, Job};
//! use std::io;
//! let mut c = ConsumerBuilder::default();
//! c.register("foobar", |job| -> io::Result<()> {
//! c.register("foobar", |job: Job| -> io::Result<()> {
//! println!("{:?}", job);
//! Ok(())
//! });
Expand Down Expand Up @@ -73,7 +73,7 @@ mod tls;
#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
pub use tls::TlsStream;

pub use crate::consumer::{Consumer, ConsumerBuilder};
pub use crate::consumer::{Consumer, ConsumerBuilder, JobRunner};
pub use crate::error::Error;
pub use crate::producer::Producer;
pub use crate::proto::Reconnect;
Expand Down
4 changes: 2 additions & 2 deletions tests/real/community.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,11 @@ fn test_jobs_created_with_builder() {
// prepare a producer ("client" in Faktory terms) and consumer ("worker"):
let mut producer = Producer::connect(None).unwrap();
let mut consumer = ConsumerBuilder::default();
consumer.register("rebuild_index", move |job| -> io::Result<_> {
consumer.register("rebuild_index", move |job: Job| -> io::Result<_> {
assert!(job.args().is_empty());
Ok(eprintln!("{:?}", job))
});
consumer.register("register_order", move |job| -> io::Result<_> {
consumer.register("register_order", move |job: Job| -> io::Result<_> {
assert!(job.args().len() != 0);
Ok(eprintln!("{:?}", job))
});
Expand Down
8 changes: 4 additions & 4 deletions tests/real/enterprise.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ fn ent_expiring_job() {
// prepare a producer ("client" in Faktory terms) and consumer ("worker"):
let mut producer = Producer::connect(Some(&url)).unwrap();
let mut consumer = ConsumerBuilder::default();
consumer.register("AnExpiringJob", move |job| -> io::Result<_> {
consumer.register("AnExpiringJob", move |job: Job| -> io::Result<_> {
Ok(eprintln!("{:?}", job))
});
let mut consumer = consumer.connect(Some(&url)).unwrap();
Expand Down Expand Up @@ -84,7 +84,7 @@ fn ent_unique_job() {
// prepare producer and consumer:
let mut producer = Producer::connect(Some(&url)).unwrap();
let mut consumer = ConsumerBuilder::default();
consumer.register(job_type, |job| -> io::Result<_> {
consumer.register(job_type, |job: Job| -> io::Result<_> {
Ok(eprintln!("{:?}", job))
});
let mut consumer = consumer.connect(Some(&url)).unwrap();
Expand Down Expand Up @@ -197,7 +197,7 @@ fn ent_unique_job_until_success() {
// to work hard:
let mut producer_a = Producer::connect(Some(&url1)).unwrap();
let mut consumer_a = ConsumerBuilder::default();
consumer_a.register(job_type, |job| -> io::Result<_> {
consumer_a.register(job_type, |job: Job| -> io::Result<_> {
let args = job.args().to_owned();
let mut args = args.iter();
let diffuculty_level = args
Expand Down Expand Up @@ -276,7 +276,7 @@ fn ent_unique_job_until_start() {
let handle = thread::spawn(move || {
let mut producer_a = Producer::connect(Some(&url1)).unwrap();
let mut consumer_a = ConsumerBuilder::default();
consumer_a.register(job_type, |job| -> io::Result<_> {
consumer_a.register(job_type, |job: Job| -> io::Result<_> {
let args = job.args().to_owned();
let mut args = args.iter();
let diffuculty_level = args
Expand Down

0 comments on commit bf4423b

Please sign in to comment.