From 129fa3e0483fec337c84c28356ca6081b2f24315 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Wed, 17 Jan 2024 22:40:14 +0500 Subject: [PATCH] Add tracker, add support for batch jobs --- Cargo.lock | 18 +- src/consumer/mod.rs | 11 +- src/lib.rs | 10 + src/producer/mod.rs | 29 +- src/proto/batch/cmd.rs | 65 ++++ src/proto/batch/mod.rs | 336 ++++++++++++++++++ src/proto/mod.rs | 83 ++++- src/proto/single/ent.rs | 148 +++++++- src/proto/single/mod.rs | 23 +- src/proto/single/resp.rs | 25 ++ src/proto/single/utils.rs | 32 +- src/tracker/mod.rs | 122 +++++++ tests/real/enterprise.rs | 696 ++++++++++++++++++++++++++++++++++++-- 13 files changed, 1514 insertions(+), 84 deletions(-) create mode 100644 src/proto/batch/cmd.rs create mode 100644 src/proto/batch/mod.rs create mode 100644 src/tracker/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 6190f113..5af0f678 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -523,7 +523,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.46", + "syn 2.0.47", ] [[package]] @@ -564,9 +564,9 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "proc-macro2" -version = "1.0.74" +version = "1.0.75" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2de98502f212cfcea8d0bb305bd0f49d7ebdd75b64ba0a68f937d888f4e0d6db" +checksum = "907a61bd0f64c2f29cd1cf1dc34d05176426a3f504a78010f08416ddb7b13708" dependencies = [ "unicode-ident", ] @@ -687,7 +687,7 @@ checksum = "a3385e45322e8f9931410f01b3031ec534c3947d0e94c18049af4d9f9907d4e0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.46", + "syn 2.0.47", ] [[package]] @@ -731,9 +731,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.46" +version = "2.0.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89456b690ff72fddcecf231caedbe615c59480c93358a93dfae7fc29e3ebbf0e" +checksum = 
"1726efe18f42ae774cc644f330953a5e7b3c3003d3edcecf18850fe9d4dd9afb" dependencies = [ "proc-macro2", "quote", @@ -770,7 +770,7 @@ checksum = "fa0faa943b50f3db30a20aa7e265dbc66076993efed8463e8de414e5d06d3471" dependencies = [ "proc-macro2", "quote", - "syn 2.0.46", + "syn 2.0.47", ] [[package]] @@ -871,7 +871,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.46", + "syn 2.0.47", "wasm-bindgen-shared", ] @@ -893,7 +893,7 @@ checksum = "f0eb82fcb7930ae6219a7ecfd55b217f5f0893484b7a13022ebb2b2bf20b5283" dependencies = [ "proc-macro2", "quote", - "syn 2.0.46", + "syn 2.0.47", "wasm-bindgen-backend", "wasm-bindgen-shared", ] diff --git a/src/consumer/mod.rs b/src/consumer/mod.rs index 3908932f..70102863 100644 --- a/src/consumer/mod.rs +++ b/src/consumer/mod.rs @@ -1,5 +1,9 @@ use crate::error::Error; -use crate::proto::{self, Client, ClientOptions, HeartbeatStatus, Reconnect}; + +use crate::proto::{ + self, parse_provided_or_from_env, Client, ClientOptions, HeartbeatStatus, Reconnect, +}; + use fnv::FnvHashMap; use std::error::Error as StdError; use std::io::prelude::*; @@ -213,10 +217,7 @@ impl ConsumerBuilder { /// /// If `url` is given, but does not specify a port, it defaults to 7419. 
pub fn connect(self, url: Option<&str>) -> Result, Error> { - let url = match url { - Some(url) => proto::url_parse(url), - None => proto::url_parse(&proto::get_env_url()), - }?; + let url = parse_provided_or_from_env(url)?; let stream = TcpStream::connect(proto::host_from_url(&url))?; Self::connect_with(self, stream, url.password().map(|p| p.to_string())) } diff --git a/src/lib.rs b/src/lib.rs index 5a83286e..df80246c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -74,4 +74,14 @@ pub use crate::consumer::{Consumer, ConsumerBuilder}; pub use crate::error::Error; pub use crate::producer::Producer; pub use crate::proto::Reconnect; + pub use crate::proto::{Job, JobBuilder}; + +pub use crate::proto::{Batch, BatchBuilder, BatchStatus}; + +#[cfg(feature = "ent")] +mod tracker; +#[cfg(feature = "ent")] +pub use crate::proto::{Progress, ProgressUpdate, ProgressUpdateBuilder}; +#[cfg(feature = "ent")] +pub use crate::tracker::Tracker; diff --git a/src/producer/mod.rs b/src/producer/mod.rs index 25b413bc..b8eefae3 100644 --- a/src/producer/mod.rs +++ b/src/producer/mod.rs @@ -1,5 +1,11 @@ use crate::error::Error; -use crate::proto::{self, Client, Info, Job, Push, QueueAction, QueueControl}; +use crate::proto::{ + self, parse_provided_or_from_env, BatchHandle, Client, CommitBatch, Info, Job, OpenBatch, Push, + QueueAction, QueueControl, +}; + +use crate::Batch; + use std::io::prelude::*; use std::net::TcpStream; @@ -82,10 +88,7 @@ impl Producer { /// /// If `url` is given, but does not specify a port, it defaults to 7419. pub fn connect(url: Option<&str>) -> Result { - let url = match url { - Some(url) => proto::url_parse(url), - None => proto::url_parse(&proto::get_env_url()), - }?; + let url = parse_provided_or_from_env(url)?; let stream = TcpStream::connect(proto::host_from_url(&url))?; Self::connect_with(stream, url.password().map(|p| p.to_string())) } @@ -129,6 +132,22 @@ impl Producer { .issue(&QueueControl::new(QueueAction::Resume, queues))? 
.await_ok() } + + /// Initiate a new batch of jobs. + pub fn start_batch(&mut self, batch: Batch) -> Result, Error> { + let bid = self.c.issue(&batch)?.read_bid()?; + Ok(BatchHandle::new(bid, self)) + } + + /// Open an already existing batch of jobs. + pub fn open_batch(&mut self, bid: String) -> Result, Error> { + let bid = self.c.issue(&OpenBatch::from(bid))?.read_bid()?; + Ok(BatchHandle::new(bid, self)) + } + + pub(crate) fn commit_batch(&mut self, bid: String) -> Result<(), Error> { + self.c.issue(&CommitBatch::from(bid))?.await_ok() + } } #[cfg(test)] diff --git a/src/proto/batch/cmd.rs b/src/proto/batch/cmd.rs new file mode 100644 index 00000000..03a2d0e7 --- /dev/null +++ b/src/proto/batch/cmd.rs @@ -0,0 +1,65 @@ +use crate::proto::single::FaktoryCommand; +use crate::{Batch, Error}; +use std::io::Write; + +impl FaktoryCommand for Batch { + fn issue(&self, w: &mut W) -> Result<(), Error> { + w.write_all(b"BATCH NEW ")?; + serde_json::to_writer(&mut *w, self).map_err(Error::Serialization)?; + Ok(w.write_all(b"\r\n")?) + } +} + +// ---------------------------------------------- + +pub struct CommitBatch(String); + +impl From for CommitBatch { + fn from(value: String) -> Self { + CommitBatch(value) + } +} + +impl FaktoryCommand for CommitBatch { + fn issue(&self, w: &mut W) -> Result<(), Error> { + w.write_all(b"BATCH COMMIT ")?; + w.write_all(self.0.as_bytes())?; + Ok(w.write_all(b"\r\n")?) + } +} + +// ---------------------------------------------- + +pub struct GetBatchStatus(String); + +impl From for GetBatchStatus { + fn from(value: String) -> Self { + GetBatchStatus(value) + } +} + +impl FaktoryCommand for GetBatchStatus { + fn issue(&self, w: &mut W) -> Result<(), Error> { + w.write_all(b"BATCH STATUS ")?; + w.write_all(self.0.as_bytes())?; + Ok(w.write_all(b"\r\n")?) 
+ } +} + +// ---------------------------------------------- + +pub struct OpenBatch(String); + +impl From for OpenBatch { + fn from(value: String) -> Self { + OpenBatch(value) + } +} + +impl FaktoryCommand for OpenBatch { + fn issue(&self, w: &mut W) -> Result<(), Error> { + w.write_all(b"BATCH OPEN ")?; + w.write_all(self.0.as_bytes())?; + Ok(w.write_all(b"\r\n")?) + } +} diff --git a/src/proto/batch/mod.rs b/src/proto/batch/mod.rs new file mode 100644 index 00000000..17fedc25 --- /dev/null +++ b/src/proto/batch/mod.rs @@ -0,0 +1,336 @@ +use std::io::{Read, Write}; + +use chrono::{DateTime, Utc}; +use derive_builder::Builder; + +use crate::{Error, Job, Producer}; + +mod cmd; + +pub use cmd::{CommitBatch, GetBatchStatus, OpenBatch}; + +/// Batch of jobs. +/// +/// Faktory guarantees a callback (`success` and/or `complete`) will be triggered after the execution +/// of all the jobs belonging to the same batch has finished (successfully or with errors accordingly). +/// Batches can be nested. They can also be re-opened, but - once a batch is committed - only those jobs +/// that belong to this batch can re-open it. +/// +/// An empty batch can be committed just fine. That will make Faktory immediately fire a callback (i.e. put +/// the job specified in `complete` and/or the one specified in `success` onto the queues). +/// +/// If you open a batch, but - for some reason - do not commit it within _30 minutes_, it will simply expire +/// on the Faktory server (which means no callbacks will be fired). 
+/// +/// Here is how you can create a simple batch: +/// ```no_run +/// # use faktory::Error; +/// use faktory::{Producer, Job, Batch}; +/// +/// let mut prod = Producer::connect(None)?; +/// let job1 = Job::builder("image").build(); +/// let job2 = Job::builder("image").build(); +/// let job_cb = Job::builder("clean_up").build(); +/// +/// let batch = Batch::builder("Image resizing workload".to_string()).with_complete_callback(job_cb); +/// +/// let mut batch = prod.start_batch(batch)?; +/// batch.add(job1)?; +/// batch.add(job2)?; +/// batch.commit()?; +/// +/// # Ok::<(), Error>(()) +/// ``` +/// +/// Nested batches are also supported: +/// ```no_run +/// # use faktory::{Producer, Job, Batch, Error}; +/// # let mut prod = Producer::connect(None)?; +/// let parent_job1 = Job::builder("stats_build").build(); +/// let parent_job2 = Job::builder("entries_update").build(); +/// let parent_cb = Job::builder("clean_up").build(); +/// +/// let child_job1 = Job::builder("image_recognition").build(); +/// let child_job2 = Job::builder("image_recognition").build(); +/// let child_cb = Job::builder("clean_up").build(); +/// +/// let parent_batch = Batch::builder("Image recognition and analysis workload".to_string()).with_complete_callback(parent_cb); +/// let child_batch = Batch::builder("Image recognition workload".to_string()).with_success_callback(child_cb); +/// +/// let mut parent = prod.start_batch(parent_batch)?; +/// parent.add(parent_job1)?; +/// parent.add(parent_job2)?; +/// let mut child = parent.start_batch(child_batch)?; +/// child.add(child_job1)?; +/// child.add(child_job2)?; +/// +/// child.commit()?; +/// parent.commit()?; +/// +/// # Ok::<(), Error>(()) +/// ``` +/// +/// In the example above, there is a single level nesting, but you can nest those batches as deep as you wish, +/// effectively building a pipeline this way, since the Faktory guarantees that callback jobs will not be queued unless +/// the batch gets committed. 
+/// +/// You can retrieve the batch status using a [`tracker`](struct.Tracker.html): +/// ```no_run +/// # use faktory::Error; +/// # use faktory::{Producer, Job, Batch, Tracker}; +/// let mut prod = Producer::connect(None)?; +/// let job = Job::builder("image").build(); +/// let cb_job = Job::builder("clean_up").build(); +/// let b = Batch::builder("Description...".to_string()).with_complete_callback(cb_job); +/// +/// let mut b = prod.start_batch(b)?; +/// let bid = b.id().to_string(); +/// b.add(job)?; +/// b.commit()?; +/// +/// let mut t = Tracker::connect(None).unwrap(); +/// let s = t.get_batch_status(bid).unwrap().unwrap(); +/// assert_eq!(s.total, 1); +/// assert_eq!(s.pending, 1); +/// assert_eq!(s.description, Some("Description...".into())); +/// assert_eq!(s.complete_callback_state, ""); // has not been queued; +/// # Ok::<(), Error>(()) +/// ``` +#[derive(Serialize, Debug, Builder)] +#[builder( + custom_constructor, + setter(into), + build_fn(name = "try_build", private) +)] +pub struct Batch { + #[serde(skip_serializing_if = "Option::is_none")] + #[builder(setter(skip))] + parent_bid: Option, + + /// Batch description for Faktory WEB UI. + #[serde(skip_serializing_if = "Option::is_none")] + pub description: Option, + + /// On success callback. + /// + /// This job will be queued by the Faktory server provided + /// all the jobs belonging to this batch have been executed successfully. + #[serde(skip_serializing_if = "Option::is_none")] + #[builder(setter(custom))] + pub(crate) success: Option, + + /// On complete callback. + /// + /// This job will be queued by the Faktory server after all the jobs + /// belonging to this batch have been executed, even if one/some/all + /// of the workers have failed. + #[serde(skip_serializing_if = "Option::is_none")] + #[builder(setter(custom))] + pub(crate) complete: Option, +} + +impl Batch { + /// Create a new `BatchBuilder`. 
+ pub fn builder(description: impl Into>) -> BatchBuilder { + BatchBuilder::new(description) + } +} + +impl BatchBuilder { + fn build(&mut self) -> Batch { + self.try_build() + .expect("All required fields have been set.") + } + + /// Create a new `BatchBuilder` with optional description of the batch. + pub fn new(description: impl Into>) -> BatchBuilder { + BatchBuilder { + description: Some(description.into()), + ..Self::create_empty() + } + } + + fn success(&mut self, success_cb: impl Into>) -> &mut Self { + self.success = Some(success_cb.into()); + self + } + + fn complete(&mut self, complete_cb: impl Into>) -> &mut Self { + self.complete = Some(complete_cb.into()); + self + } + + /// Create a `Batch` with only `success` callback specified. + pub fn with_success_callback(&mut self, success_cb: Job) -> Batch { + self.success(success_cb); + self.complete(None); + self.build() + } + + /// Create a `Batch` with only `complete` callback specified. + pub fn with_complete_callback(&mut self, complete_cb: Job) -> Batch { + self.complete(complete_cb); + self.success(None); + self.build() + } + + /// Create a `Batch` with both `success` and `complete` callbacks specified. + pub fn with_callbacks(&mut self, success_cb: Job, complete_cb: Job) -> Batch { + self.success(success_cb); + self.complete(complete_cb); + self.build() + } +} + +pub struct BatchHandle<'a, S: Read + Write> { + bid: String, + prod: &'a mut Producer, +} + +impl<'a, S: Read + Write> BatchHandle<'a, S> { + /// ID issued by the Faktory server to this batch. + pub fn id(&self) -> &str { + self.bid.as_ref() + } + + pub(crate) fn new(bid: String, prod: &mut Producer) -> BatchHandle<'_, S> { + BatchHandle { bid, prod } + } + + /// Add the given job to the batch. + pub fn add(&mut self, mut job: Job) -> Result<(), Error> { + job.custom.insert("bid".into(), self.bid.clone().into()); + self.prod.enqueue(job) + } + + /// Initiate a child batch of jobs. 
+ pub fn start_batch(&mut self, mut batch: Batch) -> Result, Error> { + batch.parent_bid = Some(self.bid.clone()); + self.prod.start_batch(batch) + } + + /// Commit this batch. + /// + /// The Faktory server will not queue any callbacks, unless the batch is committed. + /// Committing an empty batch will make the server queue the callback(s) right away. + /// Once committed, the batch can still be re-opened with [open_batch](struct.Producer.html#method.open_batch), + /// and extra jobs can be added to it. + pub fn commit(self) -> Result<(), Error> { + self.prod.commit_batch(self.bid) + } +} + +/// Batch status retrieved from Faktory server. +#[derive(Deserialize, Debug)] +pub struct BatchStatus { + // Fields "bid", "created_at", "description", "total", "pending", and "failed" + // are described in the docs: https://github.com/contribsys/faktory/wiki/Ent-Batches#status + /// Id of this batch. + pub bid: String, + + /// Batch creation date and time. + pub created_at: DateTime, + + /// Batch description, if any. + pub description: Option, + + /// Number of jobs in this batch. + pub total: usize, + + /// Number of pending jobs. + pub pending: usize, + + /// Number of failed jobs. + pub failed: usize, + + // The official golang client also mentions "parent_bid', "complete_st", and "success_st": + // https://github.com/contribsys/faktory/blob/main/client/batch.go#L8-L22 + /// Id of the parent batch, provided this batch is a child ("nested") batch. + pub parent_bid: Option, + + /// State of the `complete` callback. + /// + /// See [complete](struct.Batch.html#structfield.complete). + #[serde(rename = "complete_st")] + pub complete_callback_state: String, + + /// State of the `success` callback. + /// + /// See [success](struct.Batch.html#structfield.success). 
+ #[serde(rename = "success_st")] + pub success_callback_state: String, +} + +#[cfg(test)] +mod test { + use std::str::FromStr; + + use chrono::{DateTime, Utc}; + + use super::*; + + #[test] + fn test_batch_creation() { + let b = BatchBuilder::new("Image processing batch".to_string()) + .with_success_callback(Job::builder("thumbnail").build()); + assert!(b.complete.is_none()); + assert!(b.parent_bid.is_none()); + assert!(b.success.is_some()); + assert_eq!(b.description, Some("Image processing batch".into())); + + let b = BatchBuilder::new("Image processing batch".to_string()) + .with_complete_callback(Job::builder("thumbnail").build()); + assert!(b.complete.is_some()); + assert!(b.success.is_none()); + + let b = BatchBuilder::new(None).with_callbacks( + Job::builder("thumbnail").build(), + Job::builder("thumbnail").build(), + ); + assert!(b.description.is_none()); + assert!(b.complete.is_some()); + assert!(b.success.is_some()); + } + + #[test] + fn test_batch_serialized_correctly() { + let prepare_test_job = |jobtype: String| { + let jid = "LFluKy1Baak83p54"; + let dt = "2023-12-22T07:00:52.546258624Z"; + let created_at = DateTime::::from_str(dt).unwrap(); + Job::builder(jobtype) + .jid(jid) + .created_at(created_at) + .build() + }; + + // with description and on success callback: + let got = serde_json::to_string( + &BatchBuilder::new("Image processing workload".to_string()) + .with_success_callback(prepare_test_job("thumbnail_clean_up".into())), + ) + .unwrap(); + let expected = r#"{"description":"Image processing workload","success":{"jid":"LFluKy1Baak83p54","queue":"default","jobtype":"thumbnail_clean_up","args":[],"created_at":"2023-12-22T07:00:52.546258624Z","reserve_for":600,"retry":25,"priority":5,"backtrace":0}}"#; + assert_eq!(got, expected); + + // without description and with on complete callback: + let got = serde_json::to_string( + &BatchBuilder::new(None) + .with_complete_callback(prepare_test_job("thumbnail_info".into())), + ) + .unwrap(); + let 
expected = r#"{"complete":{"jid":"LFluKy1Baak83p54","queue":"default","jobtype":"thumbnail_info","args":[],"created_at":"2023-12-22T07:00:52.546258624Z","reserve_for":600,"retry":25,"priority":5,"backtrace":0}}"#; + assert_eq!(got, expected); + + // with description and both callbacks: + let got = serde_json::to_string( + &BatchBuilder::new("Image processing workload".to_string()).with_callbacks( + prepare_test_job("thumbnail_clean_up".into()), + prepare_test_job("thumbnail_info".into()), + ), + ) + .unwrap(); + let expected = r#"{"description":"Image processing workload","success":{"jid":"LFluKy1Baak83p54","queue":"default","jobtype":"thumbnail_clean_up","args":[],"created_at":"2023-12-22T07:00:52.546258624Z","reserve_for":600,"retry":25,"priority":5,"backtrace":0},"complete":{"jid":"LFluKy1Baak83p54","queue":"default","jobtype":"thumbnail_info","args":[],"created_at":"2023-12-22T07:00:52.546258624Z","reserve_for":600,"retry":25,"priority":5,"backtrace":0}}"#; + assert_eq!(got, expected); + } +} diff --git a/src/proto/mod.rs b/src/proto/mod.rs index d8af3c0c..da04c252 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -12,12 +12,23 @@ mod single; // commands that users can issue pub use self::single::{ - Ack, Fail, Heartbeat, Info, Job, JobBuilder, Push, QueueAction, QueueControl, + gen_random_wid, Ack, Fail, Heartbeat, Info, Job, JobBuilder, Push, QueueAction, QueueControl, }; // responses that users can see pub use self::single::Hi; +pub use self::single::gen_random_jid; + +#[cfg(feature = "ent")] +mod batch; +#[cfg(feature = "ent")] +pub use self::single::ent::{Progress, ProgressUpdate, ProgressUpdateBuilder, Track}; +#[cfg(feature = "ent")] +pub use batch::{ + Batch, BatchBuilder, BatchHandle, BatchStatus, CommitBatch, GetBatchStatus, OpenBatch, +}; + pub(crate) fn get_env_url() -> String { use std::env; let var = env::var("FAKTORY_PROVIDER").unwrap_or_else(|_| "FAKTORY_URL".to_string()); @@ -44,6 +55,21 @@ pub(crate) fn url_parse(url: &str) -> Result { 
Ok(url) } +pub(crate) fn parse_provided_or_from_env(url: Option<&str>) -> Result { + url_parse(url.unwrap_or(&get_env_url())) +} + +fn check_protocols_match(ver: usize) -> Result<(), Error> { + if ver != EXPECTED_PROTOCOL_VERSION { + return Err(error::Connect::VersionMismatch { + ours: EXPECTED_PROTOCOL_VERSION, + theirs: ver, + } + .into()); + } + Ok(()) +} + /// A stream that can be re-established after failing. pub trait Reconnect: Sized { /// Re-establish the stream. @@ -133,22 +159,40 @@ impl Client { }; Self::new(stream, opts) } + + #[cfg(feature = "ent")] + pub(crate) fn new_tracker(stream: S, pwd: Option) -> Result, Error> { + let opts = ClientOptions { + password: pwd, + ..Default::default() + }; + let mut c = Client { + stream: BufStream::new(stream), + opts, + }; + c.init_tracker()?; + Ok(c) + } } impl Client { fn init(&mut self) -> Result<(), Error> { let hi = single::read_hi(&mut self.stream)?; - if hi.version != EXPECTED_PROTOCOL_VERSION { - return Err(error::Connect::VersionMismatch { - ours: EXPECTED_PROTOCOL_VERSION, - theirs: hi.version, + check_protocols_match(hi.version)?; + + let mut hello = single::Hello::default(); + + // prepare password hash, if one expected by 'Faktory' + if hi.salt.is_some() { + if let Some(ref pwd) = self.opts.password { + hello.set_password(&hi, pwd); + } else { + return Err(error::Connect::AuthenticationNeeded.into()); } - .into()); } // fill in any missing options, and remember them for re-connect - let mut hello = single::Hello::default(); if !self.opts.is_producer { let hostname = self .opts @@ -162,14 +206,7 @@ impl Client { .pid .unwrap_or_else(|| unsafe { getpid() } as usize); self.opts.pid = Some(pid); - let wid = self.opts.wid.clone().unwrap_or_else(|| { - use rand::{thread_rng, Rng}; - thread_rng() - .sample_iter(&rand::distributions::Alphanumeric) - .map(char::from) - .take(32) - .collect() - }); + let wid = self.opts.wid.clone().unwrap_or_else(gen_random_wid); self.opts.wid = Some(wid); hello.hostname = 
Some(self.opts.hostname.clone().unwrap()); @@ -178,6 +215,18 @@ impl Client { hello.labels = self.opts.labels.clone(); } + single::write_command_and_await_ok(&mut self.stream, &hello) + } + + #[cfg(feature = "ent")] + fn init_tracker(&mut self) -> Result<(), Error> { + let hi = single::read_hi(&mut self.stream)?; + + check_protocols_match(hi.version)?; + + let mut hello = single::Hello::default(); + + // prepare password hash, if one expected by 'Faktory' if hi.salt.is_some() { if let Some(ref pwd) = self.opts.password { hello.set_password(&hi, pwd); @@ -256,6 +305,10 @@ impl<'a, S: Read + Write> ReadToken<'a, S> { { single::read_json(&mut self.0.stream) } + + pub(crate) fn read_bid(self) -> Result { + single::read_bid(&mut self.0.stream) + } } #[cfg(test)] diff --git a/src/proto/single/ent.rs b/src/proto/single/ent.rs index 1a866788..3aa91394 100644 --- a/src/proto/single/ent.rs +++ b/src/proto/single/ent.rs @@ -1,6 +1,23 @@ use chrono::{DateTime, Utc}; +use derive_builder::Builder; +use serde::{ + de::{Deserializer, IntoDeserializer}, + Deserialize, +}; -use crate::JobBuilder; +use crate::{Error, JobBuilder}; + +// Used to parse responses from Faktory that look like this: +// '{"jid":"f7APFzrS2RZi9eaA","state":"unknown","updated_at":""}' +fn parse_datetime<'de, D>(value: D) -> Result>, D::Error> +where + D: Deserializer<'de>, +{ + match Option::::deserialize(value)?.as_deref() { + Some("") | None => Ok(None), + Some(non_empty) => DateTime::deserialize(non_empty.into_deserializer()).map(Some), + } +} impl JobBuilder { /// When Faktory should expire this job. @@ -74,6 +91,135 @@ impl JobBuilder { } } +/// Info on job execution progress (retrieved). +/// +/// The tracker is guaranteed to get the following details: the job's id (though +/// they should know it beforehand in order to be able to track the job), its last +/// known state (e.g. "enqueued", "working", "success", "unknown") and the date and time +/// the job was last updated. 
Additionally, information on what's going on with the job +/// ([desc](struct.ProgressUpdate.html#structfield.desc)) and completion percentage +/// ([percent](struct.ProgressUpdate.html#structfield.percent)) may be available, +/// if the worker provided those details. +#[derive(Debug, Clone, Deserialize)] +pub struct Progress { + /// Id of the tracked job. + pub jid: String, + + /// Job's state. + pub state: String, + + /// When this job was last updated. + #[serde(deserialize_with = "parse_datetime")] + pub updated_at: Option>, + + /// Percentage of the job's completion. + pub percent: Option, + + /// Arbitrary description that may be useful to whoever is tracking the job's progress. + pub desc: Option, +} + +/// Info on job execution progress (sent). +/// +/// In Enterprise Faktory, a client executing a job can report on the execution +/// progress, provided the job is trackable. A trackable job is the one with "track":1 +/// specified in the custom data hash. +#[derive(Debug, Clone, Serialize, Builder)] +#[builder( + custom_constructor, + setter(into), + build_fn(name = "try_build", private) +)] +pub struct ProgressUpdate { + /// Id of the tracked job. + #[builder(setter(custom))] + pub jid: String, + + /// Percentage of the job's completion. + #[serde(skip_serializing_if = "Option::is_none")] + #[builder(default = "None")] + pub percent: Option, + + /// Arbitrary description that may be useful to whoever is tracking the job's progress. + #[serde(skip_serializing_if = "Option::is_none")] + #[builder(default = "None")] + pub desc: Option, + + /// Allows to extend the job's reservation, if more time needed to execute it. + /// + /// Note that you cannot decrease the initial [reservation](struct.Job.html#structfield.reserve_for). + #[serde(skip_serializing_if = "Option::is_none")] + #[builder(default = "None")] + pub reserve_until: Option>, +} + +impl ProgressUpdate { + /// Create a new instance of `ProgressUpdateBuilder` with job ID already set. 
+ /// + /// Equivalent to creating a [new](struct.ProgressUpdateBuilder.html#method.new) + /// `ProgressUpdateBuilder`. + pub fn builder(jid: impl Into) -> ProgressUpdateBuilder { + ProgressUpdateBuilder { + jid: Some(jid.into()), + ..ProgressUpdateBuilder::create_empty() + } + } + + /// Create a new instance of `ProgressUpdate`. + /// + /// While job ID is specified at `ProgressUpdate`'s creation time, + /// the rest of the [fields](struct.ProgressUpdate.html) are defaulted to _None_. + pub fn new(jid: impl Into) -> ProgressUpdate { + ProgressUpdateBuilder::new(jid).build() + } +} + +impl ProgressUpdateBuilder { + /// Builds an instance of ProgressUpdate. + pub fn build(&self) -> ProgressUpdate { + self.try_build() + .expect("All required fields have been set.") + } + + /// Create a new instance of `ProgressUpdateBuilder` with the job ID already set. + pub fn new(jid: impl Into) -> ProgressUpdateBuilder { + ProgressUpdateBuilder { + jid: Some(jid.into()), + ..ProgressUpdateBuilder::create_empty() + } + } +} + +// ---------------------------------------------- + +use super::FaktoryCommand; +use std::{fmt::Debug, io::Write}; + +#[derive(Debug, Clone)] +pub enum Track { + Set(ProgressUpdate), + Get(String), +} + +impl FaktoryCommand for Track { + fn issue(&self, w: &mut W) -> Result<(), Error> { + match self { + Self::Set(upd) => { + w.write_all(b"TRACK SET ")?; + serde_json::to_writer(&mut *w, upd).map_err(Error::Serialization)?; + Ok(w.write_all(b"\r\n")?) + } + Self::Get(jid) => { + w.write_all(b"TRACK GET ")?; + w.write_all(jid.as_bytes())?; + Ok(w.write_all(b"\r\n")?) 
+ } + } + } +} + +// ---------------------------------------------- + #[cfg(test)] mod test { use chrono::{DateTime, Utc}; diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index a02df87b..3b30d325 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -8,12 +8,13 @@ mod resp; mod utils; #[cfg(feature = "ent")] -mod ent; +pub mod ent; use crate::error::Error; pub use self::cmd::*; pub use self::resp::*; +pub use self::utils::{gen_random_jid, gen_random_wid}; const JOB_DEFAULT_QUEUE: &str = "default"; const JOB_DEFAULT_RESERVED_FOR_SECS: usize = 600; @@ -56,7 +57,7 @@ const JOB_DEFAULT_BACKTRACE: usize = 0; /// ``` /// /// See also the [Faktory wiki](https://github.com/contribsys/faktory/wiki/The-Job-Payload). -#[derive(Serialize, Deserialize, Debug, Builder)] +#[derive(Serialize, Deserialize, Debug, Clone, Builder)] #[builder( custom_constructor, setter(into), @@ -181,6 +182,24 @@ impl JobBuilder { self.try_build() .expect("All required fields have been set.") } + + /// Builds a new _trackable_ `Job`. + /// + /// Progress updates can be sent and received only for the jobs that have + /// been explicitly marked as trackable via `"track":1` in the job's + /// custom hash. + /// ``` + /// use faktory::JobBuilder; + /// + /// let _job = JobBuilder::new("order") + /// .args(vec!["ISBN-13:9781718501850"]) + /// .build_trackable(); + /// ``` + #[cfg(feature = "ent")] + pub fn build_trackable(&mut self) -> Job { + self.add_to_custom_data("track", 1); + self.build() + } } #[derive(Serialize, Deserialize, Debug, Clone)] diff --git a/src/proto/single/resp.rs b/src/proto/single/resp.rs index acf13ba8..c668256c 100644 --- a/src/proto/single/resp.rs +++ b/src/proto/single/resp.rs @@ -59,6 +59,31 @@ pub fn read_json(r: R) -> Result(r: R) -> Result { + match read(r)? 
{ + RawResponse::String(ref s) if s.is_empty() => Err(error::Protocol::BadType { + expected: "non-empty string representation of batch id", + received: "empty string".into(), + } + .into()), + RawResponse::String(ref s) => Ok(s.to_string()), + RawResponse::Blob(ref b) if b.is_empty() => Err(error::Protocol::BadType { + expected: "non-empty blob representation of batch id", + received: "empty blob".into(), + } + .into()), + RawResponse::Blob(ref b) => Ok(std::str::from_utf8(b) + .map_err(|_| error::Protocol::BadType { + expected: "valid blob representation of batch id", + received: "unprocessable blob".into(), + })? + .into()), + something_else => Err(bad("id", &something_else).into()), + } +} + +// ---------------------------------------------- + #[derive(Serialize, Deserialize, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] pub struct Hi { #[serde(rename = "v")] diff --git a/src/proto/single/utils.rs b/src/proto/single/utils.rs index b6f068dc..8c7651dd 100644 --- a/src/proto/single/utils.rs +++ b/src/proto/single/utils.rs @@ -1,15 +1,24 @@ use rand::{thread_rng, Rng}; const JOB_ID_LENGTH: usize = 16; +const WORKER_ID_LENGTH: usize = 32; -pub fn gen_random_jid() -> String { +fn gen_random_id(length: usize) -> String { thread_rng() .sample_iter(&rand::distributions::Alphanumeric) .map(char::from) - .take(JOB_ID_LENGTH) + .take(length) .collect() } +pub fn gen_random_jid() -> String { + gen_random_id(JOB_ID_LENGTH) +} + +pub fn gen_random_wid() -> String { + gen_random_id(WORKER_ID_LENGTH) +} + #[cfg(test)] mod test { use std::collections::HashSet; @@ -17,17 +26,16 @@ mod test { use super::*; #[test] - fn test_jid_of_known_size_generated() { - let jid1 = gen_random_jid(); - let jid2 = gen_random_jid(); - assert_ne!(jid1, jid2); - println!("{}", jid1); - assert_eq!(jid1.len(), JOB_ID_LENGTH); - assert_eq!(jid2.len(), JOB_ID_LENGTH); + fn test_id_of_known_size_generated() { + let id1 = gen_random_id(WORKER_ID_LENGTH); + let id2 = gen_random_id(WORKER_ID_LENGTH); + 
assert_ne!(id1, id2); + assert_eq!(id1.len(), WORKER_ID_LENGTH); + assert_eq!(id2.len(), WORKER_ID_LENGTH); } #[test] - fn test_jids_are_unique() { + fn test_ids_are_unique() { let mut ids = HashSet::new(); ids.insert("IYKOxEfLcwcgKaRa".to_string()); @@ -37,8 +45,8 @@ mod test { ids.clear(); for _ in 0..1_000_000 { - let jid = gen_random_jid(); - ids.insert(jid); + let id = gen_random_id(JOB_ID_LENGTH); + ids.insert(id); } assert_eq!(ids.len(), 1_000_000); } diff --git a/src/tracker/mod.rs b/src/tracker/mod.rs new file mode 100644 index 00000000..2ba3a9fa --- /dev/null +++ b/src/tracker/mod.rs @@ -0,0 +1,122 @@ +use std::io::{Read, Write}; +use std::net::TcpStream; + +use crate::proto::{host_from_url, parse_provided_or_from_env, Client, GetBatchStatus, Track}; +use crate::{BatchStatus, Error, Progress, ProgressUpdate}; + +/// Used for retrieving and updating information on a job's execution progress +/// (see [`Progress`] and [`ProgressUpdate`]), as well as for retrieving a batch's status +/// from the Faktory server (see [`BatchStatus`]). +/// +/// Fetching a job's execution progress: +/// ```no_run +/// use faktory::Tracker; +/// let job_id = String::from("W8qyVle9vXzUWQOf"); +/// let mut tracker = Tracker::connect(None)?; +/// if let Some(progress) = tracker.get_progress(job_id)? { +/// if progress.state == "success" { +/// # /* +/// ... 
+/// # */ +/// } +/// } +/// # Ok::<(), faktory::Error>(()) +/// ``` +/// Sending an update on a job's execution progress: +/// ```no_run +/// use faktory::{Tracker, ProgressUpdateBuilder}; +/// let jid = String::from("W8qyVle9vXzUWQOf"); +/// let mut tracker = Tracker::connect(None)?; +/// let progress = ProgressUpdateBuilder::new(&jid) +/// .desc("Almost done...".to_owned()) +/// .percent(99) +/// .build(); +/// tracker.set_progress(progress)?; +/// # Ok::<(), faktory::Error>(()) +/// ``` +/// Fetching a batch's status: +/// ```no_run +/// use faktory::Tracker; +/// let bid = String::from("W8qyVle9vXzUWQOg"); +/// let mut tracker = Tracker::connect(None)?; +/// if let Some(status) = tracker.get_batch_status(bid)? { +/// println!("This batch created at {}", status.created_at); +/// } +/// # Ok::<(), faktory::Error>(()) +/// ``` +pub struct Tracker<S: Read + Write> { + c: Client<S>, +} + +impl Tracker<TcpStream> { + /// Create new tracker and connect to a Faktory server. + /// + /// If `url` is not given, will use the standard Faktory environment variables. Specifically, + /// `FAKTORY_PROVIDER` is read to get the name of the environment variable to get the address + /// from (defaults to `FAKTORY_URL`), and then that environment variable is read to get the + /// server address. If the latter environment variable is not defined, the connection will be + /// made to + /// + /// ```text + /// tcp://localhost:7419 + /// ``` + pub fn connect(url: Option<&str>) -> Result<Tracker<TcpStream>, Error> { + let url = parse_provided_or_from_env(url)?; + let addr = host_from_url(&url); + let pwd = url.password().map(Into::into); + let stream = TcpStream::connect(addr)?; + Ok(Tracker { + c: Client::new_tracker(stream, pwd)?, + }) + } +} + +impl<S: Read + Write> Tracker<S> { + /// Connect to a Faktory server with a non-standard stream. + pub fn connect_with(stream: S, pwd: Option<String>) -> Result<Tracker<S>, Error> { + Ok(Tracker { + c: Client::new_tracker(stream, pwd)?, + }) + } + + /// Send information on a job's execution progress to Faktory. 
+ pub fn set_progress(&mut self, upd: ProgressUpdate) -> Result<(), Error> { + let cmd = Track::Set(upd); + self.c.issue(&cmd)?.await_ok() + } + + /// Fetch information on a job's execution progress from Faktory. + pub fn get_progress(&mut self, jid: String) -> Result<Option<Progress>, Error> { + let cmd = Track::Get(jid); + self.c.issue(&cmd)?.read_json() + } + + /// Fetch the status of a batch of jobs from Faktory. + pub fn get_batch_status(&mut self, bid: String) -> Result<Option<BatchStatus>, Error> { + let cmd = GetBatchStatus::from(bid); + self.c.issue(&cmd)?.read_json() + } +} + +#[cfg(test)] +mod test { + use crate::proto::{host_from_url, parse_provided_or_from_env}; + + use super::Tracker; + use std::{env, net::TcpStream}; + + #[test] + fn test_trackers_created_ok() { + if env::var_os("FAKTORY_URL").is_none() || env::var_os("FAKTORY_ENT").is_none() { + return; + } + let _ = Tracker::connect(None).expect("tracker successfully instantiated and connected"); + + let url = parse_provided_or_from_env(None).expect("valid url"); + let host = host_from_url(&url); + let stream = TcpStream::connect(host).expect("connected"); + let pwd = url.password().map(String::from); + let _ = Tracker::connect_with(stream, pwd) + .expect("tracker successfully instantiated and connected"); + } +} diff --git a/tests/real/enterprise.rs b/tests/real/enterprise.rs index d1f651f1..b971fbdc 100644 --- a/tests/real/enterprise.rs +++ b/tests/real/enterprise.rs @@ -2,7 +2,9 @@ extern crate faktory; extern crate serde_json; extern crate url; +use chrono::Utc; use faktory::*; +use serde_json::Value; use std::io; macro_rules! skip_if_not_enterprise { @@ -13,6 +15,20 @@ macro_rules! skip_if_not_enterprise { }; } +macro_rules! assert_had_one { + ($c:expr, $q:expr) => { + let had_one_job = $c.run_one(0, &[$q]).unwrap(); + assert!(had_one_job); + }; +} + +macro_rules! 
assert_is_empty { + ($c:expr, $q:expr) => { + let had_one_job = $c.run_one(0, &[$q]).unwrap(); + assert!(!had_one_job); + }; +} + fn learn_faktory_url() -> String { let url = std::env::var_os("FAKTORY_URL").expect( "Enterprise Faktory should be running for this test, and 'FAKTORY_URL' environment variable should be provided", @@ -20,6 +36,15 @@ fn learn_faktory_url() -> String { url.to_str().expect("Is a utf-8 string").to_owned() } +fn some_jobs<S>(kind: S, q: S, count: usize) -> impl Iterator<Item = Job> +where + S: Into<String> + Clone + 'static, +{ + (0..count) + .into_iter() + .map(move |_| Job::builder(kind.clone()).queue(q.clone()).build()) +} + #[test] fn ent_expiring_job() { use std::{thread, time}; @@ -29,12 +54,12 @@ fn ent_expiring_job() { let url = learn_faktory_url(); // prepare a producer ("client" in Faktory terms) and consumer ("worker"): - let mut producer = Producer::connect(Some(&url)).unwrap(); - let mut consumer = ConsumerBuilder::default(); - consumer.register("AnExpiringJob", move |job| -> io::Result<_> { + let mut p = Producer::connect(Some(&url)).unwrap(); + let mut c = ConsumerBuilder::default(); + c.register("AnExpiringJob", move |job| -> io::Result<_> { Ok(eprintln!("{:?}", job)) }); - let mut consumer = consumer.connect(Some(&url)).unwrap(); + let mut c = c.connect(Some(&url)).unwrap(); // prepare an expiring job: let job_ttl_secs: u64 = 3; @@ -42,32 +67,31 @@ fn ent_expiring_job() { let ttl = chrono::Duration::seconds(job_ttl_secs as i64); let job1 = JobBuilder::new("AnExpiringJob") .args(vec!["ISBN-13:9781718501850"]) + .queue("ent_expiring_job") .expires_at(chrono::Utc::now() + ttl) .build(); // enqueue and fetch immediately job1: - producer.enqueue(job1).unwrap(); - let had_job = consumer.run_one(0, &["default"]).unwrap(); - assert!(had_job); + p.enqueue(job1).unwrap(); + assert_had_one!(&mut c, "ent_expiring_job"); // check that the queue is drained: - let had_job = consumer.run_one(0, &["default"]).unwrap(); - assert!(!had_job); + assert_is_empty!(&mut 
c, "ent_expiring_job"); // prepare another one: let job2 = JobBuilder::new("AnExpiringJob") .args(vec!["ISBN-13:9781718501850"]) + .queue("ent_expiring_job") .expires_at(chrono::Utc::now() + ttl) .build(); // enqueue and then fetch job2, but after ttl: - producer.enqueue(job2).unwrap(); + p.enqueue(job2).unwrap(); thread::sleep(time::Duration::from_secs(job_ttl_secs * 2)); - let had_job = consumer.run_one(0, &["default"]).unwrap(); // For the non-enterprise edition of Faktory, this assertion will // fail, which should be taken into account when running the test suite on CI. - assert!(!had_job); + assert_is_empty!(&mut c, "ent_expiring_job"); } #[test] @@ -82,12 +106,12 @@ fn ent_unique_job() { let job_type = "order"; // prepare producer and consumer: - let mut producer = Producer::connect(Some(&url)).unwrap(); - let mut consumer = ConsumerBuilder::default(); - consumer.register(job_type, |job| -> io::Result<_> { + let mut p = Producer::connect(Some(&url)).unwrap(); + let mut c = ConsumerBuilder::default(); + c.register(job_type, |job| -> io::Result<_> { Ok(eprintln!("{:?}", job)) }); - let mut consumer = consumer.connect(Some(&url)).unwrap(); + let mut c = c.connect(Some(&url)).unwrap(); // Reminder. Jobs are considered unique for kind + args + queue. 
// So the following two jobs, will be accepted by Faktory, since we @@ -98,18 +122,18 @@ fn ent_unique_job() { .args(args.clone()) .queue(queue_name) .build(); - producer.enqueue(job1).unwrap(); + p.enqueue(job1).unwrap(); let job2 = JobBuilder::new(job_type) .args(args.clone()) .queue(queue_name) .build(); - producer.enqueue(job2).unwrap(); + p.enqueue(job2).unwrap(); - let had_job = consumer.run_one(0, &[queue_name]).unwrap(); + let had_job = c.run_one(0, &[queue_name]).unwrap(); assert!(had_job); - let had_another_one = consumer.run_one(0, &[queue_name]).unwrap(); + let had_another_one = c.run_one(0, &[queue_name]).unwrap(); assert!(had_another_one); - let and_that_is_it_for_now = !consumer.run_one(0, &[queue_name]).unwrap(); + let and_that_is_it_for_now = !c.run_one(0, &[queue_name]).unwrap(); assert!(and_that_is_it_for_now); // let's now create a unique job and followed by a job with @@ -121,7 +145,7 @@ fn ent_unique_job() { .queue(queue_name) .unique_for(unique_for_secs) .build(); - producer.enqueue(job1).unwrap(); + p.enqueue(job1).unwrap(); // this one is a 'duplicate' ... let job2 = Job::builder(job_type) .args(args.clone()) @@ -129,7 +153,7 @@ fn ent_unique_job() { .unique_for(unique_for_secs) .build(); // ... 
so the server will respond accordingly: - let res = producer.enqueue(job2).unwrap_err(); + let res = p.enqueue(job2).unwrap_err(); if let error::Error::Protocol(error::Protocol::UniqueConstraintViolation { msg }) = res { assert_eq!(msg, "Job not unique"); } else { @@ -137,12 +161,12 @@ fn ent_unique_job() { } // Let's now consume the job which is 'holding' a unique lock: - let had_job = consumer.run_one(0, &[queue_name]).unwrap(); + let had_job = c.run_one(0, &[queue_name]).unwrap(); assert!(had_job); // And check that the queue is really empty (`job2` from above // has not been queued indeed): - let queue_is_empty = !consumer.run_one(0, &[queue_name]).unwrap(); + let queue_is_empty = !c.run_one(0, &[queue_name]).unwrap(); assert!(queue_is_empty); // Now let's repeat the latter case, but providing different args to job2: @@ -151,7 +175,7 @@ fn ent_unique_job() { .queue(queue_name) .unique_for(unique_for_secs) .build(); - producer.enqueue(job1).unwrap(); + p.enqueue(job1).unwrap(); // this one is *NOT* a 'duplicate' ... let job2 = JobBuilder::new(job_type) .args(vec![Value::from("ISBN-13:9781718501850"), Value::from(101)]) @@ -159,16 +183,12 @@ fn ent_unique_job() { .unique_for(unique_for_secs) .build(); // ... 
so the server will accept it: - producer.enqueue(job2).unwrap(); - - let had_job = consumer.run_one(0, &[queue_name]).unwrap(); - assert!(had_job); - let had_another_one = consumer.run_one(0, &[queue_name]).unwrap(); - assert!(had_another_one); + p.enqueue(job2).unwrap(); + assert_had_one!(&mut c, queue_name); + assert_had_one!(&mut c, queue_name); // and the queue is empty again: - let had_job = consumer.run_one(0, &[queue_name]).unwrap(); - assert!(!had_job); + assert_is_empty!(&mut c, queue_name); } #[test] @@ -325,11 +345,11 @@ fn ent_unique_job_until_start() { #[test] fn ent_unique_job_bypass_unique_lock() { use faktory::error; - use serde_json::Value; skip_if_not_enterprise!(); let url = learn_faktory_url(); + let mut producer = Producer::connect(Some(&url)).unwrap(); let job1 = Job::builder("order") @@ -361,3 +381,609 @@ fn ent_unique_job_bypass_unique_lock() { panic!("Expected protocol error.") } } + +#[test] +fn test_tracker_can_send_and_retrieve_job_execution_progress() { + use std::{ + io, + sync::{Arc, Mutex}, + thread, time, + }; + + skip_if_not_enterprise!(); + + let url = learn_faktory_url(); + + let t = Arc::new(Mutex::new( + Tracker::connect(Some(&url)).expect("job progress tracker created successfully"), + )); + + let t_captured = Arc::clone(&t); + + let mut p = Producer::connect(Some(&url)).unwrap(); + + let job_tackable = JobBuilder::new("order") + .args(vec![Value::from("ISBN-13:9781718501850")]) + .queue("test_tracker_can_send_progress_update") + .build_trackable(); + + let job_ordinary = JobBuilder::new("order") + .args(vec![Value::from("ISBN-13:9781718501850")]) + .queue("test_tracker_can_send_progress_update") + .build(); + + // let's remember this job's id: + let job_id = job_tackable.id().to_owned(); + let job_id_captured = job_id.clone(); + + p.enqueue(job_tackable).expect("enqueued"); + + let mut c = ConsumerBuilder::default(); + c.register("order", move |job| -> io::Result<_> { + // trying to set progress on a community edition of 
Faktory will give: + // 'an internal server error occurred: tracking subsystem is only available in Faktory Enterprise' + let result = t_captured.lock().expect("lock acquired").set_progress( + ProgressUpdateBuilder::new(&job_id_captured) + .desc("Still processing...".to_owned()) + .percent(32) + .build(), + ); + assert!(result.is_ok()); + // let's sleep for a while ... + thread::sleep(time::Duration::from_secs(2)); + + // ... and read the progress info + let result = t_captured + .lock() + .expect("lock acquired") + .get_progress(job_id_captured.clone()) + .expect("Retrieved progress update over the wire"); + + assert!(result.is_some()); + let result = result.unwrap(); + assert_eq!(result.jid, job_id_captured.clone()); + assert_eq!(result.state, "working"); + assert!(result.updated_at.is_some()); + assert_eq!(result.desc, Some("Still processing...".to_owned())); + assert_eq!(result.percent, Some(32)); + // considering the job done + Ok(eprintln!("{:?}", job)) + }); + + let mut c = c + .connect(Some(&url)) + .expect("Successfully ran a handshake with 'Faktory'"); + assert_had_one!(&mut c, "test_tracker_can_send_progress_update"); + + let result = t + .lock() + .expect("lock acquired successfully") + .get_progress(job_id.clone()) + .expect("Retrieved progress update over the wire once again") + .expect("Some progress"); + + assert_eq!(result.jid, job_id); + // 'Faktory' will be keeping last known update for at least 30 minutes: + assert_eq!(result.desc, Some("Still processing...".to_owned())); + assert_eq!(result.percent, Some(32)); + + // But it actually knows the job's real status, since the consumer (worker) + // informed it immediately after finishing with the job: + assert_eq!(result.state, "success"); + + // What about 'ordinary' job ? + let job_id = job_ordinary.id().to_owned().clone(); + + // Sending it ... + p.enqueue(job_ordinary) + .expect("Successfuly send to Faktory"); + + // ... 
and asking for its progress + let progress = t + .lock() + .expect("lock acquired") + .get_progress(job_id.clone()) + .expect("Retrieved progress update over the wire once again") + .expect("Some progress"); + + // From the docs: + // There are several reasons why a job's state might be unknown: + // The JID is invalid or was never actually enqueued. + // The job was not tagged with the track variable in the job's custom attributes: custom:{"track":1}. + // The job's tracking structure has expired in Redis. It lives for 30 minutes and a big queue backlog can lead to expiration. + assert_eq!(progress.jid, job_id); + + // Returned from Faktory: '{"jid":"f7APFzrS2RZi9eaA","state":"unknown","updated_at":""}' + assert_eq!(progress.state, "unknown"); + assert!(progress.updated_at.is_none()); + assert!(progress.percent.is_none()); + assert!(progress.desc.is_none()); +} + +#[test] +fn test_batch_of_jobs_can_be_initiated() { + skip_if_not_enterprise!(); + let url = learn_faktory_url(); + + let mut p = Producer::connect(Some(&url)).unwrap(); + let mut c = ConsumerBuilder::default(); + c.register("thumbnail", move |_job| -> io::Result<_> { Ok(()) }); + c.register("clean_up", move |_job| -> io::Result<_> { Ok(()) }); + let mut c = c.connect(Some(&url)).unwrap(); + let mut t = Tracker::connect(Some(&url)).expect("job progress tracker created successfully"); + + let job_1 = Job::builder("thumbnail") + .args(vec!["path/to/original/image1"]) + .queue("test_batch_of_jobs_can_be_initiated") + .build(); + let job_2 = Job::builder("thumbnail") + .args(vec!["path/to/original/image2"]) + .queue("test_batch_of_jobs_can_be_initiated") + .build(); + let job_3 = Job::builder("thumbnail") + .args(vec!["path/to/original/image3"]) + .queue("test_batch_of_jobs_can_be_initiated") + .build(); + + let cb_job = Job::builder("clean_up") + .queue("test_batch_of_jobs_can_be_initiated__CALLBACKs") + .build(); + + let batch = + Batch::builder("Image resizing 
workload".to_string()).with_complete_callback(cb_job); + + let time_just_before_batch_init = Utc::now(); + + let mut b = p.start_batch(batch).unwrap(); + + // let's remember batch id: + let bid = b.id().to_string(); + + b.add(job_1).unwrap(); + b.add(job_2).unwrap(); + b.add(job_3).unwrap(); + b.commit().unwrap(); + + // The batch has been committed, let's see its status: + let time_just_before_getting_status = Utc::now(); + + let s = t + .get_batch_status(bid.clone()) + .expect("successfully fetched batch status from server...") + .expect("...and it's not none"); + + // Just to make a meaningfull assertion about the BatchStatus's 'created_at' field: + assert!(s.created_at > time_just_before_batch_init); + assert!(s.created_at < time_just_before_getting_status); + assert_eq!(s.bid, bid); + assert_eq!(s.description, Some("Image resizing workload".into())); + assert_eq!(s.total, 3); // three jobs registered + assert_eq!(s.pending, 3); // and none executed just yet + assert_eq!(s.failed, 0); + // Docs do not mention it, but the golang client does: + // https://github.com/contribsys/faktory/blob/main/client/batch.go#L17-L19 + assert_eq!(s.success_callback_state, ""); // we did not even provide the 'success' callback + assert_eq!(s.complete_callback_state, ""); // the 'complete' callback is pending + + // consume and execute job 1 ... + assert_had_one!(&mut c, "test_batch_of_jobs_can_be_initiated"); + // ... 
and try consuming from the "callback" queue: + assert_is_empty!(&mut c, "test_batch_of_jobs_can_be_initiated__CALLBACKs"); + + // let's ask the Faktory server about the batch status after + // we have consumed one job from this batch: + let s = t + .get_batch_status(bid.clone()) + .expect("successfully fetched batch status from server...") + .expect("...and it's not none"); + + // this is because we have just consumed and executed 1 of 3 jobs: + assert_eq!(s.total, 3); + assert_eq!(s.pending, 2); + assert_eq!(s.failed, 0); + + // now, consume and execute job 2 + assert_had_one!(&mut c, "test_batch_of_jobs_can_be_initiated"); + // ... and check the callback queue again: + assert_is_empty!(&mut c, "test_batch_of_jobs_can_be_initiated__CALLBACKs"); // not just yet ... + + // let's check batch status once again: + let s = t + .get_batch_status(bid.clone()) + .expect("successfully fetched batch status from server...") + .expect("...and it's not none"); + + // this is because we have just consumed and executed 2 of 3 jobs: + assert_eq!(s.total, 3); + assert_eq!(s.pending, 1); + assert_eq!(s.failed, 0); + + // finally, consume and execute job 3 - the last one from the batch + assert_had_one!(&mut c, "test_batch_of_jobs_can_be_initiated"); + + // let's check batch status to see what happens after + // all the jobs from the batch have been executed: + let s = t + .get_batch_status(bid.clone()) + .expect("successfully fetched batch status from server...") + .expect("...and it's not none"); + + // this is because we have just consumed and executed all 3 jobs: + assert_eq!(s.total, 3); + assert_eq!(s.pending, 0); + assert_eq!(s.failed, 0); + assert_eq!(s.complete_callback_state, "1"); // callback has been enqueued!! 
+ + // let's now successfully consume from the "callback" queue: + assert_had_one!(&mut c, "test_batch_of_jobs_can_be_initiated__CALLBACKs"); + + // let's check batch status one last time: + let s = t + .get_batch_status(bid.clone()) + .expect("successfully fetched batch status from server...") + .expect("...and it's not none"); + + // this is because we have just consumed and executed the 'complete' callback job: + assert_eq!(s.complete_callback_state, "2"); // means the callback has been successfully executed +} + +#[test] +fn test_batches_can_be_nested() { + skip_if_not_enterprise!(); + let url = learn_faktory_url(); + + // Set up 'producer', 'consumer', and 'tracker': + let mut p = Producer::connect(Some(&url)).unwrap(); + let mut c = ConsumerBuilder::default(); + c.register("jobtype", move |_job| -> io::Result<_> { Ok(()) }); + let mut _c = c.connect(Some(&url)).unwrap(); + let mut t = Tracker::connect(Some(&url)).expect("job progress tracker created successfully"); + + // Prepare some jobs: + let parent_job1 = Job::builder("jobtype") + .queue("test_batches_can_be_nested") + .build(); + let child_job_1 = Job::builder("jobtype") + .queue("test_batches_can_be_nested") + .build(); + let child_job_2 = Job::builder("jobtype") + .queue("test_batches_can_be_nested") + .build(); + let grand_child_job_1 = Job::builder("jobtype") + .queue("test_batches_can_be_nested") + .build(); + + // According to Faktory docs: + // "The callback for a parent batch will not enqueue until the callback for the child batch has finished." 
+ // See: https://github.com/contribsys/faktory/wiki/Ent-Batches#guarantees + let parent_cb_job = Job::builder("clean_up") + .queue("test_batches_can_be_nested__CALLBACKs") + .build(); + let child_cb_job = Job::builder("clean_up") + .queue("test_batches_can_be_nested__CALLBACKs") + .build(); + let grandchild_cb_job = Job::builder("clean_up") + .queue("test_batches_can_be_nested__CALLBACKs") + .build(); + + // batches start + let parent_batch = + Batch::builder("Parent batch".to_string()).with_success_callback(parent_cb_job); + let mut parent_batch = p.start_batch(parent_batch).unwrap(); + let parent_batch_id = parent_batch.id().to_owned(); + parent_batch.add(parent_job1).unwrap(); + + let child_batch = Batch::builder("Child batch".to_string()).with_success_callback(child_cb_job); + let mut child_batch = parent_batch.start_batch(child_batch).unwrap(); + let child_batch_id = child_batch.id().to_owned(); + child_batch.add(child_job_1).unwrap(); + child_batch.add(child_job_2).unwrap(); + + let grandchild_batch = + Batch::builder("Grandchild batch".to_string()).with_success_callback(grandchild_cb_job); + let mut grandchild_batch = child_batch.start_batch(grandchild_batch).unwrap(); + let grandchild_batch_id = grandchild_batch.id().to_owned(); + grandchild_batch.add(grand_child_job_1).unwrap(); + + grandchild_batch.commit().unwrap(); + child_batch.commit().unwrap(); + parent_batch.commit().unwrap(); + // batches finish + + let parent_status = t + .get_batch_status(parent_batch_id.clone()) + .unwrap() + .unwrap(); + assert_eq!(parent_status.description, Some("Parent batch".to_string())); + assert_eq!(parent_status.total, 1); + assert_eq!(parent_status.parent_bid, None); + + let child_status = t.get_batch_status(child_batch_id.clone()).unwrap().unwrap(); + assert_eq!(child_status.description, Some("Child batch".to_string())); + assert_eq!(child_status.total, 2); + assert_eq!(child_status.parent_bid, Some(parent_batch_id)); + + let grandchild_status = 
t.get_batch_status(grandchild_batch_id).unwrap().unwrap(); + assert_eq!( + grandchild_status.description, + Some("Grandchild batch".to_string()) + ); + assert_eq!(grandchild_status.total, 1); + assert_eq!(grandchild_status.parent_bid, Some(child_batch_id)); +} + +#[test] +fn test_callback_will_not_be_queued_unless_batch_gets_committed() { + skip_if_not_enterprise!(); + let url = learn_faktory_url(); + + // prepare a producer, a consumer of 'order' jobs, and a tracker: + let mut p = Producer::connect(Some(&url)).unwrap(); + let mut c = ConsumerBuilder::default(); + c.register("order", move |_job| -> io::Result<_> { Ok(()) }); + c.register("order_clean_up", move |_job| -> io::Result<_> { Ok(()) }); + let mut c = c.connect(Some(&url)).unwrap(); + let mut t = Tracker::connect(Some(&url)).unwrap(); + + let mut jobs = some_jobs( + "order", + "test_callback_will_not_be_queued_unless_batch_gets_committed", + 3, + ); + let mut callbacks = some_jobs( + "order_clean_up", + "test_callback_will_not_be_queued_unless_batch_gets_committed__CALLBACKs", + 1, + ); + + // start a 'batch': + let mut b = p + .start_batch( + Batch::builder("Orders processing workload".to_string()) + .with_success_callback(callbacks.next().unwrap()), + ) + .unwrap(); + let bid = b.id().to_string(); + + // push 3 jobs onto this batch, but DO NOT commit the batch: + for _ in 0..3 { + b.add(jobs.next().unwrap()).unwrap(); + } + + // check this batch's status: + let s = t.get_batch_status(bid.clone()).unwrap().unwrap(); + assert_eq!(s.total, 3); + assert_eq!(s.pending, 3); + assert_eq!(s.success_callback_state, ""); // has not been queued; + + // consume those 3 jobs successfully; + for _ in 0..3 { + assert_had_one!( + &mut c, + "test_callback_will_not_be_queued_unless_batch_gets_committed" + ); + } + + // verify the queue is drained: + assert_is_empty!( + &mut c, + "test_callback_will_not_be_queued_unless_batch_gets_committed" + ); + + // check this batch's status again: + let s = 
t.get_batch_status(bid.clone()).unwrap().unwrap(); + assert_eq!(s.total, 3); + assert_eq!(s.pending, 0); + assert_eq!(s.failed, 0); + assert_eq!(s.success_callback_state, ""); // not just yet; + + // to double-check, let's assert the success callbacks queue is empty: + assert_is_empty!( + &mut c, + "test_callback_will_not_be_queued_unless_batch_gets_committed__CALLBACKs" + ); + + // now let's COMMIT the batch ... + b.commit().unwrap(); + + // ... and check batch status: + let s = t.get_batch_status(bid.clone()).unwrap().unwrap(); + assert_eq!(s.success_callback_state, "1"); // callback has been queued; + + // finally, let's consume from the success callbacks queue ... + assert_had_one!( + &mut c, + "test_callback_will_not_be_queued_unless_batch_gets_committed__CALLBACKs" + ); + + // ... and see the final status: + let s = t.get_batch_status(bid.clone()).unwrap().unwrap(); + assert_eq!(s.success_callback_state, "2"); // callback successfully executed; +} + +#[test] +fn test_callback_will_be_queue_upon_commit_even_if_batch_is_empty() { + use std::{thread, time}; + + skip_if_not_enterprise!(); + let url = learn_faktory_url(); + let mut p = Producer::connect(Some(&url)).unwrap(); + let mut t = Tracker::connect(Some(&url)).unwrap(); + let jobtype = "callback_jobtype"; + let q_name = "test_callback_will_be_queue_upon_commit_even_if_batch_is_empty"; + let mut callbacks = some_jobs(jobtype, q_name, 2); + let b = p + .start_batch( + Batch::builder("Orders processing workload".to_string()) + .with_callbacks(callbacks.next().unwrap(), callbacks.next().unwrap()), + ) + .unwrap(); + let bid = b.id().to_owned(); + + let s = t.get_batch_status(bid.clone()).unwrap().unwrap(); + assert_eq!(s.total, 0); // no jobs in the batch; + assert_eq!(s.success_callback_state, ""); // not queued; + assert_eq!(s.complete_callback_state, ""); // not queued; + + b.commit().unwrap(); + + // let's give the Faktory server some time: + thread::sleep(time::Duration::from_secs(2)); + + let s = 
t.get_batch_status(bid.clone()).unwrap().unwrap(); + assert_eq!(s.total, 0); // again, there are no jobs in the batch ... + + // The docs say "If you don't push any jobs into the batch, any callbacks will fire immediately upon BATCH COMMIT." + // and "the success callback for a batch will always enqueue after the complete callback" + assert_eq!(s.complete_callback_state, "1"); // queued + assert_eq!(s.success_callback_state, ""); // not queued + + let mut c = ConsumerBuilder::default(); + c.register(jobtype, move |_job| -> io::Result<_> { Ok(()) }); + let mut c = c.connect(Some(&url)).unwrap(); + + assert_had_one!(&mut c, q_name); // complete callback consumed + + let s = t.get_batch_status(bid.clone()).unwrap().unwrap(); + assert_eq!(s.total, 0); + assert_eq!(s.complete_callback_state, "2"); // successfully executed + assert_eq!(s.success_callback_state, "1"); // queued + + assert_had_one!(&mut c, q_name); // success callback consumed + + let s = t.get_batch_status(bid.clone()).unwrap().unwrap(); + assert_eq!(s.total, 0); + assert_eq!(s.complete_callback_state, "2"); // successfully executed + assert_eq!(s.success_callback_state, "2"); // successfully executed +} + +#[test] +fn test_batch_can_be_reopened_add_extra_jobs_and_batches_added() { + skip_if_not_enterprise!(); + let url = learn_faktory_url(); + let mut p = Producer::connect(Some(&url)).unwrap(); + let mut t = Tracker::connect(Some(&url)).unwrap(); + let mut jobs = some_jobs("order", "test_batch_can_be_reopned_add_extra_jobs_added", 4); + let mut callbacks = some_jobs( + "order_clean_up", + "test_batch_can_be_reopned_add_extra_jobs_added__CALLBACKs", + 1, + ); + + let b = Batch::builder("Orders processing workload".to_string()) + .with_success_callback(callbacks.next().unwrap()); + + let mut b = p.start_batch(b).unwrap(); + let bid = b.id().to_string(); + b.add(jobs.next().unwrap()).unwrap(); // 1 job + b.add(jobs.next().unwrap()).unwrap(); // 2 jobs + + let status = 
t.get_batch_status(bid.clone()).unwrap().unwrap(); + assert_eq!(status.total, 2); + assert_eq!(status.pending, 2); + + // ############################## SUBTEST 0 ########################################## + // Let's first of all try to open the batch we have not committed yet: + let mut b = p.open_batch(bid.clone()).unwrap(); + assert_eq!(b.id(), bid); + b.add(jobs.next().unwrap()).unwrap(); // 3 jobs + + b.commit().unwrap(); // committing the batch + + let status = t.get_batch_status(bid.clone()).unwrap().unwrap(); + assert_eq!(status.total, 3); + assert_eq!(status.pending, 3); + + // Subtest 0 result: + // The Faktory server lets us open the uncommitted batch. This is something not mentioned + // in the docs, but still worth checking. + + // ############################## SUBTEST 1 ########################################## + // From the docs: + // """Note that, once committed, only a job within the batch may reopen it. + // Faktory will return an error if you dynamically add jobs from "outside" the batch; + // this is to prevent a race condition between callbacks firing and an outsider adding more jobs.""" + // Ref: https://github.com/contribsys/faktory/wiki/Ent-Batches#batch-open-bid (Jan 10, 2024) + + // Let's try to open an already committed batch: + let mut b = p.open_batch(bid.clone()).unwrap(); + assert_eq!(b.id(), bid); + b.add(jobs.next().unwrap()).unwrap(); // 4 jobs + b.commit().unwrap(); // committing the batch again! + + let s = t.get_batch_status(bid.clone()).unwrap().unwrap(); + assert_eq!(s.total, 4); + assert_eq!(s.pending, 4); + + // Subtest 1 result: + // We managed to open a batch "from outside" and the server accepted the job INSTEAD OF ERRORING BACK. 
+ // ############################ END OF SUBTEST 1 ####################################### + + // ############################## SUBTEST 2 ############################################ + // Let's see if we will be able to - again - open the committed batch "from outside" and + // add a nested batch to it. + let mut b = p.open_batch(bid.clone()).unwrap(); + assert_eq!(b.id(), bid); // this is to make sure this is the same batch INDEED + let mut nested_callbacks = some_jobs( + "order_clean_up__NESTED", + "test_batch_can_be_reopned_add_extra_jobs_added__CALLBACKs__NESTED", + 2, + ); + let nested_batch_declaration = + Batch::builder("Orders processing workload. Nested stage".to_string()).with_callbacks( + nested_callbacks.next().unwrap(), + nested_callbacks.next().unwrap(), + ); + let nested_batch = b.start_batch(nested_batch_declaration).unwrap(); + let nested_bid = nested_batch.id().to_string(); + // committing the nested batch without any jobs + // since those are just not relevant for this test: + nested_batch.commit().unwrap(); + + let s = t.get_batch_status(nested_bid.clone()).unwrap().unwrap(); + assert_eq!(s.total, 0); + assert_eq!(s.parent_bid, Some(bid)); // this is really our child batch + assert_eq!(s.complete_callback_state, "1"); // has been enqueued + + // Subtest 2 result: + // We managed to open an already committed batch "from outside" and the server accepted + // a nested batch INSTEAD OF ERRORING BACK. + // ############################ END OF SUBTEST 2 ####################################### + + // ############################## SUBTEST 3 ############################################ + // From the docs: + // """Once a callback has enqueued for a batch, you may not add anything to the batch.""" + // ref: https://github.com/contribsys/faktory/wiki/Ent-Batches#guarantees (Jan 10, 2024) + + // Let's try to re-open the nested batch that we have already committed and add some jobs to it. 
+ let mut b = p.open_batch(nested_bid.clone()).unwrap(); + assert_eq!(b.id(), nested_bid); // this is to make sure this is the same batch INDEED + let mut more_jobs = some_jobs( + "order_clean_up__NESTED", + "test_batch_can_be_reopned_add_extra_jobs_added__NESTED", + 2, + ); + b.add(more_jobs.next().unwrap()).unwrap(); + b.add(more_jobs.next().unwrap()).unwrap(); + b.commit().unwrap(); + + let s = t.get_batch_status(nested_bid.clone()).unwrap().unwrap(); + assert_eq!(s.complete_callback_state, "1"); // again, it has been enqueued ... + assert_eq!(s.pending, 2); // ... though there are pending jobs + assert_eq!(s.total, 2); + + // Subtest 3 result: + // We were able to add more jobs to the batch for which the Faktory server had already + // queued the callback. + // ############################## END OF SUBTEST 3 ##################################### + + // ############################## OVERALL RESULTS ###################################### + // The guarantees that definitely hold: + // + // 1) the callbacks will fire immediately after the jobs of this batch have been executed, provided the batch has been committed; + // + // 2) the callbacks will fire immediately for an empty batch, provided it has been committed; + // + // 3) the 'complete' callback will always be queued first + // (this is shown as part of the test 'test_callback_will_be_queue_upon_commit_even_if_batch_is_empty'); +}