Skip to content

Commit

Permalink
Add 'CallbackState' enum. Use for complete and success callback type
Browse files Browse the repository at this point in the history
  • Loading branch information
rustworthy committed Feb 4, 2024
1 parent c8b7450 commit 0edbd83
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 34 deletions.
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ pub use crate::proto::{Job, JobBuilder, Reconnect};
#[cfg(feature = "ent")]
#[cfg_attr(docsrs, doc(cfg(feature = "ent")))]
pub use crate::proto::{
set_progress, Batch, BatchBuilder, BatchHandle, BatchStatus, JobState, Progress,
set_progress, Batch, BatchBuilder, BatchHandle, BatchStatus, CallbackState, JobState, Progress,
ProgressUpdate, ProgressUpdateBuilder,
};
#[cfg(feature = "ent")]
Expand Down
40 changes: 36 additions & 4 deletions src/proto/batch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ pub use cmd::{CommitBatch, GetBatchStatus, OpenBatch};
/// You can retieve the batch status using a [`Tracker`](struct.Tracker.html):
/// ```no_run
/// # use faktory::Error;
/// # use faktory::{Producer, Job, Batch, Tracker};
/// # use faktory::{Producer, Job, Batch, Tracker, CallbackState};
/// let mut prod = Producer::connect(None)?;
/// let job = Job::builder("job_type").build();
/// let cb_job = Job::builder("callback_job_type").build();
Expand All @@ -101,7 +101,11 @@ pub use cmd::{CommitBatch, GetBatchStatus, OpenBatch};
/// assert_eq!(s.total, 1);
/// assert_eq!(s.pending, 1);
/// assert_eq!(s.description, Some("Batch description".into()));
/// assert_eq!(s.complete_callback_state, ""); // has not been queued;
///
/// match s.complete_callback_state {
/// CallbackState::Pending => {},
/// _ => panic!("The jobs of this batch have not executed, so the callback job is expected to _not_ have fired"),
/// }
/// # Ok::<(), Error>(())
/// ```
#[derive(Builder, Debug, Serialize)]
Expand Down Expand Up @@ -235,6 +239,34 @@ impl<'a, S: Read + Write> BatchHandle<'a, S> {
}
}

// Not documented, but existing de fakto and also mentioned in the official client
// https://github.com/contribsys/faktory/blob/main/client/batch.go#L17-L19
/// State of a `callback` job of a [`Batch`].
#[derive(Debug, Clone, Deserialize)]
pub enum CallbackState {
/// Not enqueued yet.
#[serde(rename = "")]
Pending,
/// Enqueued by the server, because the jobs belonging to this batch have finished executing.
#[serde(rename = "1")]
Enqueued,
/// The enqueued callback job has been consumed.
#[serde(rename = "2")]
FinishedOk,
}

impl std::fmt::Display for CallbackState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
use CallbackState::*;
let s = match self {
Pending => "Pending",
Enqueued => "Enqueued",
FinishedOk => "FinishedOk",
};
write!(f, "{}", s)
}
}

