From bf4423bcb19f0097826ec920cd95fb0b125fa5ae Mon Sep 17 00:00:00 2001 From: Kabir Kwatra Date: Tue, 30 Jan 2024 16:58:31 -0800 Subject: [PATCH 1/7] feat(consumer): Change `JobRunner` to a `trait` --- README.md | 2 +- src/consumer/mod.rs | 64 ++++++++++++++++++++++++++++++++++------ src/lib.rs | 6 ++-- tests/real/community.rs | 4 +-- tests/real/enterprise.rs | 8 ++--- 5 files changed, 65 insertions(+), 19 deletions(-) diff --git a/README.md b/README.md index 75ab8eb8..95442306 100644 --- a/README.md +++ b/README.md @@ -52,7 +52,7 @@ If you want to **accept** jobs from Faktory, use `Consumer`. use faktory::ConsumerBuilder; use std::io; let mut c = ConsumerBuilder::default(); -c.register("foobar", |job| -> io::Result<()> { +c.register("foobar", |job: Job| -> io::Result<()> { println!("{:?}", job); Ok(()) }); diff --git a/src/consumer/mod.rs b/src/consumer/mod.rs index 3908932f..04aa1210 100644 --- a/src/consumer/mod.rs +++ b/src/consumer/mod.rs @@ -12,8 +12,53 @@ const STATUS_RUNNING: usize = 0; const STATUS_QUIET: usize = 1; const STATUS_TERMINATING: usize = 2; -type JobRunner = dyn Fn(Job) -> Result<(), E> + Send + Sync; -type BoxedJobRunner = Box>; +/// Implementors of this trait can be registered to run jobs in a `Consumer`. +/// +/// # Example +/// +/// Create a worker with all default options, register a single handler (for the `foo` job +/// type), connect to the Faktory server, and start accepting jobs. +/// The handler is a struct that implements `JobRunner`. +/// +/// ```no_run +/// use faktory::{ConsumerBuilder, JobRunner, Job}; +/// use std::io; +/// +/// struct MyHandler { +/// config: String, +/// } +/// impl JobRunner for MyHandler { +/// fn run(&self, job: Job) -> Result<(), io::Error> { +/// println!("config: {}", self.config); +/// println!("job: {:?}", job); +/// Ok(()) +/// } +/// } +/// +/// let mut c = ConsumerBuilder::default(); +/// let handler = MyHandler { +/// config: "bar".to_string(), +/// }; +/// c.register("foo", handler); +/// let mut c = c.connect(None).unwrap(); +/// if let Err(e) = c.run(&["default"]) { +/// println!("worker failed: {}", e); +/// } +/// ``` +pub trait JobRunner: Send + Sync { + /// A handler function that runs a job. + fn run(&self, job: Job) -> Result<(), E>; +} +// Implements JobRunner for a closure that takes a Job and returns a Result<(), E> +impl JobRunner for F +where + F: Fn(Job) -> Result<(), E> + Send + Sync, +{ + fn run(&self, job: Job) -> Result<(), E> { + self(job) + } +} +type BoxedJobRunner = Box>; /// `Consumer` is used to run a worker that processes jobs provided by Faktory. /// @@ -95,10 +140,10 @@ type BoxedJobRunner = Box>; /// type), connect to the Faktory server, and start accepting jobs. /// /// ```no_run -/// use faktory::ConsumerBuilder; +/// use faktory::{ConsumerBuilder, Job}; /// use std::io; /// let mut c = ConsumerBuilder::default(); -/// c.register("foobar", |job| -> io::Result<()> { +/// c.register("foobar", |job: Job| -> io::Result<()> { /// println!("{:?}", job); /// Ok(()) /// }); @@ -185,15 +230,16 @@ impl ConsumerBuilder { self } - /// Register a handler function for the given job type (`kind`). + /// Register a handler type for the given job type (`kind`). /// /// Whenever a job whose type matches `kind` is fetched from the Faktory, the given handler - /// function is called with that job as its argument. + /// is called with that job as its argument. + /// + /// Often you will want to use a closure as the handler. pub fn register(&mut self, kind: K, handler: H) -> &mut Self where K: Into, - // Annoyingly, can't just use the JobRunner type alias here. - H: Fn(Job) -> Result<(), E> + Send + Sync + 'static, + H: JobRunner + 'static, { self.callbacks.insert(kind.into(), Box::new(handler)); self @@ -265,7 +311,7 @@ where { fn run_job(&mut self, job: Job) -> Result<(), Failed> { match self.callbacks.get(&job.kind) { - Some(callback) => callback(job).map_err(Failed::Application), + Some(callback) => callback.run(job).map_err(Failed::Application), None => { // cannot execute job, since no handler exists Err(Failed::BadJobType(job.kind)) diff --git a/src/lib.rs b/src/lib.rs index 89a88c54..aa4849d8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -42,10 +42,10 @@ //! If you want to **accept** jobs from Faktory, use `Consumer`. //! //! ```no_run -//! use faktory::ConsumerBuilder; +//! use faktory::{ConsumerBuilder, Job}; //! use std::io; //! let mut c = ConsumerBuilder::default(); -//! c.register("foobar", |job| -> io::Result<()> { +//! c.register("foobar", |job: Job| -> io::Result<()> { //! println!("{:?}", job); //! Ok(()) //! }); @@ -73,7 +73,7 @@ mod tls; #[cfg_attr(docsrs, doc(cfg(feature = "tls")))] pub use tls::TlsStream; -pub use crate::consumer::{Consumer, ConsumerBuilder}; +pub use crate::consumer::{Consumer, ConsumerBuilder, JobRunner}; pub use crate::error::Error; pub use crate::producer::Producer; pub use crate::proto::Reconnect; diff --git a/tests/real/community.rs b/tests/real/community.rs index fd9642f8..cc26ea22 100644 --- a/tests/real/community.rs +++ b/tests/real/community.rs @@ -171,11 +171,11 @@ fn test_jobs_created_with_builder() { // prepare a producer ("client" in Faktory terms) and consumer ("worker"): let mut producer = Producer::connect(None).unwrap(); let mut consumer = ConsumerBuilder::default(); - consumer.register("rebuild_index", move |job| -> io::Result<_> { + consumer.register("rebuild_index", move |job: Job| -> io::Result<_> { assert!(job.args().is_empty()); Ok(eprintln!("{:?}", job)) }); - consumer.register("register_order", move |job| -> io::Result<_> { + consumer.register("register_order", move |job: Job| -> io::Result<_> { assert!(job.args().len() != 0); Ok(eprintln!("{:?}", job)) }); diff --git a/tests/real/enterprise.rs b/tests/real/enterprise.rs index 67264f9a..35f99760 100644 --- a/tests/real/enterprise.rs +++ b/tests/real/enterprise.rs @@ -31,7 +31,7 @@ fn ent_expiring_job() { // prepare a producer ("client" in Faktory terms) and consumer ("worker"): let mut producer = Producer::connect(Some(&url)).unwrap(); let mut consumer = ConsumerBuilder::default(); - consumer.register("AnExpiringJob", move |job| -> io::Result<_> { + consumer.register("AnExpiringJob", move |job: Job| -> io::Result<_> { Ok(eprintln!("{:?}", job)) }); let mut consumer = consumer.connect(Some(&url)).unwrap(); @@ -84,7 +84,7 @@ fn ent_unique_job() { // prepare producer and consumer: let mut producer = Producer::connect(Some(&url)).unwrap(); let mut consumer = ConsumerBuilder::default(); - consumer.register(job_type, |job| -> io::Result<_> { + consumer.register(job_type, |job: Job| -> io::Result<_> { Ok(eprintln!("{:?}", job)) }); let mut consumer = consumer.connect(Some(&url)).unwrap(); @@ -197,7 +197,7 @@ fn ent_unique_job_until_success() { // to work hard: let mut producer_a = Producer::connect(Some(&url1)).unwrap(); let mut consumer_a = ConsumerBuilder::default(); - consumer_a.register(job_type, |job| -> io::Result<_> { + consumer_a.register(job_type, |job: Job| -> io::Result<_> { let args = job.args().to_owned(); let mut args = args.iter(); let diffuculty_level = args @@ -276,7 +276,7 @@ fn ent_unique_job_until_start() { let handle = thread::spawn(move || { let mut producer_a = Producer::connect(Some(&url1)).unwrap(); let mut consumer_a = ConsumerBuilder::default(); - consumer_a.register(job_type, |job| -> io::Result<_> { + consumer_a.register(job_type, |job: Job| -> io::Result<_> { let args = job.args().to_owned(); let mut args = args.iter(); let diffuculty_level = args From e40d7f07b70a51d4c6ecfa1f8b79fa03ff427204 Mon Sep 17 00:00:00 2001 From: Kabir Kwatra Date: Sun, 4 Feb 2024 16:21:23 -0800 Subject: [PATCH 2/7] fix: apply requested changes - removed breaking change: introduced `register_runner` - replaced `E` generic type parameter with asssociated type in `JobRunner` trait --- README.md | 2 +- src/consumer/mod.rs | 39 +++++++++++++++++++++++++++------------ src/lib.rs | 4 ++-- tests/real/community.rs | 4 ++-- tests/real/enterprise.rs | 8 ++++---- 5 files changed, 36 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index 95442306..75ab8eb8 100644 --- a/README.md +++ b/README.md @@ -52,7 +52,7 @@ If you want to **accept** jobs from Faktory, use `Consumer`. use faktory::ConsumerBuilder; use std::io; let mut c = ConsumerBuilder::default(); -c.register("foobar", |job: Job| -> io::Result<()> { +c.register("foobar", |job| -> io::Result<()> { println!("{:?}", job); Ok(()) }); diff --git a/src/consumer/mod.rs b/src/consumer/mod.rs index 04aa1210..1d54fdf1 100644 --- a/src/consumer/mod.rs +++ b/src/consumer/mod.rs @@ -39,26 +39,29 @@ const STATUS_TERMINATING: usize = 2; /// let handler = MyHandler { /// config: "bar".to_string(), /// }; -/// c.register("foo", handler); +/// c.register_runner("foo", Box::new(handler)); /// let mut c = c.connect(None).unwrap(); /// if let Err(e) = c.run(&["default"]) { /// println!("worker failed: {}", e); /// } /// ``` -pub trait JobRunner: Send + Sync { +pub trait JobRunner: Send + Sync { + /// The error type that the handler may return. + type E; /// A handler function that runs a job. - fn run(&self, job: Job) -> Result<(), E>; + fn run(&self, job: Job) -> Result<(), Self::E>; } // Implements JobRunner for a closure that takes a Job and returns a Result<(), E> -impl JobRunner for F +impl JobRunner for F where F: Fn(Job) -> Result<(), E> + Send + Sync, { + type E = E; fn run(&self, job: Job) -> Result<(), E> { self(job) } } -type BoxedJobRunner = Box>; +type BoxedJobRunner = Box>; /// `Consumer` is used to run a worker that processes jobs provided by Faktory. /// @@ -140,10 +143,10 @@ type BoxedJobRunner = Box>; /// type), connect to the Faktory server, and start accepting jobs. /// /// ```no_run -/// use faktory::{ConsumerBuilder, Job}; +/// use faktory::ConsumerBuilder; /// use std::io; /// let mut c = ConsumerBuilder::default(); -/// c.register("foobar", |job: Job| -> io::Result<()> { +/// c.register("foobar", |job| -> io::Result<()> { /// println!("{:?}", job); /// Ok(()) /// }); @@ -230,21 +233,33 @@ impl ConsumerBuilder { self } - /// Register a handler type for the given job type (`kind`). + /// Register a handler function for the given job type (`kind`). /// /// Whenever a job whose type matches `kind` is fetched from the Faktory, the given handler - /// is called with that job as its argument. - /// - /// Often you will want to use a closure as the handler. + /// function is called with that job as its argument. pub fn register(&mut self, kind: K, handler: H) -> &mut Self where K: Into, - H: JobRunner + 'static, + // Annoyingly, can't just use the JobRunner type alias here. + H: Fn(Job) -> Result<(), E> + Send + Sync + 'static, { self.callbacks.insert(kind.into(), Box::new(handler)); self } + /// Register a handler for the given job type (`kind`). + /// + /// Whenever a job whose type matches `kind` is fetched from the Faktory, the given handler + /// object is called with that job as its argument. + pub fn register_runner(&mut self, kind: K, runner: H) -> &mut Self + where + K: Into, + H: JobRunner + 'static, + { + self.callbacks.insert(kind.into(), Box::new(runner)); + self + } + /// Connect to a Faktory server. /// /// If `url` is not given, will use the standard Faktory environment variables. Specifically, diff --git a/src/lib.rs b/src/lib.rs index aa4849d8..ed1d9be3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -42,10 +42,10 @@ //! If you want to **accept** jobs from Faktory, use `Consumer`. //! //! ```no_run -//! use faktory::{ConsumerBuilder, Job}; +//! use faktory::ConsumerBuilder; //! use std::io; //! let mut c = ConsumerBuilder::default(); -//! c.register("foobar", |job: Job| -> io::Result<()> { +//! c.register("foobar", |job| -> io::Result<()> { //! println!("{:?}", job); //! Ok(()) //! }); diff --git a/tests/real/community.rs b/tests/real/community.rs index cc26ea22..fd9642f8 100644 --- a/tests/real/community.rs +++ b/tests/real/community.rs @@ -171,11 +171,11 @@ fn test_jobs_created_with_builder() { // prepare a producer ("client" in Faktory terms) and consumer ("worker"): let mut producer = Producer::connect(None).unwrap(); let mut consumer = ConsumerBuilder::default(); - consumer.register("rebuild_index", move |job: Job| -> io::Result<_> { + consumer.register("rebuild_index", move |job| -> io::Result<_> { assert!(job.args().is_empty()); Ok(eprintln!("{:?}", job)) }); - consumer.register("register_order", move |job: Job| -> io::Result<_> { + consumer.register("register_order", move |job| -> io::Result<_> { assert!(job.args().len() != 0); Ok(eprintln!("{:?}", job)) }); diff --git a/tests/real/enterprise.rs b/tests/real/enterprise.rs index 35f99760..67264f9a 100644 --- a/tests/real/enterprise.rs +++ b/tests/real/enterprise.rs @@ -31,7 +31,7 @@ fn ent_expiring_job() { // prepare a producer ("client" in Faktory terms) and consumer ("worker"): let mut producer = Producer::connect(Some(&url)).unwrap(); let mut consumer = ConsumerBuilder::default(); - consumer.register("AnExpiringJob", move |job: Job| -> io::Result<_> { + consumer.register("AnExpiringJob", move |job| -> io::Result<_> { Ok(eprintln!("{:?}", job)) }); let mut consumer = consumer.connect(Some(&url)).unwrap(); @@ -84,7 +84,7 @@ fn ent_unique_job() { // prepare producer and consumer: let mut producer = Producer::connect(Some(&url)).unwrap(); let mut consumer = ConsumerBuilder::default(); - consumer.register(job_type, |job: Job| -> io::Result<_> { + consumer.register(job_type, |job| -> io::Result<_> { Ok(eprintln!("{:?}", job)) }); let mut consumer = consumer.connect(Some(&url)).unwrap(); @@ -197,7 +197,7 @@ fn ent_unique_job_until_success() { // to work hard: let mut producer_a = Producer::connect(Some(&url1)).unwrap(); let mut consumer_a = ConsumerBuilder::default(); - consumer_a.register(job_type, |job: Job| -> io::Result<_> { + consumer_a.register(job_type, |job| -> io::Result<_> { let args = job.args().to_owned(); let mut args = args.iter(); let diffuculty_level = args @@ -276,7 +276,7 @@ fn ent_unique_job_until_start() { let handle = thread::spawn(move || { let mut producer_a = Producer::connect(Some(&url1)).unwrap(); let mut consumer_a = ConsumerBuilder::default(); - consumer_a.register(job_type, |job: Job| -> io::Result<_> { + consumer_a.register(job_type, |job| -> io::Result<_> { let args = job.args().to_owned(); let mut args = args.iter(); let diffuculty_level = args From d77e4a0ccca997d568241967b902c03a63e421ba Mon Sep 17 00:00:00 2001 From: Kabir Kwatra Date: Sun, 11 Feb 2024 15:05:06 -0800 Subject: [PATCH 3/7] fix(JobRunner): rename associated type --- src/consumer/mod.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/consumer/mod.rs b/src/consumer/mod.rs index 1d54fdf1..5a49aa4b 100644 --- a/src/consumer/mod.rs +++ b/src/consumer/mod.rs @@ -47,21 +47,21 @@ const STATUS_TERMINATING: usize = 2; /// ``` pub trait JobRunner: Send + Sync { /// The error type that the handler may return. - type E; + type Error; /// A handler function that runs a job. - fn run(&self, job: Job) -> Result<(), Self::E>; + fn run(&self, job: Job) -> Result<(), Self::Error>; } // Implements JobRunner for a closure that takes a Job and returns a Result<(), E> impl JobRunner for F where F: Fn(Job) -> Result<(), E> + Send + Sync, { - type E = E; + type Error = E; fn run(&self, job: Job) -> Result<(), E> { self(job) } } -type BoxedJobRunner = Box>; +type BoxedJobRunner = Box>; /// `Consumer` is used to run a worker that processes jobs provided by Faktory. /// @@ -254,7 +254,7 @@ impl ConsumerBuilder { pub fn register_runner(&mut self, kind: K, runner: H) -> &mut Self where K: Into, - H: JobRunner + 'static, + H: JobRunner + 'static, { self.callbacks.insert(kind.into(), Box::new(runner)); self From 82c2be30408c0541464a853a1c3dbd94dd173278 Mon Sep 17 00:00:00 2001 From: Kabir Kwatra Date: Sun, 11 Feb 2024 15:06:37 -0800 Subject: [PATCH 4/7] refactor(register): simply call `register_runner` --- src/consumer/mod.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/consumer/mod.rs b/src/consumer/mod.rs index 5a49aa4b..4dbf5602 100644 --- a/src/consumer/mod.rs +++ b/src/consumer/mod.rs @@ -240,11 +240,9 @@ impl ConsumerBuilder { pub fn register(&mut self, kind: K, handler: H) -> &mut Self where K: Into, - // Annoyingly, can't just use the JobRunner type alias here. H: Fn(Job) -> Result<(), E> + Send + Sync + 'static, { - self.callbacks.insert(kind.into(), Box::new(handler)); - self + self.register_runner(kind, handler) } /// Register a handler for the given job type (`kind`). From 0a4b08dc080d9d5c839e28db5b372353a551988d Mon Sep 17 00:00:00 2001 From: Kabir Kwatra Date: Sun, 11 Feb 2024 23:14:40 -0800 Subject: [PATCH 5/7] docs: update example with associated type --- src/consumer/mod.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/consumer/mod.rs b/src/consumer/mod.rs index 4dbf5602..dba83785 100644 --- a/src/consumer/mod.rs +++ b/src/consumer/mod.rs @@ -27,8 +27,9 @@ const STATUS_TERMINATING: usize = 2; /// struct MyHandler { /// config: String, /// } -/// impl JobRunner for MyHandler { -/// fn run(&self, job: Job) -> Result<(), io::Error> { +/// impl JobRunner for MyHandler { +/// type Error = io::Error; +/// fn run(&self, job: Job) -> Result<(), Self::Error> { /// println!("config: {}", self.config); /// println!("job: {:?}", job); /// Ok(()) @@ -39,7 +40,7 @@ const STATUS_TERMINATING: usize = 2; /// let handler = MyHandler { /// config: "bar".to_string(), /// }; -/// c.register_runner("foo", Box::new(handler)); +/// c.register_runner("foo", handler); /// let mut c = c.connect(None).unwrap(); /// if let Err(e) = c.run(&["default"]) { /// println!("worker failed: {}", e); From 4b6d9b0dd8cf36f732df478a2857dcfc11a51058 Mon Sep 17 00:00:00 2001 From: Kabir Kwatra Date: Sun, 11 Feb 2024 23:34:24 -0800 Subject: [PATCH 6/7] fix(JobRunner): add additional blanket implementations --- src/consumer/mod.rs | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/src/consumer/mod.rs b/src/consumer/mod.rs index dba83785..698e89ce 100644 --- a/src/consumer/mod.rs +++ b/src/consumer/mod.rs @@ -52,8 +52,9 @@ pub trait JobRunner: Send + Sync { /// A handler function that runs a job. fn run(&self, job: Job) -> Result<(), Self::Error>; } +type BoxedJobRunner = Box>; // Implements JobRunner for a closure that takes a Job and returns a Result<(), E> -impl JobRunner for F +impl JobRunner for Box where F: Fn(Job) -> Result<(), E> + Send + Sync, { @@ -62,7 +63,26 @@ where self(job) } } -type BoxedJobRunner = Box>; + +// Additional Blanket Implementation +impl<'a, E, F> JobRunner for &'a F +where + F: Fn(Job) -> Result<(), E> + Send + Sync, +{ + type Error = E; + fn run(&self, job: Job) -> Result<(), E> { + self(job) + } +} +impl<'a, E, F> JobRunner for &'a mut F +where + F: Fn(Job) -> Result<(), E> + Send + Sync, +{ + type Error = E; + fn run(&self, job: Job) -> Result<(), E> { + (self as &F)(job) + } +} /// `Consumer` is used to run a worker that processes jobs provided by Faktory. /// @@ -243,7 +263,7 @@ impl ConsumerBuilder { K: Into, H: Fn(Job) -> Result<(), E> + Send + Sync + 'static, { - self.register_runner(kind, handler) + self.register_runner(kind, Box::new(handler)) } /// Register a handler for the given job type (`kind`). From b973e41d37b9dcbe6a22261c4868a82e582ea922 Mon Sep 17 00:00:00 2001 From: Kabir Kwatra Date: Sat, 17 Feb 2024 10:23:52 -0800 Subject: [PATCH 7/7] fix(JobRunner): remove extra box for closures --- src/consumer/mod.rs | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/src/consumer/mod.rs b/src/consumer/mod.rs index 698e89ce..a8875a31 100644 --- a/src/consumer/mod.rs +++ b/src/consumer/mod.rs @@ -12,7 +12,7 @@ const STATUS_RUNNING: usize = 0; const STATUS_QUIET: usize = 1; const STATUS_TERMINATING: usize = 2; -/// Implementors of this trait can be registered to run jobs in a `Consumer`. +/// Implementations of this trait can be registered to run jobs in a `Consumer`. /// /// # Example /// @@ -64,7 +64,7 @@ where } } -// Additional Blanket Implementation +// Additional Blanket Implementations impl<'a, E, F> JobRunner for &'a F where F: Fn(Job) -> Result<(), E> + Send + Sync, @@ -83,6 +83,17 @@ where (self as &F)(job) } } +#[repr(transparent)] +struct Closure(F); +impl JobRunner for Closure +where + F: Fn(Job) -> Result<(), E> + Send + Sync, +{ + type Error = E; + fn run(&self, job: Job) -> Result<(), E> { + (self.0)(job) + } +} /// `Consumer` is used to run a worker that processes jobs provided by Faktory. /// @@ -263,7 +274,7 @@ impl ConsumerBuilder { K: Into, H: Fn(Job) -> Result<(), E> + Send + Sync + 'static, { - self.register_runner(kind, Box::new(handler)) + self.register_runner(kind, Closure(handler)) } /// Register a handler for the given job type (`kind`).