diff --git a/src/lib.rs b/src/lib.rs index 02a88aec..5fc30fd5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -106,8 +106,8 @@ mod worker; pub use crate::error::Error; pub use crate::proto::{ - Client, Connection, DataSnapshot, FaktoryState, Job, JobBuilder, JobId, MutationFilter, - MutationFilterBuilder, MutationTarget, Reconnect, ServerSnapshot, WorkerId, + Client, Connection, DataSnapshot, Failure, FaktoryState, Job, JobBuilder, JobId, + MutationFilter, MutationFilterBuilder, MutationTarget, Reconnect, ServerSnapshot, WorkerId, }; pub use crate::worker::{JobRunner, StopDetails, StopReason, Worker, WorkerBuilder}; diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 24fe7533..7f16a061 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -12,8 +12,8 @@ pub use client::{Client, Connection}; mod single; pub use single::{ - DataSnapshot, FaktoryState, Job, JobBuilder, JobId, MutationFilter, MutationFilterBuilder, - MutationTarget, ServerSnapshot, WorkerId, + DataSnapshot, Failure, FaktoryState, Job, JobBuilder, JobId, MutationFilter, + MutationFilterBuilder, MutationTarget, ServerSnapshot, WorkerId, }; pub(crate) use single::{Ack, Fail, Info, Push, PushBulk, QueueAction, QueueControl}; diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index 2cbe4087..5bf7503a 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -124,6 +124,8 @@ pub struct Job { /// Defaults to 25. #[serde(skip_serializing_if = "Option::is_none")] #[builder(default = "Some(JOB_DEFAULT_RETRY_COUNT)")] + // TODO: this should probably be a usize, see Failure::retry_count + // TODO: and Failure::retry_remaining. This can go to 0.14 release pub retry: Option, /// The priority of this job from 1-9 (9 is highest). @@ -206,19 +208,39 @@ impl JobBuilder { } } +/// Details on a job's failure. #[derive(Serialize, Deserialize, Debug, Clone)] +#[non_exhaustive] pub struct Failure { - retry_count: usize, - failed_at: String, + /// [`Number`](Job::retry) of times this job can be retried. + pub retry_count: usize, + + /// Number of remaining retry attempts. + #[serde(rename = "remaining")] + pub retry_remaining: usize, + + /// Last time this job failed. + pub failed_at: DateTime, #[serde(skip_serializing_if = "Option::is_none")] - next_at: Option, + + /// When this job will be retried. + /// + /// This will be `None` if there are no retry + /// attempts (see [`Failure::retry_remaining`]) left. + pub next_at: Option>, #[serde(skip_serializing_if = "Option::is_none")] - message: Option, + + /// Error message, if any. + pub message: Option, #[serde(skip_serializing_if = "Option::is_none")] #[serde(rename = "errtype")] - kind: Option, + + /// Error kind, if known. + pub kind: Option, #[serde(skip_serializing_if = "Option::is_none")] - backtrace: Option>, + + /// Stack trace from last failure, if any. + pub backtrace: Option>, } impl Job { diff --git a/tests/real/community.rs b/tests/real/community.rs index b5646db0..dcd7367e 100644 --- a/tests/real/community.rs +++ b/tests/real/community.rs @@ -1,9 +1,10 @@ -use crate::{assert_gte, skip_check}; +use crate::{assert_gt, assert_gte, assert_lt, skip_check}; use chrono::Utc; use faktory::{ Client, Job, JobBuilder, JobId, MutationFilter, MutationTarget, StopReason, Worker, WorkerBuilder, WorkerId, }; +use rand::Rng; use serde_json::Value; use std::time::Duration; use std::{io, sync}; @@ -835,6 +836,8 @@ async fn test_panic_in_handler() { #[tokio::test(flavor = "multi_thread")] async fn mutation_requeue_jobs() { skip_check!(); + let test_started_at = Utc::now(); + let max_retries = rand::thread_rng().gen_range(2..25); // prepare a client and clean up the queue // to ensure there are no left-overs @@ -859,7 +862,10 @@ async fn mutation_requeue_jobs() { .unwrap(); // enqueue a job - let job = JobBuilder::new(local).queue(local).build(); + let job = JobBuilder::new(local) + .queue(local) + .retry(max_retries) + .build(); let job_id = job.id().clone(); client.enqueue(job).await.unwrap(); @@ -872,7 +878,7 @@ async fn mutation_requeue_jobs() { let had_one = worker.run_one(0, &[local]).await.unwrap(); assert!(!had_one); - // ... we can force it + // ... we can force it, so let's requeue the job and ... client .requeue( MutationTarget::Retries, @@ -881,11 +887,40 @@ async fn mutation_requeue_jobs() { .await .unwrap(); - // the job has been re-enqueued and we consumed it again - let had_one = worker.run_one(0, &[local]).await.unwrap(); - assert!(had_one); + // ... this time, instead of failing the job this time, let's + // create a new woker that will just send the job + // to the test thread so that we can inspect and + // assert on the failure + let (tx, rx) = sync::mpsc::channel(); + let tx = sync::Arc::new(sync::Mutex::new(tx)); + let mut w = WorkerBuilder::default() + .hostname("tester".to_string()) + .wid(WorkerId::new(local)) + .register_fn(local, move |j| { + let tx = sync::Arc::clone(&tx); + Box::pin(async move { + tx.lock().unwrap().send(j).unwrap(); + Ok::<(), io::Error>(()) + }) + }) + .connect() + .await + .unwrap(); + assert!(w.run_one(0, &[local]).await.unwrap()); + let job = rx.recv().unwrap(); - // TODO: Examine the job's failure (will need a dedicated PR) + let failure_info = job.failure().as_ref().unwrap(); + assert_eq!(failure_info.retry_count, 0); + assert_eq!( + failure_info.retry_remaining, + max_retries as usize - failure_info.retry_count + ); + assert_lt!(failure_info.failed_at, Utc::now()); + assert_gt!(failure_info.failed_at, test_started_at); + assert!(failure_info.next_at.is_some()); + assert_eq!(failure_info.kind.as_ref().unwrap(), "unknown"); + assert!(failure_info.message.is_some()); // "task panicked" + assert!(failure_info.backtrace.is_none()); } #[tokio::test(flavor = "multi_thread")]