Skip to content

Commit

Permalink
Make Failure public
Browse files Browse the repository at this point in the history
  • Loading branch information
rustworthy committed Nov 29, 2024
1 parent cded093 commit 682f45d
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 17 deletions.
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ mod worker;
pub use crate::error::Error;

pub use crate::proto::{
Client, Connection, DataSnapshot, FaktoryState, Job, JobBuilder, JobId, MutationFilter,
MutationFilterBuilder, MutationTarget, Reconnect, ServerSnapshot, WorkerId,
Client, Connection, DataSnapshot, Failure, FaktoryState, Job, JobBuilder, JobId,
MutationFilter, MutationFilterBuilder, MutationTarget, Reconnect, ServerSnapshot, WorkerId,
};

pub use crate::worker::{JobRunner, StopDetails, StopReason, Worker, WorkerBuilder};
Expand Down
4 changes: 2 additions & 2 deletions src/proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ pub use client::{Client, Connection};
mod single;

pub use single::{
DataSnapshot, FaktoryState, Job, JobBuilder, JobId, MutationFilter, MutationFilterBuilder,
MutationTarget, ServerSnapshot, WorkerId,
DataSnapshot, Failure, FaktoryState, Job, JobBuilder, JobId, MutationFilter,
MutationFilterBuilder, MutationTarget, ServerSnapshot, WorkerId,
};

pub(crate) use single::{Ack, Fail, Info, Push, PushBulk, QueueAction, QueueControl};
Expand Down
34 changes: 28 additions & 6 deletions src/proto/single/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ pub struct Job {
/// Defaults to 25.
#[serde(skip_serializing_if = "Option::is_none")]
#[builder(default = "Some(JOB_DEFAULT_RETRY_COUNT)")]
// TODO: this should probably be a usize, see Failure::retry_count
// TODO: and Failure::retry_remaining. This can go to 0.14 release
pub retry: Option<isize>,

/// The priority of this job from 1-9 (9 is highest).
Expand Down Expand Up @@ -206,19 +208,39 @@ impl JobBuilder {
}
}

/// Details on a job's failure.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[non_exhaustive]
pub struct Failure {
retry_count: usize,
failed_at: String,
/// [`Number`](Job::retry) of times this job can be retried.
pub retry_count: usize,

/// Number of remaining retry attempts.
#[serde(rename = "remaining")]
pub retry_remaining: usize,

/// Last time this job failed.
pub failed_at: DateTime<Utc>,
#[serde(skip_serializing_if = "Option::is_none")]
next_at: Option<String>,

/// When this job will be retried.
///
/// This will be `None` if there are no retry
/// attempts (see [`Failure::retry_remaining`]) left.
pub next_at: Option<DateTime<Utc>>,
#[serde(skip_serializing_if = "Option::is_none")]
message: Option<String>,

/// Error message, if any.
pub message: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(rename = "errtype")]
kind: Option<String>,

/// Error kind, if known.
pub kind: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
backtrace: Option<Vec<String>>,

/// Stack trace from last failure, if any.
pub backtrace: Option<Vec<String>>,
}

impl Job {
Expand Down
49 changes: 42 additions & 7 deletions tests/real/community.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use crate::{assert_gte, skip_check};
use crate::{assert_gt, assert_gte, assert_lt, skip_check};
use chrono::Utc;
use faktory::{
Client, Job, JobBuilder, JobId, MutationFilter, MutationTarget, StopReason, Worker,
WorkerBuilder, WorkerId,
};
use rand::Rng;
use serde_json::Value;
use std::time::Duration;
use std::{io, sync};
Expand Down Expand Up @@ -835,6 +836,8 @@ async fn test_panic_in_handler() {
#[tokio::test(flavor = "multi_thread")]
async fn mutation_requeue_jobs() {
skip_check!();
let test_started_at = Utc::now();
let max_retries = rand::thread_rng().gen_range(2..25);

// prepare a client and clean up the queue
// to ensure there are no left-overs
Expand All @@ -859,7 +862,10 @@ async fn mutation_requeue_jobs() {
.unwrap();

// enqueue a job
let job = JobBuilder::new(local).queue(local).build();
let job = JobBuilder::new(local)
.queue(local)
.retry(max_retries)
.build();
let job_id = job.id().clone();
client.enqueue(job).await.unwrap();

Expand All @@ -872,7 +878,7 @@ async fn mutation_requeue_jobs() {
let had_one = worker.run_one(0, &[local]).await.unwrap();
assert!(!had_one);

// ... we can force it
// ... we can force it, so let's requeue the job and ...
client
.requeue(
MutationTarget::Retries,
Expand All @@ -881,11 +887,40 @@ async fn mutation_requeue_jobs() {
.await
.unwrap();

// the job has been re-enqueued and we consumed it again
let had_one = worker.run_one(0, &[local]).await.unwrap();
assert!(had_one);
// ... this time, instead of failing the job this time, let's
// create a new woker that will just send the job
// to the test thread so that we can inspect and
// assert on the failure
let (tx, rx) = sync::mpsc::channel();
let tx = sync::Arc::new(sync::Mutex::new(tx));
let mut w = WorkerBuilder::default()
.hostname("tester".to_string())
.wid(WorkerId::new(local))
.register_fn(local, move |j| {
let tx = sync::Arc::clone(&tx);
Box::pin(async move {
tx.lock().unwrap().send(j).unwrap();
Ok::<(), io::Error>(())
})
})
.connect()
.await
.unwrap();
assert!(w.run_one(0, &[local]).await.unwrap());
let job = rx.recv().unwrap();

// TODO: Examine the job's failure (will need a dedicated PR)
let failure_info = job.failure().as_ref().unwrap();
assert_eq!(failure_info.retry_count, 0);
assert_eq!(
failure_info.retry_remaining,
max_retries as usize - failure_info.retry_count
);
assert_lt!(failure_info.failed_at, Utc::now());
assert_gt!(failure_info.failed_at, test_started_at);
assert!(failure_info.next_at.is_some());
assert_eq!(failure_info.kind.as_ref().unwrap(), "unknown");
assert!(failure_info.message.is_some()); // "task <task number> panicked"
assert!(failure_info.backtrace.is_none());
}

#[tokio::test(flavor = "multi_thread")]
Expand Down

0 comments on commit 682f45d

Please sign in to comment.