/// Batch status retrieved from Faktory server.
#[derive(Deserialize, Debug)]
pub struct BatchStatus {
Expand Down Expand Up @@ -267,13 +299,13 @@ pub struct BatchStatus {
///
/// See [with_complete_callback](struct.BatchBuilder.html#method.with_complete_callback).
#[serde(rename = "complete_st")]
pub complete_callback_state: String,
pub complete_callback_state: CallbackState,

/// State of the `success` callback.
///
/// See [with_success_callback](struct.BatchBuilder.html#method.with_success_callback).
#[serde(rename = "success_st")]
pub success_callback_state: String,
pub success_callback_state: CallbackState,
}

#[cfg(feature = "ent")]
Expand Down
3 changes: 2 additions & 1 deletion src/proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ pub use self::single::ent::{
mod batch;
#[cfg(feature = "ent")]
pub use batch::{
Batch, BatchBuilder, BatchHandle, BatchStatus, CommitBatch, GetBatchStatus, OpenBatch,
Batch, BatchBuilder, BatchHandle, BatchStatus, CallbackState, CommitBatch, GetBatchStatus,
OpenBatch,
};

pub(crate) fn get_env_url() -> String {
Expand Down
7 changes: 2 additions & 5 deletions src/proto/single/ent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ pub enum JobState {
Dead,
}

impl Display for JobState {
impl std::fmt::Display for JobState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
use JobState::*;
let s = match self {
Expand Down Expand Up @@ -246,10 +246,7 @@ pub fn set_progress(jid: impl Into<String>, percent: u8) -> ProgressUpdate {
// ----------------------------------------------

use super::FaktoryCommand;
use std::{
fmt::{Debug, Display},
io::Write,
};
use std::{fmt::Debug, io::Write};

#[derive(Debug, Clone)]
pub enum Track {
Expand Down
62 changes: 39 additions & 23 deletions tests/real/enterprise.rs
Original file line number Diff line number Diff line change
Expand Up @@ -608,8 +608,8 @@ fn test_batch_of_jobs_can_be_initiated() {
assert_eq!(s.failed, 0);
// Docs do not mention it, but the golang client does:
// https://github.com/contribsys/faktory/blob/main/client/batch.go#L17-L19
assert_eq!(s.success_callback_state, ""); // we did not even provide the 'success' callback
assert_eq!(s.complete_callback_state, ""); // the 'complete' callback is pending
assert_eq!(s.success_callback_state.to_string(), "Pending"); // we did not even provide the 'success' callback
assert_eq!(s.complete_callback_state.to_string(), "Pending");

// consume and execute job 1 ...
assert_had_one!(&mut c, "test_batch_of_jobs_can_be_initiated");
Expand Down Expand Up @@ -658,7 +658,7 @@ fn test_batch_of_jobs_can_be_initiated() {
assert_eq!(s.total, 3);
assert_eq!(s.pending, 0);
assert_eq!(s.failed, 0);
assert_eq!(s.complete_callback_state, "1"); // callback has been enqueued!!
assert_eq!(s.complete_callback_state.to_string(), "Enqueued");

// let's now successfully consume from the "callback" queue:
assert_had_one!(&mut c, "test_batch_of_jobs_can_be_initiated__CALLBACKs");
Expand All @@ -670,7 +670,7 @@ fn test_batch_of_jobs_can_be_initiated() {
.expect("...and it's not none");

// this is because we have just consumed and executed 2 of 3 jobs:
assert_eq!(s.complete_callback_state, "2"); // means calledback successfully executed
assert_eq!(s.complete_callback_state.to_string(), "FinishedOk");
}

#[test]
Expand Down Expand Up @@ -805,7 +805,7 @@ fn test_callback_will_not_be_queued_unless_batch_gets_committed() {
let s = t.get_batch_status(bid.clone()).unwrap().unwrap();
assert_eq!(s.total, 3);
assert_eq!(s.pending, 3);
assert_eq!(s.success_callback_state, ""); // has not been queued;
assert_eq!(s.success_callback_state.to_string(), "Pending");

// consume those 3 jobs successfully;
for _ in 0..3 {
Expand All @@ -826,7 +826,7 @@ fn test_callback_will_not_be_queued_unless_batch_gets_committed() {
assert_eq!(s.total, 3);
assert_eq!(s.pending, 0);
assert_eq!(s.failed, 0);
assert_eq!(s.success_callback_state, ""); // not just yet;
assert_eq!(s.success_callback_state.to_string(), "Pending"); // not just yet;

// to double-check, let's assert the success callbacks queue is empty:
assert_is_empty!(
Expand All @@ -839,7 +839,7 @@ fn test_callback_will_not_be_queued_unless_batch_gets_committed() {

// ... and check batch status:
let s = t.get_batch_status(bid.clone()).unwrap().unwrap();
assert_eq!(s.success_callback_state, "1"); // callback has been queued;
assert_eq!(s.success_callback_state.to_string(), "Enqueued");

// finally, let's consume from the success callbacks queue ...
assert_had_one!(
Expand All @@ -849,7 +849,7 @@ fn test_callback_will_not_be_queued_unless_batch_gets_committed() {

// ... and see the final status:
let s = t.get_batch_status(bid.clone()).unwrap().unwrap();
assert_eq!(s.success_callback_state, "2"); // callback successfully executed;
assert_eq!(s.success_callback_state.to_string(), "FinishedOk");
}

#[test]
Expand All @@ -860,22 +860,24 @@ fn test_callback_will_be_queued_upon_commit_even_if_batch_is_empty() {
let url = learn_faktory_url();
let mut p = Producer::connect(Some(&url)).unwrap();
let mut t = Tracker::connect(Some(&url)).unwrap();
let jobtype = "callback_jobtype";
let q_name = "test_callback_will_be_queued_upon_commit_even_if_batch_is_empty";
let mut callbacks = some_jobs(jobtype, q_name, 2);
let complete_cb_jobtype = "complete_callback_jobtype";
let success_cb_jobtype = "success_cb_jobtype";
let complete_cb = some_jobs(complete_cb_jobtype, q_name, 1).next().unwrap();
let success_cb = some_jobs(success_cb_jobtype, q_name, 1).next().unwrap();
let b = p
.start_batch(
Batch::builder()
.description("Orders processing workload")
.with_callbacks(callbacks.next().unwrap(), callbacks.next().unwrap()),
.with_callbacks(success_cb, complete_cb),
)
.unwrap();
let bid = b.id().to_owned();

let s = t.get_batch_status(bid.clone()).unwrap().unwrap();
assert_eq!(s.total, 0); // no jobs in the batch;
assert_eq!(s.success_callback_state, ""); // not queued;
assert_eq!(s.complete_callback_state, ""); // not queued;
assert_eq!(s.success_callback_state.to_string(), "Pending");
assert_eq!(s.complete_callback_state.to_string(), "Pending");

b.commit().unwrap();

Expand All @@ -887,26 +889,37 @@ fn test_callback_will_be_queued_upon_commit_even_if_batch_is_empty() {

// The docs say "If you don't push any jobs into the batch, any callbacks will fire immediately upon BATCH COMMIT."
// and "the success callback for a batch will always enqueue after the complete callback"
assert_eq!(s.complete_callback_state, "1"); // queued
assert_eq!(s.success_callback_state, ""); // not queued
assert_eq!(s.complete_callback_state.to_string(), "Enqueued");
assert_eq!(s.success_callback_state.to_string(), "Pending");

let mut c = ConsumerBuilder::default();
c.register(jobtype, move |_job| -> io::Result<_> { Ok(()) });
c.register(complete_cb_jobtype, move |_job| -> io::Result<_> { Ok(()) });
c.register(success_cb_jobtype, move |_job| -> io::Result<_> {
Err(io::Error::new(
io::ErrorKind::Other,
"we want this one to fail to test the 'CallbackState' behavior",
))
});
let mut c = c.connect(Some(&url)).unwrap();

assert_had_one!(&mut c, q_name); // complete callback consumed

let s = t.get_batch_status(bid.clone()).unwrap().unwrap();
assert_eq!(s.total, 0);
assert_eq!(s.complete_callback_state, "2"); // successfully executed
assert_eq!(s.success_callback_state, "1"); // queued

match s.complete_callback_state {
CallbackState::FinishedOk => {}
_ => panic!("Expected the callback to have been successfully executed"),
}
match s.success_callback_state {
CallbackState::Enqueued => {}
_ => panic!("Expected the callback to have been enqueued, since the `complete` callback has already executed"),
}
assert_had_one!(&mut c, q_name); // success callback consumed

let s = t.get_batch_status(bid.clone()).unwrap().unwrap();
assert_eq!(s.total, 0);
assert_eq!(s.complete_callback_state, "2"); // successfully executed
assert_eq!(s.success_callback_state, "2"); // successfully executed
assert_eq!(s.complete_callback_state.to_string(), "FinishedOk");
assert_eq!(s.success_callback_state.to_string(), "FinishedOk");
}

#[test]
Expand Down Expand Up @@ -1009,7 +1022,7 @@ fn test_batch_can_be_reopened_add_extra_jobs_and_batches_added() {
let s = t.get_batch_status(nested_bid.clone()).unwrap().unwrap();
assert_eq!(s.total, 0);
assert_eq!(s.parent_bid, Some(bid)); // this is really our child batch
assert_eq!(s.complete_callback_state, "1"); // has been enqueud
assert_eq!(s.complete_callback_state.to_string(), "Enqueued");

// Subtest 3 result:
// We managed to open an already committed batch "from outside" and the server accepted
Expand All @@ -1036,7 +1049,10 @@ fn test_batch_can_be_reopened_add_extra_jobs_and_batches_added() {
b.commit().unwrap();

let s = t.get_batch_status(nested_bid.clone()).unwrap().unwrap();
assert_eq!(s.complete_callback_state, "1"); // again, it has been enqueud ...
match s.complete_callback_state {
CallbackState::Enqueued => {}
_ => panic!("Expected the callback to have been enqueued"),
}
assert_eq!(s.pending, 2); // ... though there are pending jobs
assert_eq!(s.total, 2);

Expand Down

0 comments on commit 0edbd83

Please sign in to comment.