From bf4423bcb19f0097826ec920cd95fb0b125fa5ae Mon Sep 17 00:00:00 2001 From: Kabir Kwatra Date: Tue, 30 Jan 2024 16:58:31 -0800 Subject: [PATCH] feat(consumer): Change `JobRunner` to a `trait` --- README.md | 2 +- src/consumer/mod.rs | 64 ++++++++++++++++++++++++++++++++++------ src/lib.rs | 6 ++-- tests/real/community.rs | 4 +-- tests/real/enterprise.rs | 8 ++--- 5 files changed, 65 insertions(+), 19 deletions(-) diff --git a/README.md b/README.md index 75ab8eb8..95442306 100644 --- a/README.md +++ b/README.md @@ -52,7 +52,7 @@ If you want to **accept** jobs from Faktory, use `Consumer`. use faktory::ConsumerBuilder; use std::io; let mut c = ConsumerBuilder::default(); -c.register("foobar", |job| -> io::Result<()> { +c.register("foobar", |job: Job| -> io::Result<()> { println!("{:?}", job); Ok(()) }); diff --git a/src/consumer/mod.rs b/src/consumer/mod.rs index 3908932f..04aa1210 100644 --- a/src/consumer/mod.rs +++ b/src/consumer/mod.rs @@ -12,8 +12,53 @@ const STATUS_RUNNING: usize = 0; const STATUS_QUIET: usize = 1; const STATUS_TERMINATING: usize = 2; -type JobRunner = dyn Fn(Job) -> Result<(), E> + Send + Sync; -type BoxedJobRunner = Box>; +/// Implementors of this trait can be registered to run jobs in a `Consumer`. +/// +/// # Example +/// +/// Create a worker with all default options, register a single handler (for the `foo` job +/// type), connect to the Faktory server, and start accepting jobs. +/// The handler is a struct that implements `JobRunner`. +/// +/// ```no_run +/// use faktory::{ConsumerBuilder, JobRunner, Job}; +/// use std::io; +/// +/// struct MyHandler { +/// config: String, +/// } +/// impl JobRunner for MyHandler { +/// fn run(&self, job: Job) -> Result<(), io::Error> { +/// println!("config: {}", self.config); +/// println!("job: {:?}", job); +/// Ok(()) +/// } +/// } +/// +/// let mut c = ConsumerBuilder::default(); +/// let handler = MyHandler { +/// config: "bar".to_string(), +/// }; +/// c.register("foo", handler); +/// let mut c = c.connect(None).unwrap(); +/// if let Err(e) = c.run(&["default"]) { +/// println!("worker failed: {}", e); +/// } +/// ``` +pub trait JobRunner: Send + Sync { + /// A handler function that runs a job. + fn run(&self, job: Job) -> Result<(), E>; +} +// Implements JobRunner for a closure that takes a Job and returns a Result<(), E> +impl JobRunner for F +where + F: Fn(Job) -> Result<(), E> + Send + Sync, +{ + fn run(&self, job: Job) -> Result<(), E> { + self(job) + } +} +type BoxedJobRunner = Box>; /// `Consumer` is used to run a worker that processes jobs provided by Faktory. /// @@ -95,10 +140,10 @@ type BoxedJobRunner = Box>; /// type), connect to the Faktory server, and start accepting jobs. /// /// ```no_run -/// use faktory::ConsumerBuilder; +/// use faktory::{ConsumerBuilder, Job}; /// use std::io; /// let mut c = ConsumerBuilder::default(); -/// c.register("foobar", |job| -> io::Result<()> { +/// c.register("foobar", |job: Job| -> io::Result<()> { /// println!("{:?}", job); /// Ok(()) /// }); @@ -185,15 +230,16 @@ impl ConsumerBuilder { self } - /// Register a handler function for the given job type (`kind`). + /// Register a handler type for the given job type (`kind`). /// /// Whenever a job whose type matches `kind` is fetched from the Faktory, the given handler - /// function is called with that job as its argument. + /// is called with that job as its argument. + /// + /// Often you will want to use a closure as the handler. pub fn register(&mut self, kind: K, handler: H) -> &mut Self where K: Into, - // Annoyingly, can't just use the JobRunner type alias here. - H: Fn(Job) -> Result<(), E> + Send + Sync + 'static, + H: JobRunner + 'static, { self.callbacks.insert(kind.into(), Box::new(handler)); self @@ -265,7 +311,7 @@ where { fn run_job(&mut self, job: Job) -> Result<(), Failed> { match self.callbacks.get(&job.kind) { - Some(callback) => callback(job).map_err(Failed::Application), + Some(callback) => callback.run(job).map_err(Failed::Application), None => { // cannot execute job, since no handler exists Err(Failed::BadJobType(job.kind)) diff --git a/src/lib.rs b/src/lib.rs index 89a88c54..aa4849d8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -42,10 +42,10 @@ //! If you want to **accept** jobs from Faktory, use `Consumer`. //! //! ```no_run -//! use faktory::ConsumerBuilder; +//! use faktory::{ConsumerBuilder, Job}; //! use std::io; //! let mut c = ConsumerBuilder::default(); -//! c.register("foobar", |job| -> io::Result<()> { +//! c.register("foobar", |job: Job| -> io::Result<()> { //! println!("{:?}", job); //! Ok(()) //! }); @@ -73,7 +73,7 @@ mod tls; #[cfg_attr(docsrs, doc(cfg(feature = "tls")))] pub use tls::TlsStream; -pub use crate::consumer::{Consumer, ConsumerBuilder}; +pub use crate::consumer::{Consumer, ConsumerBuilder, JobRunner}; pub use crate::error::Error; pub use crate::producer::Producer; pub use crate::proto::Reconnect; diff --git a/tests/real/community.rs b/tests/real/community.rs index fd9642f8..cc26ea22 100644 --- a/tests/real/community.rs +++ b/tests/real/community.rs @@ -171,11 +171,11 @@ fn test_jobs_created_with_builder() { // prepare a producer ("client" in Faktory terms) and consumer ("worker"): let mut producer = Producer::connect(None).unwrap(); let mut consumer = ConsumerBuilder::default(); - consumer.register("rebuild_index", move |job| -> io::Result<_> { + consumer.register("rebuild_index", move |job: Job| -> io::Result<_> { assert!(job.args().is_empty()); Ok(eprintln!("{:?}", job)) }); - consumer.register("register_order", move |job| -> io::Result<_> { + consumer.register("register_order", move |job: Job| -> io::Result<_> { assert!(job.args().len() != 0); Ok(eprintln!("{:?}", job)) }); diff --git a/tests/real/enterprise.rs b/tests/real/enterprise.rs index 67264f9a..35f99760 100644 --- a/tests/real/enterprise.rs +++ b/tests/real/enterprise.rs @@ -31,7 +31,7 @@ fn ent_expiring_job() { // prepare a producer ("client" in Faktory terms) and consumer ("worker"): let mut producer = Producer::connect(Some(&url)).unwrap(); let mut consumer = ConsumerBuilder::default(); - consumer.register("AnExpiringJob", move |job| -> io::Result<_> { + consumer.register("AnExpiringJob", move |job: Job| -> io::Result<_> { Ok(eprintln!("{:?}", job)) }); let mut consumer = consumer.connect(Some(&url)).unwrap(); @@ -84,7 +84,7 @@ fn ent_unique_job() { // prepare producer and consumer: let mut producer = Producer::connect(Some(&url)).unwrap(); let mut consumer = ConsumerBuilder::default(); - consumer.register(job_type, |job| -> io::Result<_> { + consumer.register(job_type, |job: Job| -> io::Result<_> { Ok(eprintln!("{:?}", job)) }); let mut consumer = consumer.connect(Some(&url)).unwrap(); @@ -197,7 +197,7 @@ fn ent_unique_job_until_success() { // to work hard: let mut producer_a = Producer::connect(Some(&url1)).unwrap(); let mut consumer_a = ConsumerBuilder::default(); - consumer_a.register(job_type, |job| -> io::Result<_> { + consumer_a.register(job_type, |job: Job| -> io::Result<_> { let args = job.args().to_owned(); let mut args = args.iter(); let diffuculty_level = args @@ -276,7 +276,7 @@ fn ent_unique_job_until_start() { let handle = thread::spawn(move || { let mut producer_a = Producer::connect(Some(&url1)).unwrap(); let mut consumer_a = ConsumerBuilder::default(); - consumer_a.register(job_type, |job| -> io::Result<_> { + consumer_a.register(job_type, |job: Job| -> io::Result<_> { let args = job.args().to_owned(); let mut args = args.iter(); let diffuculty_level = args