From 5a96b3a13e475dff531a81151ad1c49d24bb5111 Mon Sep 17 00:00:00 2001 From: --show-origin Date: Tue, 19 Nov 2024 12:08:05 +0400 Subject: [PATCH 01/23] #61 Add test --- Makefile | 3 ++- tests/real/community.rs | 38 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 40 insertions(+), 1 deletion(-) diff --git a/Makefile b/Makefile index c5eee45d..f950adf0 100644 --- a/Makefile +++ b/Makefile @@ -54,7 +54,8 @@ test/doc: .PHONY: test/e2e test/e2e: - FAKTORY_URL=tcp://${FAKTORY_HOST}:${FAKTORY_PORT} cargo test --locked --all-features --all-targets + FAKTORY_URL=tcp://${FAKTORY_HOST}:${FAKTORY_PORT} cargo test \ + --locked --all-features --all-targets -- $(pattern) .PHONY: test/e2e/tls test/e2e/tls: diff --git a/tests/real/community.rs b/tests/real/community.rs index 0dfd6363..34fb802a 100644 --- a/tests/real/community.rs +++ b/tests/real/community.rs @@ -759,3 +759,41 @@ async fn test_panic_in_handler() { assert!(!w.is_terminated()); assert!(w.run_one(0, &[local]).await.unwrap()); } + +#[tokio::test(flavor = "multi_thread")] +async fn job_failure_record() { + skip_check!(); + + // prepare a client and clean up the queue + // to ensure there are no left-overs + let local = "job_failure_record"; + let mut client = Client::connect().await.unwrap(); + client.queue_remove(&[local]).await.unwrap(); + + // prepare a worker that will fail the job unconditionally + let mut worker = Worker::builder::() + .register_fn("rpc_procedure_name", move |_job| async move { + panic!("Failure should be recorded"); + }) + .connect() + .await + .unwrap(); + + // enqueue a job + client + .enqueue(JobBuilder::new("rpc_procedure_name").queue(local).build()) + .await + .unwrap(); + + // consume and fail it + let had_one = worker.run_one(0, &[local]).await.unwrap(); + assert!(had_one); + + // the Faktory server will now put this job to `retries` set (provided retries are + // possible for this job, which they are by default) and after a while requeue it; + // we want to be able to accelerate 
with 'after a while' be means of `MUTATE` API; + // + // TODO: 1) add MUTATE bindings + // TODO: 2) force the job from `retries` to `scheduled` + // TODO: 3) examine the job's failture (will need a dedicate PR) +} From eec5bba8e0c168279aa1945c17a258c389e82b4f Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Wed, 20 Nov 2024 10:52:23 +0400 Subject: [PATCH 02/23] Improve self_to_cmd macro --- src/proto/single/cmd.rs | 76 ++++++++++++++++++++--------------------- 1 file changed, 38 insertions(+), 38 deletions(-) diff --git a/src/proto/single/cmd.rs b/src/proto/single/cmd.rs index 13efb0ab..2798b8d1 100644 --- a/src/proto/single/cmd.rs +++ b/src/proto/single/cmd.rs @@ -9,6 +9,16 @@ pub trait FaktoryCommand { } macro_rules! self_to_cmd { + ($struct:ident) => { + #[async_trait::async_trait] + impl FaktoryCommand for $struct { + async fn issue(&self, w: &mut W) -> Result<(), Error> { + let cmd = stringify!($struct).to_uppercase(); + w.write_all(cmd.as_bytes()).await?; + Ok(w.write_all(b"\r\n").await?) + } + } + }; ($struct:ident, $cmd:expr) => { #[async_trait::async_trait] impl FaktoryCommand for $struct { @@ -21,6 +31,30 @@ macro_rules! self_to_cmd { } } }; + ($struct:ident<$lt:lifetime>, $cmd:expr) => { + #[async_trait::async_trait] + impl FaktoryCommand for $struct<$lt> { + async fn issue(&self, w: &mut W) -> Result<(), Error> { + w.write_all($cmd.as_bytes()).await?; + w.write_all(b" ").await?; + let r = serde_json::to_vec(self).map_err(Error::Serialization)?; + w.write_all(&r).await?; + Ok(w.write_all(b"\r\n").await?) + } + } + }; + ($struct:ident, $cmd:expr, $field:tt) => { + #[async_trait::async_trait] + impl FaktoryCommand for $struct { + async fn issue(&self, w: &mut W) -> Result<(), Error> { + w.write_all($cmd.as_bytes()).await?; + w.write_all(b" ").await?; + let r = serde_json::to_vec(&self.$field).map_err(Error::Serialization)?; + w.write_all(&r).await?; + Ok(w.write_all(b"\r\n").await?) + } + } + }; } /// Write queues as part of a command. 
They are written with a leading space @@ -42,12 +76,7 @@ where pub(crate) struct Info; -#[async_trait::async_trait] -impl FaktoryCommand for Info { - async fn issue(&self, w: &mut W) -> Result<(), Error> { - Ok(w.write_all(b"INFO\r\n").await?) - } -} +self_to_cmd!(Info); // -------------------- ACK ---------------------- @@ -138,12 +167,7 @@ self_to_cmd!(Fail, "FAIL"); pub(crate) struct End; -#[async_trait::async_trait] -impl FaktoryCommand for End { - async fn issue(&self, w: &mut W) -> Result<(), Error> { - Ok(w.write_all(b"END\r\n").await?) - } -} +self_to_cmd!(End); // --------------------- FETCH -------------------- @@ -230,29 +254,13 @@ self_to_cmd!(Hello, "HELLO"); pub(crate) struct Push(Job); -use std::ops::Deref; -impl Deref for Push { - type Target = Job; - fn deref(&self) -> &Self::Target { - &self.0 - } -} - impl From for Push { fn from(j: Job) -> Self { Push(j) } } -#[async_trait::async_trait] -impl FaktoryCommand for Push { - async fn issue(&self, w: &mut W) -> Result<(), Error> { - w.write_all(b"PUSH ").await?; - let r = serde_json::to_vec(&**self).map_err(Error::Serialization)?; - w.write_all(&r).await?; - Ok(w.write_all(b"\r\n").await?) - } -} +self_to_cmd!(Push, "PUSH", 0); // ---------------------- PUSHB ------------------- @@ -264,15 +272,7 @@ impl From> for PushBulk { } } -#[async_trait::async_trait] -impl FaktoryCommand for PushBulk { - async fn issue(&self, w: &mut W) -> Result<(), Error> { - w.write_all(b"PUSHB ").await?; - let r = serde_json::to_vec(&self.0).map_err(Error::Serialization)?; - w.write_all(&r).await?; - Ok(w.write_all(b"\r\n").await?) 
- } -} +self_to_cmd!(PushBulk, "PUSHB", 0); // ---------------------- QUEUE ------------------- From 2cf61a8feb87345d8c07489a079d2f89603bd6f8 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Wed, 20 Nov 2024 10:58:28 +0400 Subject: [PATCH 03/23] Add MutationAction to commands --- src/proto/single/cmd.rs | 44 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/src/proto/single/cmd.rs b/src/proto/single/cmd.rs index 2798b8d1..b2c878d2 100644 --- a/src/proto/single/cmd.rs +++ b/src/proto/single/cmd.rs @@ -311,3 +311,47 @@ impl<'a, S: AsRef> QueueControl<'a, S> { Self { action, queues } } } + +// ---------------------- MUTATE ------------------- + +#[derive(Clone, Debug, PartialEq, Eq, Serialize)] +#[serde(rename_all = "lowercase")] +#[non_exhaustive] +pub(crate) enum MutationType { + Kill, + Requeue, + Discard, + Clear, +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize)] +#[serde(rename_all = "lowercase")] +#[non_exhaustive] +pub enum MutationTarget { + Scheduled, + Retries, + Dead, +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize)] +#[non_exhaustive] +pub struct MutationFilter<'a> { + #[serde(skip_serializing_if = "Option::is_none")] + pub jids: Option<&'a [&'a str]>, + + #[serde(skip_serializing_if = "Option::is_none")] + pub regexp: Option<&'a str>, + + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(rename = "jobtype")] + pub kind: Option<&'a str>, +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize)] +pub(crate) struct MutationAction<'a> { + cmd: MutationType, + target: MutationTarget, + filter: MutationFilter<'a>, +} + +self_to_cmd!(MutationAction<'_>, "MUTATE"); From f61d4c051626bea55fd5bdc7a7992499a17cf23e Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Wed, 20 Nov 2024 18:50:06 +0400 Subject: [PATCH 04/23] Locate new logic in dedicated module --- src/lib.rs | 4 +- src/proto/client/mod.rs | 2 + src/proto/client/mutation.rs | 26 ++++++++++++ src/proto/mod.rs | 5 ++- 
src/proto/single/cmd.rs | 39 +++++------------- src/proto/single/mod.rs | 2 + src/proto/single/mutation.rs | 77 ++++++++++++++++++++++++++++++++++++ 7 files changed, 123 insertions(+), 32 deletions(-) create mode 100644 src/proto/client/mutation.rs create mode 100644 src/proto/single/mutation.rs diff --git a/src/lib.rs b/src/lib.rs index 28d8887a..02a88aec 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -106,8 +106,8 @@ mod worker; pub use crate::error::Error; pub use crate::proto::{ - Client, Connection, DataSnapshot, FaktoryState, Job, JobBuilder, JobId, Reconnect, - ServerSnapshot, WorkerId, + Client, Connection, DataSnapshot, FaktoryState, Job, JobBuilder, JobId, MutationFilter, + MutationFilterBuilder, MutationTarget, Reconnect, ServerSnapshot, WorkerId, }; pub use crate::worker::{JobRunner, StopDetails, StopReason, Worker, WorkerBuilder}; diff --git a/src/proto/client/mod.rs b/src/proto/client/mod.rs index 95e44c9b..e1bb4ba1 100644 --- a/src/proto/client/mod.rs +++ b/src/proto/client/mod.rs @@ -21,6 +21,8 @@ mod conn; pub(crate) use conn::BoxedConnection; pub use conn::Connection; +mod mutation; + pub(crate) const EXPECTED_PROTOCOL_VERSION: usize = 2; fn check_protocols_match(ver: usize) -> Result<(), Error> { diff --git a/src/proto/client/mutation.rs b/src/proto/client/mutation.rs new file mode 100644 index 00000000..cf8f982c --- /dev/null +++ b/src/proto/client/mutation.rs @@ -0,0 +1,26 @@ +use super::Client; + +impl Client { + /* + From Go bindings: + + // Move the given jobs from structure to the Dead set. + // Faktory will not touch them anymore but you can still see them in the Web UI. + // + // Kill(Retries, OfType("DataSyncJob").WithJids("abc", "123")) + Kill(name Structure, filter JobFilter) error + + // Move the given jobs to their associated queue so they can be immediately + // picked up and processed. + Requeue(name Structure, filter JobFilter) error + + // Throw away the given jobs, e.g. 
if you want to delete all jobs named "QuickbooksSyncJob" + // + // Discard(Dead, OfType("QuickbooksSyncJob")) + Discard(name Structure, filter JobFilter) error + + // Empty the entire given structure, e.g. if you want to clear all retries. + // This is very fast as it is special cased by Faktory. + Clear(name Structure) error + */ +} diff --git a/src/proto/mod.rs b/src/proto/mod.rs index a0304ccb..24fe7533 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -11,7 +11,10 @@ pub use client::{Client, Connection}; mod single; -pub use single::{DataSnapshot, FaktoryState, Job, JobBuilder, JobId, ServerSnapshot, WorkerId}; +pub use single::{ + DataSnapshot, FaktoryState, Job, JobBuilder, JobId, MutationFilter, MutationFilterBuilder, + MutationTarget, ServerSnapshot, WorkerId, +}; pub(crate) use single::{Ack, Fail, Info, Push, PushBulk, QueueAction, QueueControl}; diff --git a/src/proto/single/cmd.rs b/src/proto/single/cmd.rs index b2c878d2..a616d28b 100644 --- a/src/proto/single/cmd.rs +++ b/src/proto/single/cmd.rs @@ -3,6 +3,8 @@ use crate::proto::{Job, JobId, WorkerId}; use std::error::Error as StdError; use tokio::io::{AsyncWrite, AsyncWriteExt}; +use super::{MutationFilter, MutationTarget}; + #[async_trait::async_trait] pub trait FaktoryCommand { async fn issue(&self, w: &mut W) -> Result<(), Error>; @@ -31,9 +33,9 @@ macro_rules! 
self_to_cmd { } } }; - ($struct:ident<$lt:lifetime>, $cmd:expr) => { + ($struct:ident<$lt:lifetime, $t:tt>, $cmd:expr) => { #[async_trait::async_trait] - impl FaktoryCommand for $struct<$lt> { + impl FaktoryCommand for $struct<$lt, $t> { async fn issue(&self, w: &mut W) -> Result<(), Error> { w.write_all($cmd.as_bytes()).await?; w.write_all(b" ").await?; @@ -314,10 +316,11 @@ impl<'a, S: AsRef> QueueControl<'a, S> { // ---------------------- MUTATE ------------------- -#[derive(Clone, Debug, PartialEq, Eq, Serialize)] +#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)] #[serde(rename_all = "lowercase")] #[non_exhaustive] pub(crate) enum MutationType { + #[default] Kill, Requeue, Discard, @@ -325,33 +328,11 @@ pub(crate) enum MutationType { } #[derive(Clone, Debug, PartialEq, Eq, Serialize)] -#[serde(rename_all = "lowercase")] -#[non_exhaustive] -pub enum MutationTarget { - Scheduled, - Retries, - Dead, -} - -#[derive(Clone, Debug, PartialEq, Eq, Serialize)] -#[non_exhaustive] -pub struct MutationFilter<'a> { - #[serde(skip_serializing_if = "Option::is_none")] - pub jids: Option<&'a [&'a str]>, - - #[serde(skip_serializing_if = "Option::is_none")] - pub regexp: Option<&'a str>, - - #[serde(skip_serializing_if = "Option::is_none")] - #[serde(rename = "jobtype")] - pub kind: Option<&'a str>, -} - -#[derive(Clone, Debug, PartialEq, Eq, Serialize)] -pub(crate) struct MutationAction<'a> { +pub(crate) struct MutationAction<'a, J> { cmd: MutationType, target: MutationTarget, - filter: MutationFilter<'a>, + #[serde(skip_serializing_if = "Option::is_none")] + filter: Option>, } -self_to_cmd!(MutationAction<'_>, "MUTATE"); +self_to_cmd!(MutationAction<'_, JobId>, "MUTATE"); diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index 031b08cd..2cbe4087 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -7,11 +7,13 @@ use tokio::io::{AsyncBufRead, AsyncWrite, AsyncWriteExt}; mod cmd; mod id; +mod mutation; mod resp; mod utils; pub use cmd::*; 
pub use id::{JobId, WorkerId}; +pub use mutation::{MutationFilter, MutationFilterBuilder, MutationTarget}; pub use resp::*; #[cfg(feature = "ent")] diff --git a/src/proto/single/mutation.rs b/src/proto/single/mutation.rs new file mode 100644 index 00000000..e307fb3f --- /dev/null +++ b/src/proto/single/mutation.rs @@ -0,0 +1,77 @@ +use derive_builder::Builder; + +use crate::JobId; + +/// TODO +#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)] +#[serde(rename_all = "lowercase")] +#[non_exhaustive] +pub enum MutationTarget { + /// TODO + #[default] + Scheduled, + + /// TODO + Retries, + + /// TODO + Dead, +} + +/// TODO +#[derive(Builder, Clone, Debug, Default, PartialEq, Eq, Serialize)] +#[builder(setter(into), build_fn(name = "try_build", private))] +#[non_exhaustive] +pub struct MutationFilter<'a, J> { + /// TODO + /// + /// TODO + #[serde(skip_serializing_if = "Option::is_none")] + #[builder(setter(custom))] + pub jids: Option<&'a [J]>, + + /// TODO + /// + /// TODO + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(rename = "regexp")] + pub pattern: Option<&'a str>, + + /// TODO + /// + /// TODO + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(rename = "jobtype")] + pub kind: Option<&'a str>, +} + +impl<'a, J> MutationFilter<'_, J> +where + J: Clone, +{ + /// TODO + pub fn builder() -> MutationFilterBuilder<'a, J> { + MutationFilterBuilder::default() + } +} + +impl<'a, J> MutationFilterBuilder<'a, J> +where + J: Clone + AsRef, +{ + /// TODO + pub fn jids(mut self, value: &'a [J]) -> Self { + self.jids = Some(value).into(); + self + } +} + +impl<'a, J> MutationFilterBuilder<'a, J> +where + J: Clone, +{ + /// TODO + pub fn build(self) -> MutationFilter<'a, J> { + self.try_build().expect("infallible") + } +} From 51bcb8aa70d1685edffb0628fdc1d0c037ff4002 Mon Sep 17 00:00:00 2001 From: --show-origin Date: Wed, 20 Nov 2024 19:00:43 +0400 Subject: [PATCH 05/23] Update CHANGELOG.md --- CHANGELOG.md | 13 ++++++++----- 1 file changed, 8 
insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 49a168f9..862ab583 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Support Faktory's `MUTATE` API ([#87]) + ### Changed ### Deprecated @@ -19,16 +21,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Security +[#87]: https://github.com/jonhoo/faktory-rs/pull/87 + ## [0.13.0] - 2024-10-27 ### Added -- `Error::Stream` for underlying 'native_tls' and 'rustls' errors ([#49]) +- rustls, native_tls: `Error::Stream` for underlying `native_tls` and `rustls` errors ([#49]) - Shutdown signal via `WorkerBuilder::with_graceful_shutdown` ([#57]) - Shutdown timeout via `WorkerBuilder::shutdown_timeout` ([#57]) - `Client` method for pausing, resuming, and removing queues ([#59]) - `Client::current_info` and `FaktoryState` struct ([#63]) -- TLS configurations options to `WorkerBuilder` ([#74]) +- rustls, native_tls: TLS configurations options to `WorkerBuilder` ([#74]) ### Changed @@ -57,13 +61,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `JobRunner` trait and `ConsumerBuilder::register_runner` ([#51]) - Support for enqueuing numerous jobs with `Producer::enqueue_many` ([#54]) -- Faktory Enterprise Edition: Batch jobs (`Batch`, `BatchId`, `BatchStatus`) ([#48]) -- Faktory Enterprise Edition: Setting and getting a job's progress ([#48]) +- ent: Batch jobs (`Batch`, `BatchId`, `BatchStatus`) ([#48]) +- ent: Setting and getting a job's progress ([#48]) [#48]: https://github.com/jonhoo/faktory-rs/pull/48 [#51]: https://github.com/jonhoo/faktory-rs/pull/51 [#54]: https://github.com/jonhoo/faktory-rs/pull/54 - [unreleased]: https://github.com/jonhoo/faktory-rs/compare/v0.13.0...HEAD [0.13.0]: https://github.com/jonhoo/faktory-rs/compare/v0.12.5...v0.13.0 [0.12.5]: 
https://github.com/jonhoo/faktory-rs/compare/v0.12.4...v0.12.5 From ee3d90d9f01e26ff7ea90adbd9f2ec09e4960f8a Mon Sep 17 00:00:00 2001 From: --show-origin Date: Wed, 20 Nov 2024 19:02:52 +0400 Subject: [PATCH 06/23] Update crate version --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 83399ae6..c4677a71 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -434,7 +434,7 @@ dependencies = [ [[package]] name = "faktory" -version = "0.13.0" +version = "0.13.1-rc0" dependencies = [ "async-trait", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 0027c75b..ac229f4d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "faktory" -version = "0.13.0" +version = "0.13.1-rc0" authors = ["Jon Gjengset "] edition = "2021" license = "MIT OR Apache-2.0" From 72ee004fc9015c201a8d0fc05d84ffbffb30a14b Mon Sep 17 00:00:00 2001 From: --show-origin Date: Wed, 20 Nov 2024 19:06:54 +0400 Subject: [PATCH 07/23] Make Makefile commands more ergonomic --- Makefile | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/Makefile b/Makefile index f950adf0..34d11cca 100644 --- a/Makefile +++ b/Makefile @@ -54,18 +54,20 @@ test/doc: .PHONY: test/e2e test/e2e: - FAKTORY_URL=tcp://${FAKTORY_HOST}:${FAKTORY_PORT} cargo test \ - --locked --all-features --all-targets -- $(pattern) + FAKTORY_URL=tcp://${FAKTORY_HOST}:${FAKTORY_PORT} \ + cargo test --locked --all-features --all-targets -- \ + --nocapture $(pattern) .PHONY: test/e2e/tls test/e2e/tls: FAKTORY_URL_SECURE=tcp://:${FAKTORY_PASSWORD}@${FAKTORY_HOST}:${FAKTORY_PORT_SECURE} \ FAKTORY_URL=tcp://:${FAKTORY_PASSWORD}@${FAKTORY_HOST}:${FAKTORY_PORT} \ - cargo test --locked --features native_tls,rustls --test tls -- --nocapture + cargo test --locked --features native_tls,rustls --test tls -- \ + --nocapture $(pattern) .PHONY: test/load test/load: - cargo run --release --features binaries + cargo run --release --features binaries 
$(jobs) $(threads) .PHONY: test/perf test/perf: From 1e8839af4da53002468f42cc407bfc8b8eb66046 Mon Sep 17 00:00:00 2001 From: --show-origin Date: Wed, 20 Nov 2024 20:04:11 +0400 Subject: [PATCH 08/23] Add requeue method --- src/proto/client/mutation.rs | 46 +++++++++++++++++++++++++++++++++++- src/proto/single/cmd.rs | 19 +++++++++++---- 2 files changed, 60 insertions(+), 5 deletions(-) diff --git a/src/proto/client/mutation.rs b/src/proto/client/mutation.rs index cf8f982c..7aeab702 100644 --- a/src/proto/client/mutation.rs +++ b/src/proto/client/mutation.rs @@ -1,6 +1,50 @@ -use super::Client; +use crate::{ + proto::single::{MutationAction, MutationType}, + Client, Error, JobId, MutationFilter, MutationTarget, +}; +use std::borrow::Borrow; impl Client { + /// TODO + pub async fn requeue_all(&mut self, target: MutationTarget) { + todo!() + } + + /// Re-enqueue the jobs. + /// + /// This will immediately move the jobs from the targeted set (see [`MutationTarget`]) + /// to their queues. + /// + /// Use a [`filter`](crate::MutationFilter) to narrow down the subset of jobs your would + /// like to requeue. + /// ```no_run + /// # tokio_test::block_on(async { + /// # use faktory::{JobId, Client, MutationTarget, MutationFilter}; + /// # let mut client = Client::connect().await.unwrap(); + /// let job_id = JobId::new("3sgE_qwtqw15"); // e.g. a failed job's id + /// let ids = [&job_id]; + /// let filter = MutationFilter::builder().jids(&ids).build(); + /// client.requeue(MutationTarget::Retries, &filter).await.unwrap(); + /// # }); + /// ``` + pub async fn requeue<'a, F, J>( + &mut self, + target: MutationTarget, + filter: F, + ) -> Result<(), Error> + where + F: Borrow>, + J: 'a + AsRef + Sync, + { + self.issue(&MutationAction { + cmd: MutationType::Requeue, + target, + filter: filter.borrow().into(), + }) + .await? 
+ .read_ok() + .await + } /* From Go bindings: diff --git a/src/proto/single/cmd.rs b/src/proto/single/cmd.rs index a616d28b..5a3ef715 100644 --- a/src/proto/single/cmd.rs +++ b/src/proto/single/cmd.rs @@ -329,10 +329,21 @@ pub(crate) enum MutationType { #[derive(Clone, Debug, PartialEq, Eq, Serialize)] pub(crate) struct MutationAction<'a, J> { - cmd: MutationType, - target: MutationTarget, + pub(crate) cmd: MutationType, + pub(crate) target: MutationTarget, #[serde(skip_serializing_if = "Option::is_none")] - filter: Option>, + pub(crate) filter: Option<&'a MutationFilter<'a, J>>, } -self_to_cmd!(MutationAction<'_, JobId>, "MUTATE"); +// self_to_cmd!(MutationAction<'_, J>, "MUTATE"); +#[async_trait::async_trait] +impl FaktoryCommand for MutationAction<'_, J> +where + J: AsRef + Sync, +{ + async fn issue(&self, w: &mut W) -> Result<(), Error> { + let cmd = stringify!($struct).to_uppercase(); + w.write_all(cmd.as_bytes()).await?; + Ok(w.write_all(b"\r\n").await?) + } +} From 44334ba8b02e225f2fc1a1e788765c6863eb4f95 Mon Sep 17 00:00:00 2001 From: --show-origin Date: Wed, 20 Nov 2024 22:38:46 +0400 Subject: [PATCH 09/23] Test Client::requeue on failed jobs --- src/proto/client/mutation.rs | 30 ++++++++++------------------ src/proto/single/cmd.rs | 27 ++++++++++--------------- src/proto/single/mutation.rs | 36 +++++++++++++++++----------------- tests/real/community.rs | 38 ++++++++++++++++++++++++------------ 4 files changed, 64 insertions(+), 67 deletions(-) diff --git a/src/proto/client/mutation.rs b/src/proto/client/mutation.rs index 7aeab702..74e20d29 100644 --- a/src/proto/client/mutation.rs +++ b/src/proto/client/mutation.rs @@ -1,50 +1,40 @@ use crate::{ proto::single::{MutationAction, MutationType}, - Client, Error, JobId, MutationFilter, MutationTarget, + Client, Error, MutationFilter, MutationTarget, }; use std::borrow::Borrow; impl Client { - /// TODO - pub async fn requeue_all(&mut self, target: MutationTarget) { - todo!() - } - /// Re-enqueue the jobs. 
/// /// This will immediately move the jobs from the targeted set (see [`MutationTarget`]) - /// to their queues. + /// to their queues. This will apply to the jobs satisfying the [`filter`](crate::MutationFilter). /// - /// Use a [`filter`](crate::MutationFilter) to narrow down the subset of jobs your would - /// like to requeue. /// ```no_run /// # tokio_test::block_on(async { /// # use faktory::{JobId, Client, MutationTarget, MutationFilter}; /// # let mut client = Client::connect().await.unwrap(); - /// let job_id = JobId::new("3sgE_qwtqw15"); // e.g. a failed job's id - /// let ids = [&job_id]; - /// let filter = MutationFilter::builder().jids(&ids).build(); + /// let job_id1 = JobId::new("3sgE_qwtqw1501"); + /// let job_id2 = JobId::new("3sgE_qwtqw1502"); + /// let failed_ids = [&job_id1, &job_id2]; + /// let filter = MutationFilter::builder().jids(failed_ids.as_slice()).build(); /// client.requeue(MutationTarget::Retries, &filter).await.unwrap(); /// # }); /// ``` - pub async fn requeue<'a, F, J>( - &mut self, - target: MutationTarget, - filter: F, - ) -> Result<(), Error> + pub async fn requeue<'a, F>(&mut self, target: MutationTarget, filter: F) -> Result<(), Error> where - F: Borrow>, - J: 'a + AsRef + Sync, + F: Borrow>, { self.issue(&MutationAction { cmd: MutationType::Requeue, target, - filter: filter.borrow().into(), + filter: Some(filter.borrow()), }) .await? .read_ok() .await } + /* From Go bindings: diff --git a/src/proto/single/cmd.rs b/src/proto/single/cmd.rs index 5a3ef715..b4a59f2e 100644 --- a/src/proto/single/cmd.rs +++ b/src/proto/single/cmd.rs @@ -33,9 +33,9 @@ macro_rules! 
self_to_cmd { } } }; - ($struct:ident<$lt:lifetime, $t:tt>, $cmd:expr) => { + ($struct:ident<$lt:lifetime>, $cmd:expr) => { #[async_trait::async_trait] - impl FaktoryCommand for $struct<$lt, $t> { + impl FaktoryCommand for $struct<$lt> { async fn issue(&self, w: &mut W) -> Result<(), Error> { w.write_all($cmd.as_bytes()).await?; w.write_all(b" ").await?; @@ -327,23 +327,16 @@ pub(crate) enum MutationType { Clear, } +fn filter_is_empty(f: &Option<&MutationFilter<'_>>) -> bool { + f.is_none_or(|f| f.is_empty()) +} + #[derive(Clone, Debug, PartialEq, Eq, Serialize)] -pub(crate) struct MutationAction<'a, J> { +pub(crate) struct MutationAction<'a> { pub(crate) cmd: MutationType, pub(crate) target: MutationTarget, - #[serde(skip_serializing_if = "Option::is_none")] - pub(crate) filter: Option<&'a MutationFilter<'a, J>>, + #[serde(skip_serializing_if = "filter_is_empty")] + pub(crate) filter: Option<&'a MutationFilter<'a>>, } -// self_to_cmd!(MutationAction<'_, J>, "MUTATE"); -#[async_trait::async_trait] -impl FaktoryCommand for MutationAction<'_, J> -where - J: AsRef + Sync, -{ - async fn issue(&self, w: &mut W) -> Result<(), Error> { - let cmd = stringify!($struct).to_uppercase(); - w.write_all(cmd.as_bytes()).await?; - Ok(w.write_all(b"\r\n").await?) - } -} +self_to_cmd!(MutationAction<'_>, "MUTATE"); diff --git a/src/proto/single/mutation.rs b/src/proto/single/mutation.rs index e307fb3f..557ec6d5 100644 --- a/src/proto/single/mutation.rs +++ b/src/proto/single/mutation.rs @@ -3,6 +3,9 @@ use derive_builder::Builder; use crate::JobId; /// TODO +/// +/// Use a [`filter`](crate::MutationFilter) to narrow down the subset of jobs your would +/// like to requeue. 
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)] #[serde(rename_all = "lowercase")] #[non_exhaustive] @@ -20,15 +23,15 @@ pub enum MutationTarget { /// TODO #[derive(Builder, Clone, Debug, Default, PartialEq, Eq, Serialize)] -#[builder(setter(into), build_fn(name = "try_build", private))] +#[builder(default, setter(into), build_fn(name = "try_build", private))] #[non_exhaustive] -pub struct MutationFilter<'a, J> { +pub struct MutationFilter<'a> { /// TODO /// /// TODO #[serde(skip_serializing_if = "Option::is_none")] #[builder(setter(custom))] - pub jids: Option<&'a [J]>, + pub jids: Option<&'a [&'a JobId]>, /// TODO /// @@ -45,33 +48,30 @@ pub struct MutationFilter<'a, J> { pub kind: Option<&'a str>, } -impl<'a, J> MutationFilter<'_, J> -where - J: Clone, -{ +impl MutationFilter<'_> { + pub(crate) fn is_empty(&self) -> bool { + self.jids.is_none() && self.kind.is_none() && self.pattern.is_none() + } +} + +impl<'a> MutationFilter<'_> { /// TODO - pub fn builder() -> MutationFilterBuilder<'a, J> { + pub fn builder() -> MutationFilterBuilder<'a> { MutationFilterBuilder::default() } } -impl<'a, J> MutationFilterBuilder<'a, J> -where - J: Clone + AsRef, -{ +impl<'a> MutationFilterBuilder<'a> { /// TODO - pub fn jids(mut self, value: &'a [J]) -> Self { + pub fn jids(mut self, value: &'a [&JobId]) -> Self { self.jids = Some(value).into(); self } } -impl<'a, J> MutationFilterBuilder<'a, J> -where - J: Clone, -{ +impl<'a> MutationFilterBuilder<'a> { /// TODO - pub fn build(self) -> MutationFilter<'a, J> { + pub fn build(self) -> MutationFilter<'a> { self.try_build().expect("infallible") } } diff --git a/tests/real/community.rs b/tests/real/community.rs index 34fb802a..6a3b2f2e 100644 --- a/tests/real/community.rs +++ b/tests/real/community.rs @@ -1,5 +1,8 @@ use crate::{assert_gte, skip_check}; -use faktory::{Client, Job, JobBuilder, JobId, StopReason, Worker, WorkerBuilder, WorkerId}; +use faktory::{ + Client, Job, JobBuilder, JobId, MutationFilter, 
MutationTarget, StopReason, Worker, + WorkerBuilder, WorkerId, +}; use serde_json::Value; use std::{io, sync, time::Duration}; use tokio::time as tokio_time; @@ -761,12 +764,12 @@ async fn test_panic_in_handler() { } #[tokio::test(flavor = "multi_thread")] -async fn job_failure_record() { +async fn mutation_requeue_jobs() { skip_check!(); // prepare a client and clean up the queue // to ensure there are no left-overs - let local = "job_failure_record"; + let local = "mutation_requeue_jobs"; let mut client = Client::connect().await.unwrap(); client.queue_remove(&[local]).await.unwrap(); @@ -780,20 +783,31 @@ async fn job_failure_record() { .unwrap(); // enqueue a job + let job = JobBuilder::new("rpc_procedure_name").queue(local).build(); + let job_id = job.id().clone(); + client.enqueue(job).await.unwrap(); + + // consume and fail it + let had_one = worker.run_one(0, &[local]).await.unwrap(); + assert!(had_one); + + // the job is now in `retries` set and is due to + // be rescheduled by the Faktory server after a while, but ... + let had_one = worker.run_one(0, &[local]).await.unwrap(); + assert!(!had_one); + + // ... 
we can force it client - .enqueue(JobBuilder::new("rpc_procedure_name").queue(local).build()) + .requeue( + MutationTarget::Retries, + MutationFilter::builder().jids(&[&job_id]).build(), + ) .await .unwrap(); - // consume and fail it + // the job has been re-enqueued and we consumed it again let had_one = worker.run_one(0, &[local]).await.unwrap(); assert!(had_one); - // the Faktory server will now put this job to `retries` set (provided retries are - // possible for this job, which they are by default) and after a while requeue it; - // we want to be able to accelerate with 'after a while' be means of `MUTATE` API; - // - // TODO: 1) add MUTATE bindings - // TODO: 2) force the job from `retries` to `scheduled` - // TODO: 3) examine the job's failture (will need a dedicate PR) + // TODO: Examine the job's failure (will need a dedicated PR) } From fce242be5c017ca027c456ed2223ac4d6cf5eea4 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Thu, 21 Nov 2024 10:24:43 +0400 Subject: [PATCH 10/23] Option::is_none_or not available in Rust 1.70 --- src/proto/single/cmd.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/proto/single/cmd.rs b/src/proto/single/cmd.rs index b4a59f2e..67ff392f 100644 --- a/src/proto/single/cmd.rs +++ b/src/proto/single/cmd.rs @@ -328,7 +328,16 @@ pub(crate) enum MutationType { } fn filter_is_empty(f: &Option<&MutationFilter<'_>>) -> bool { - f.is_none_or(|f| f.is_empty()) + // Rust 1.82 has got the genious `Option::is_none_or`, + // and `Option::is_some_and` which will allow us to: + // ```rust + // f.is_none_or(|f| f.is_empty()) + // ``` + // As of crate version `0.13.1-rc0`, we've got `1.70` as MSRV. 
+ match f { + None => true, + Some(f) => f.is_empty() + } } From d3639f0ea046b4f31b952c7e665b740a7e7e2294 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Thu, 21 Nov 2024 11:29:59 +0400 Subject: [PATCH 11/23] Use owned pattern for JobFilterBuilder --- src/proto/client/mutation.rs | 4 ++++ src/proto/single/mutation.rs | 38 ++++++++++++++++++++++++------------ 2 files changed, 29 insertions(+), 13 deletions(-) diff --git a/src/proto/client/mutation.rs b/src/proto/client/mutation.rs index 74e20d29..6b2106d2 100644 --- a/src/proto/client/mutation.rs +++ b/src/proto/client/mutation.rs @@ -21,6 +21,10 @@ impl Client { /// client.requeue(MutationTarget::Retries, &filter).await.unwrap(); /// # }); /// ``` + // For reference: https://github.com/contribsys/faktory/blob/10ccc2270dc2a1c95c3583f7c291a51b0292bb62/server/mutate.go#L35-L59 + // The faktory will pull the entire targeted set from Redis to its memory, iterate over + // each stringified job matching against "id":"...", deserialize the matches into Jobs and + // re-queue those jobs. pub async fn requeue<'a, F>(&mut self, target: MutationTarget, filter: F) -> Result<(), Error> where F: Borrow>, diff --git a/src/proto/single/mutation.rs b/src/proto/single/mutation.rs index 557ec6d5..e1ea3cab 100644 --- a/src/proto/single/mutation.rs +++ b/src/proto/single/mutation.rs @@ -21,31 +21,43 @@ pub enum MutationTarget { Dead, } -/// TODO +/// Filter to help narrow down the mutation target.
+/// +/// Example usage: +/// ```no_run +/// # tokio_test::block_on(async { +/// # use faktory::{Client, MutationTarget, MutationFilter}; +/// # let mut client = Client::connect().await.unwrap(); +/// let filter = MutationFilter::builder() +/// .kind("jobtype_here") +/// .pattern("*\"args\":[\"bob\"*") +/// .build(); +/// client.requeue(MutationTarget::Retries, &filter).await.unwrap(); +/// # }) +/// ``` #[derive(Builder, Clone, Debug, Default, PartialEq, Eq, Serialize)] -#[builder(default, setter(into), build_fn(name = "try_build", private))] +#[builder( + default, + setter(into), + build_fn(name = "try_build", private), + pattern = "owned" +)] #[non_exhaustive] pub struct MutationFilter<'a> { /// TODO - /// - /// TODO + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(rename = "jobtype")] + pub kind: Option<&'a str>, + + /// Match jobs with the given ids. #[serde(skip_serializing_if = "Option::is_none")] #[builder(setter(custom))] pub jids: Option<&'a [&'a JobId]>, - /// TODO - /// /// TODO #[serde(skip_serializing_if = "Option::is_none")] #[serde(rename = "regexp")] pub pattern: Option<&'a str>, - - /// TODO - /// - /// TODO - #[serde(skip_serializing_if = "Option::is_none")] - #[serde(rename = "jobtype")] - pub kind: Option<&'a str>, } impl MutationFilter<'_> { From e24c960b63c150cc25ef0aa43e7d478482b0caa0 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Thu, 21 Nov 2024 13:06:56 +0400 Subject: [PATCH 12/23] Add more docs and tests for Client::requeue --- src/proto/single/cmd.rs | 4 +- src/proto/single/mutation.rs | 29 ++++---- tests/real/community.rs | 138 +++++++++++++++++++++++++++++++++++ 3 files changed, 155 insertions(+), 16 deletions(-) diff --git a/src/proto/single/cmd.rs b/src/proto/single/cmd.rs index 67ff392f..1a983eda 100644 --- a/src/proto/single/cmd.rs +++ b/src/proto/single/cmd.rs @@ -328,7 +328,7 @@ pub(crate) enum MutationType { } fn filter_is_empty(f: &Option<&MutationFilter<'_>>) -> bool { - // Rust 1.82 has got the genious 
`Option::is_none_or`, + // Rust 1.82 has got the genious `Option::is_none_or`, // and `Option::is_some_and` which will allow us to: // ```rust // f.is_none_or(|f| f.is_empty()) @@ -336,7 +336,7 @@ fn filter_is_empty(f: &Option<&MutationFilter<'_>>) -> bool { // As of crate version `0.13.1-rc0`, we've got `1.70` as MSRV. match f { None => true, - Some(f) => f.is_empty() + Some(f) => f.is_empty(), } } diff --git a/src/proto/single/mutation.rs b/src/proto/single/mutation.rs index e1ea3cab..10cdefb2 100644 --- a/src/proto/single/mutation.rs +++ b/src/proto/single/mutation.rs @@ -2,22 +2,19 @@ use derive_builder::Builder; use crate::JobId; -/// TODO -/// -/// Use a [`filter`](crate::MutationFilter) to narrow down the subset of jobs your would -/// like to requeue. +/// Mutation target set. #[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)] #[serde(rename_all = "lowercase")] #[non_exhaustive] pub enum MutationTarget { - /// TODO + /// A set of currently enqueued jobs. #[default] Scheduled, - /// TODO + /// A set of jobs that should be retried. Retries, - /// TODO + /// A set of failed jobs that will not be retried. Dead, } @@ -30,7 +27,7 @@ pub enum MutationTarget { /// # let mut client = Client::connect().await.unwrap(); /// let filter = MutationFilter::builder() /// .kind("jobtype_here") -/// .pattern("*\"args\":[\"bob\"*") +/// .pattern(r#"*\"args\":\[\"fizz\"\]*"#) /// .build(); /// client.requeue(MutationTarget::Retries, &filter).await.unwrap(); /// # }) @@ -44,17 +41,21 @@ pub enum MutationTarget { )] #[non_exhaustive] pub struct MutationFilter<'a> { - /// TODO + /// A job's [`kind`](crate::Job::kind). #[serde(skip_serializing_if = "Option::is_none")] #[serde(rename = "jobtype")] pub kind: Option<&'a str>, - /// Match jobs with the given ids. + /// Ids of jobs to target. #[serde(skip_serializing_if = "Option::is_none")] #[builder(setter(custom))] pub jids: Option<&'a [&'a JobId]>, - /// TODO + /// Match attern to use for filtering. 
+ /// + /// Faktory will pass this directly to Redis's `SCAN` command, + /// so please see the [`SCAN` documentation](https://redis.io/docs/latest/commands/scan/) + /// for further details. #[serde(skip_serializing_if = "Option::is_none")] #[serde(rename = "regexp")] pub pattern: Option<&'a str>, @@ -67,14 +68,14 @@ impl MutationFilter<'_> { } impl<'a> MutationFilter<'_> { - /// TODO + /// Creates a new builder for a [`MutationFilter`] pub fn builder() -> MutationFilterBuilder<'a> { MutationFilterBuilder::default() } } impl<'a> MutationFilterBuilder<'a> { - /// TODO + /// Ids of jobs to target. pub fn jids(mut self, value: &'a [&JobId]) -> Self { self.jids = Some(value).into(); self @@ -82,7 +83,7 @@ impl<'a> MutationFilterBuilder<'a> { } impl<'a> MutationFilterBuilder<'a> { - /// TODO + /// Builds a new [`MutationFilter`] from the parameters of this builder. pub fn build(self) -> MutationFilter<'a> { self.try_build().expect("infallible") } diff --git a/tests/real/community.rs b/tests/real/community.rs index 6a3b2f2e..295df01b 100644 --- a/tests/real/community.rs +++ b/tests/real/community.rs @@ -811,3 +811,141 @@ async fn mutation_requeue_jobs() { // TODO: Examine the job's failure (will need a dedicated PR) } + +#[tokio::test(flavor = "multi_thread")] +async fn mutation_requeue_specific_jobs_only() { + skip_check!(); + + // prepare a client and clean up the queue + // to ensure there are no left-overs + let local = "mutation_requeue_specific_jobs_only"; + let mut client = Client::connect().await.unwrap(); + client + .requeue( + MutationTarget::Retries, + MutationFilter::default(), // just an empty filter + ) + .await + .unwrap(); + client.queue_remove(&[local]).await.unwrap(); + + // prepare a worker that will fail the job unconditionally + let mut worker = Worker::builder::() + .register_fn(local, move |_job| async move { + panic!("Force fail this job"); + }) + .connect() + .await + .unwrap(); + + // enqueue two jobs + let job1 = JobBuilder::new(local) + 
.retry(10) + .queue(local) + .args(["fizz"]) + .build(); + let job_id1 = job1.id().clone(); + let job2 = JobBuilder::new(local) + .retry(10) + .queue(local) + .args(["buzz"]) + .build(); + let job_id2 = job2.id().clone(); + assert_ne!(job_id1, job_id2); // sanity check + client.enqueue_many([job1, job2]).await.unwrap(); + + // cosume them (and immediately fail) and make + // sure the queue is drained + assert!(worker.run_one(0, &[local]).await.unwrap()); + assert!(worker.run_one(0, &[local]).await.unwrap()); + assert!(!worker.run_one(0, &[local]).await.unwrap()); + + // now let's only requeue one job of kind 'mutation_requeue_specific_jobs_only' + // following this example from the Faktory docs: + // + // MUTATE {"cmd":"kill","target":"retries","filter":{"jobtype":"QuickbooksSyncJob", "jids":["123456789", "abcdefgh"]}} + // (see: https://github.com/contribsys/faktory/wiki/Mutate-API#examples) + // + // NB! we only want one single job (with job_id1) to be immediately re-enqueued, but ... + client + .requeue( + MutationTarget::Retries, + MutationFilter::builder() + .jids(&[&job_id1]) + .kind(local) + .build(), + ) + .await + .unwrap(); + + // ... 
looks like _all_ the jobs of that jobtype are re-queued + // see: https://github.com/contribsys/faktory/blob/10ccc2270dc2a1c95c3583f7c291a51b0292bb62/server/mutate.go#L96-L98 + assert!(worker.run_one(0, &[local]).await.unwrap()); + assert!(worker.run_one(0, &[local]).await.unwrap()); + + // let's prove that there _is_ still a way to only requeue one + // specific jobs, and to do so let's verify the queue is drained + // again (since we've just failed the two jobs again): + assert!(!worker.run_one(0, &[local]).await.unwrap()); + + // now, let's requeue a job _without_ specifying a jobkind, rather + // only `jids` in the filter: + client + .requeue( + MutationTarget::Retries, + MutationFilter::builder().jids(&[&job_id1]).build(), + ) + .await + .unwrap(); + + // we can now see that only one job has been re-scheduled, just like we wanted + // see: https://github.com/contribsys/faktory/blob/10ccc2270dc2a1c95c3583f7c291a51b0292bb62/server/mutate.go#L100-L110 + assert!(worker.run_one(0, &[local]).await.unwrap()); + assert!(!worker.run_one(0, &[local]).await.unwrap()); // drained + + // and - for completeness - let's requeue the jobs using + // the comination of `kind` + `pattern` (jobtype and regexp in Faktory's terms); + // let's first make sure to force-reschedule the jobs: + client + .requeue( + MutationTarget::Retries, + MutationFilter::default(), // just an empty filter + ) + .await + .unwrap(); + assert!(worker.run_one(0, &[local]).await.unwrap()); + assert!(worker.run_one(0, &[local]).await.unwrap()); + assert!(!worker.run_one(0, &[local]).await.unwrap()); // drained + + // now let's use `kind` + `pattern` combo to only immeduately + // reschedule the job with "fizz" in the `args`, but not the + // one with "buzz": + client + .requeue( + MutationTarget::Retries, + MutationFilter::builder() + .kind(local) + .pattern(r#"*\"args\":\[\"fizz\"\]*"#) + .build(), + ) + .await + .unwrap(); + + // and indeed only one job has 
been re-scheduled,
+    // see: https://github.com/contribsys/faktory/blob/b4a93227a3323ab4b1365b0c37c2fac4f9588cc8/server/mutate.go#L83-L94
+    assert!(worker.run_one(0, &[local]).await.unwrap());
+    assert!(!worker.run_one(0, &[local]).await.unwrap()); // drained
+
+    // just for sanity's sake, let's re-queue the "buzz" one:
+    client
+        .requeue(
+            MutationTarget::Retries,
+            MutationFilter::builder()
+                .pattern(r#"*\"args\":\[\"buzz\"\]*"#)
+                .build(),
+        )
+        .await
+        .unwrap();
+    assert!(worker.run_one(0, &[local]).await.unwrap());
+    assert!(!worker.run_one(0, &[local]).await.unwrap()); // drained
+}

From 48fb985ac05d64308b8c0ec722268f8c1faaec55 Mon Sep 17 00:00:00 2001
From: Pavel Mikhalkevich
Date: Thu, 21 Nov 2024 13:16:46 +0400
Subject: [PATCH 13/23] Add better test isolation

---
 tests/real/community.rs | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/tests/real/community.rs b/tests/real/community.rs
index 295df01b..b327b014 100644
--- a/tests/real/community.rs
+++ b/tests/real/community.rs
@@ -771,11 +771,18 @@ async fn mutation_requeue_jobs() {
     // to ensure there are no left-overs
     let local = "mutation_requeue_jobs";
     let mut client = Client::connect().await.unwrap();
+    client
+        .requeue(
+            MutationTarget::Retries,
+            MutationFilter::builder().kind(local).build(),
+        )
+        .await
+        .unwrap();
     client.queue_remove(&[local]).await.unwrap();

     // prepare a worker that will fail the job unconditionally
     let mut worker = Worker::builder::()
-        .register_fn("rpc_procedure_name", move |_job| async move {
+        .register_fn(local, move |_job| async move {
             panic!("Failure should be recorded");
         })
         .connect()
         .await
         .unwrap();

     // enqueue a job
-    let job = JobBuilder::new("rpc_procedure_name").queue(local).build();
+    let job = JobBuilder::new(local).queue(local).build();
+    let job_id = job.id().clone();
client.enqueue(job).await.unwrap(); @@ -823,7 +830,7 @@ async fn mutation_requeue_specific_jobs_only() { client .requeue( MutationTarget::Retries, - MutationFilter::default(), // just an empty filter + MutationFilter::builder().kind(local).build(), ) .await .unwrap(); From ac9da93f1d20f617081b4a9ca14a45b26a7d981b Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Fri, 22 Nov 2024 10:22:08 +0400 Subject: [PATCH 14/23] Add Client::kill, Client::dicard, Client::clear --- src/proto/client/mutation.rs | 116 ++++++++++++++++++++++++++--------- src/proto/single/mutation.rs | 28 ++++++--- tests/real/community.rs | 5 +- 3 files changed, 107 insertions(+), 42 deletions(-) diff --git a/src/proto/client/mutation.rs b/src/proto/client/mutation.rs index 6b2106d2..de2b05f6 100644 --- a/src/proto/client/mutation.rs +++ b/src/proto/client/mutation.rs @@ -21,44 +21,100 @@ impl Client { /// client.requeue(MutationTarget::Retries, &filter).await.unwrap(); /// # }); /// ``` - // For reference: https://github.com/contribsys/faktory/blob/10ccc2270dc2a1c95c3583f7c291a51b0292bb62/server/mutate.go#L35-L59 - // The faktory will pull the entire targeted set from Redis to it's memory, iterate over - // each stringified job matching against "id":"...", deserialize the matches into Jobs and - // re-queue those jobs. pub async fn requeue<'a, F>(&mut self, target: MutationTarget, filter: F) -> Result<(), Error> where F: Borrow>, { + self.mutate(MutationType::Requeue, target, Some(filter.borrow())) + .await + } + + /// Discard the jobs. + /// + /// Will throw the jobs away without any chance for re-scheduling + /// on the server side. If you want to still be able to process the jobs, + /// use [`Client::kill`] instead. + /// + /// E.g. 
to discard the currently enqueued jobs having "fizz" argument: + /// ```no_run + /// # tokio_test::block_on(async { + /// # use faktory::{Client, MutationTarget, MutationFilter}; + /// # let mut client = Client::connect().await.unwrap(); + /// let filter = MutationFilter::builder() + /// .pattern(r#"*\"args\":\[\"fizz\"\]*"#) + /// .build(); + /// client.discard(MutationTarget::Scheduled, &filter).await.unwrap(); + /// # }); + /// ``` + pub async fn discard<'a, F>(&mut self, target: MutationTarget, filter: F) -> Result<(), Error> + where + F: Borrow>, + { + self.mutate(MutationType::Discard, target, Some(filter.borrow())) + .await + } + + /// Kil the jobs. + /// + /// Moves the jobs from the target structure to the `dead` set, meaning Faktory + /// will not touch it further unless you ask it to do so. You then can, for example, + /// manually process those jobs via the Web UI or send another mutation command + /// targeting [`MutationTarget::Dead`] set. + /// + /// E.g. to kill the currently enqueued jobs with "bill" argument: + /// ```no_run + /// # tokio_test::block_on(async { + /// # use faktory::{Client, MutationTarget, MutationFilter}; + /// # let mut client = Client::connect().await.unwrap(); + /// let filter = MutationFilter::builder() + /// .pattern(r#"*\"args\":\[\"bill\"\]*"#) + /// .build(); + /// client.kill(MutationTarget::Scheduled, &filter).await.unwrap(); + /// # }); + /// ``` + pub async fn kill<'a, F>(&mut self, target: MutationTarget, filter: F) -> Result<(), Error> + where + F: Borrow>, + { + self.mutate(MutationType::Kill, target, Some(filter.borrow())) + .await + } + + /// Purge the targeted structure. + /// + /// Will have the same effect as [`Client::discard`] with an empty [`MutationFilter`], + /// but is special cased by Faktory and so is performed faster. Can be though of as + /// `TRUNCATE tablename` operation in the SQL world versus `DELETE FROM tablename`. + /// + /// E.g. 
to purged all the jobs that are pending in the `reties` set: + /// ```no_run + /// # tokio_test::block_on(async { + /// # use faktory::{Client, MutationTarget}; + /// # let mut client = Client::connect().await.unwrap(); + /// client.clear(MutationTarget::Retries).await.unwrap(); + /// # }); + /// ``` + pub async fn clear(&mut self, target: MutationTarget) -> Result<(), Error> { + self.mutate(MutationType::Clear, target, None).await + } + + // For reference: https://github.com/contribsys/faktory/blob/10ccc2270dc2a1c95c3583f7c291a51b0292bb62/server/mutate.go#L35-L59 + // The faktory will pull the targeted set from Redis to it's memory, iterate over each stringified job matching + // looking for a substring "id":"..." or performing regexp search, then deserialize the matches into Jobs and + // perform the action (e.g. requeue). + async fn mutate<'a>( + &mut self, + mtype: MutationType, + mtarget: MutationTarget, + mfilter: Option<&'_ MutationFilter<'_>>, + ) -> Result<(), Error> { self.issue(&MutationAction { - cmd: MutationType::Requeue, - target, - filter: Some(filter.borrow()), + cmd: mtype, + target: mtarget, + filter: mfilter, }) .await? .read_ok() .await } - - /* - From Go bindings: - - // Move the given jobs from structure to the Dead set. - // Faktory will not touch them anymore but you can still see them in the Web UI. - // - // Kill(Retries, OfType("DataSyncJob").WithJids("abc", "123")) - Kill(name Structure, filter JobFilter) error - - // Move the given jobs to their associated queue so they can be immediately - // picked up and processed. - Requeue(name Structure, filter JobFilter) error - - // Throw away the given jobs, e.g. if you want to delete all jobs named "QuickbooksSyncJob" - // - // Discard(Dead, OfType("QuickbooksSyncJob")) - Discard(name Structure, filter JobFilter) error - - // Empty the entire given structure, e.g. if you want to clear all retries. - // This is very fast as it is special cased by Faktory. 
- Clear(name Structure) error - */ } diff --git a/src/proto/single/mutation.rs b/src/proto/single/mutation.rs index 10cdefb2..ae043cca 100644 --- a/src/proto/single/mutation.rs +++ b/src/proto/single/mutation.rs @@ -1,6 +1,8 @@ +use crate::JobId; use derive_builder::Builder; -use crate::JobId; +#[cfg(doc)] +use crate::Job; /// Mutation target set. #[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)] @@ -32,23 +34,20 @@ pub enum MutationTarget { /// client.requeue(MutationTarget::Retries, &filter).await.unwrap(); /// # }) /// ``` -#[derive(Builder, Clone, Debug, Default, PartialEq, Eq, Serialize)] -#[builder( - default, - setter(into), - build_fn(name = "try_build", private), - pattern = "owned" -)] +#[derive(Builder, Clone, Debug, PartialEq, Eq, Serialize)] +#[builder(setter(into), build_fn(name = "try_build", private), pattern = "owned")] #[non_exhaustive] pub struct MutationFilter<'a> { /// A job's [`kind`](crate::Job::kind). #[serde(skip_serializing_if = "Option::is_none")] #[serde(rename = "jobtype")] + #[builder(default)] pub kind: Option<&'a str>, /// Ids of jobs to target. #[serde(skip_serializing_if = "Option::is_none")] #[builder(setter(custom))] + #[builder(default)] pub jids: Option<&'a [&'a JobId]>, /// Match attern to use for filtering. @@ -58,6 +57,7 @@ pub struct MutationFilter<'a> { /// for further details. #[serde(skip_serializing_if = "Option::is_none")] #[serde(rename = "regexp")] + #[builder(default)] pub pattern: Option<&'a str>, } @@ -68,6 +68,18 @@ impl MutationFilter<'_> { } impl<'a> MutationFilter<'_> { + /// Creates an empty filter. + /// + /// Sending a mutation command (e.g. [`Client::discard`]) with an empty + /// filter effectively means perform no filtering at all. 
+ pub fn empty() -> Self { + Self { + kind: None, + jids: None, + pattern: None, + } + } + /// Creates a new builder for a [`MutationFilter`] pub fn builder() -> MutationFilterBuilder<'a> { MutationFilterBuilder::default() diff --git a/tests/real/community.rs b/tests/real/community.rs index b327b014..b16a380e 100644 --- a/tests/real/community.rs +++ b/tests/real/community.rs @@ -914,10 +914,7 @@ async fn mutation_requeue_specific_jobs_only() { // the comination of `kind` + `pattern` (jobtype and regexp in Faktory's terms); // let's first make sure to force-reschedule the jobs: client - .requeue( - MutationTarget::Retries, - MutationFilter::default(), // just an empty filter - ) + .requeue(MutationTarget::Retries, MutationFilter::empty()) .await .unwrap(); assert!(worker.run_one(0, &[local]).await.unwrap()); From 85993d3deacc212b4d6b00ac33004100dd02dc37 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Fri, 22 Nov 2024 11:19:49 +0400 Subject: [PATCH 15/23] Test Client::clear --- Makefile | 6 +++ tests/real/community.rs | 87 +++++++++++++++++++++++++++++++++++++++-- 2 files changed, 89 insertions(+), 4 deletions(-) diff --git a/Makefile b/Makefile index 34d11cca..b2a3d318 100644 --- a/Makefile +++ b/Makefile @@ -58,6 +58,12 @@ test/e2e: cargo test --locked --all-features --all-targets -- \ --nocapture $(pattern) +.PHONY: test/e2e/ignored +test/e2e/ignored: + FAKTORY_URL=tcp://${FAKTORY_HOST}:${FAKTORY_PORT} \ + cargo test --locked --all-features --all-targets -- \ + --nocapture --include-ignored queue_control_actions_wildcard + .PHONY: test/e2e/tls test/e2e/tls: FAKTORY_URL_SECURE=tcp://:${FAKTORY_PASSWORD}@${FAKTORY_HOST}:${FAKTORY_PORT_SECURE} \ diff --git a/tests/real/community.rs b/tests/real/community.rs index b16a380e..116cb49e 100644 --- a/tests/real/community.rs +++ b/tests/real/community.rs @@ -1,11 +1,13 @@ use crate::{assert_gte, skip_check}; +use chrono::Utc; use faktory::{ Client, Job, JobBuilder, JobId, MutationFilter, MutationTarget, StopReason, 
Worker, WorkerBuilder, WorkerId, }; use serde_json::Value; -use std::{io, sync, time::Duration}; -use tokio::time as tokio_time; +use std::time::Duration; +use std::{io, sync}; +use tokio::time::{self as tokio_time}; use tokio_util::sync::CancellationToken; #[tokio::test(flavor = "multi_thread")] @@ -383,6 +385,26 @@ async fn queue_control_actions_wildcard() { let local_1 = "queue_control_wildcard_1"; let local_2 = "queue_control_wildcard_2"; + // prepare a client and remove any left-overs + // from the previous test run + let mut client = Client::connect().await.unwrap(); + client + .requeue( + MutationTarget::Retries, + MutationFilter::builder().kind(local_1).build(), + ) + .await + .unwrap(); + client + .requeue( + MutationTarget::Retries, + MutationFilter::builder().kind(local_2).build(), + ) + .await + .unwrap(); + client.queue_remove(&[local_1]).await.unwrap(); + client.queue_remove(&[local_2]).await.unwrap(); + let (tx, rx) = sync::mpsc::channel(); let tx_1 = sync::Arc::new(sync::Mutex::new(tx)); let tx_2 = sync::Arc::clone(&tx_1); @@ -402,8 +424,6 @@ async fn queue_control_actions_wildcard() { .await .unwrap(); - let mut client = Client::connect().await.unwrap(); - // enqueue two jobs on each queue client .enqueue_many([ @@ -453,6 +473,55 @@ async fn queue_control_actions_wildcard() { // our queue are not even mentioned in the server report: assert!(queues.get(local_1).is_none()); assert!(queues.get(local_2).is_none()); + + // let's also test here one bit from the Faktory MUTATION API, + // which affects the entire target set; + // for this, let's enqueue a few jobs that are not supposed to be + // consumed immediately, rather in a few minutes; this they these + // jobs will get into the `scheduled` set + let soon = Utc::now() + Duration::from_secs(2); + client + .enqueue_many([ + Job::builder(local_1) + .args(vec![Value::from(1)]) + .queue(local_1) + .at(soon) + .build(), + Job::builder(local_1) + .args(vec![Value::from(1)]) + .queue(local_1) + .at(soon) + 
.build(), + Job::builder(local_2) + .args(vec![Value::from(1)]) + .queue(local_2) + .at(soon) + .build(), + Job::builder(local_2) + .args(vec![Value::from(1)]) + .queue(local_2) + .at(soon) + .build(), + ]) + .await + .unwrap(); + + // now, let's just clear all the scheduled jobs + client.clear(MutationTarget::Scheduled).await.unwrap(); + + tokio_time::sleep(Duration::from_secs(2)).await; + + // the queue is empty + assert!(!worker.run_one(0, &[local_1]).await.unwrap()); + + // even if we force-schedule those jobs + client + .requeue(MutationTarget::Scheduled, MutationFilter::empty()) + .await + .unwrap(); + + // still empty, meaing the jobs have been purged for good + assert!(!worker.run_one(0, &[local_1]).await.unwrap()); } #[tokio::test(flavor = "multi_thread")] @@ -819,6 +888,16 @@ async fn mutation_requeue_jobs() { // TODO: Examine the job's failure (will need a dedicated PR) } +#[tokio::test(flavor = "multi_thread")] +async fn mutation_kill_vs_discard() { + // Plan: + // 1. Create and push a few jobs with Job::at(..) populated + // 2. Kill them: `scheduled` -> `dead` + // 3. Re-enqueue them: `dead` -> `enqueued` + // 4. Consume-fail them: `enqueued` -> `retries` + // 5. 
Discard them: `retries` -> void +} + #[tokio::test(flavor = "multi_thread")] async fn mutation_requeue_specific_jobs_only() { skip_check!(); From 54eecc3284102b408e43d881c23e5aa457c7520d Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Fri, 22 Nov 2024 12:35:06 +0400 Subject: [PATCH 16/23] Fix minimal version job --- tests/real/community.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/real/community.rs b/tests/real/community.rs index 116cb49e..265ef75a 100644 --- a/tests/real/community.rs +++ b/tests/real/community.rs @@ -479,7 +479,7 @@ async fn queue_control_actions_wildcard() { // for this, let's enqueue a few jobs that are not supposed to be // consumed immediately, rather in a few minutes; this they these // jobs will get into the `scheduled` set - let soon = Utc::now() + Duration::from_secs(2); + let soon = Utc::now() + chrono::Duration::seconds(2); client .enqueue_many([ Job::builder(local_1) From a883f57d7b6ebbf30ba76eae197af856711e5fa8 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Sat, 23 Nov 2024 12:06:03 +0400 Subject: [PATCH 17/23] Add docs and conveniences like Client::requeue_by_ids --- src/proto/client/mutation.rs | 66 ++++++++++++++++++++++++++++++++++-- src/proto/single/mutation.rs | 14 ++++++-- 2 files changed, 76 insertions(+), 4 deletions(-) diff --git a/src/proto/client/mutation.rs b/src/proto/client/mutation.rs index de2b05f6..00a2ad20 100644 --- a/src/proto/client/mutation.rs +++ b/src/proto/client/mutation.rs @@ -1,13 +1,16 @@ use crate::{ proto::single::{MutationAction, MutationType}, - Client, Error, MutationFilter, MutationTarget, + Client, Error, JobId, MutationFilter, MutationTarget, }; use std::borrow::Borrow; impl Client { /// Re-enqueue the jobs. 
/// - /// This will immediately move the jobs from the targeted set (see [`MutationTarget`]) + /// ***Warning!*** The `MUTATE` API is not supposed to be used as part of application logic, + /// you will want to use it for administration purposes only. + /// + /// This method will immediately move the jobs from the targeted set (see [`MutationTarget`]) /// to their queues. This will apply to the jobs satisfying the [`filter`](crate::MutationFilter). /// /// ```no_run @@ -29,8 +32,28 @@ impl Client { .await } + /// Re-enqueue the jobs with the given ids. + /// + /// ***Warning!*** The `MUTATE` API is not supposed to be used as part of application logic, + /// you will want to use it for administration purposes only. + /// + /// Similar to [`Client::requeue`], but will create a filter (see [`MutationFilter`]) + /// with the gived `jids` for you. + pub async fn requeue_by_ids<'a>( + &mut self, + target: MutationTarget, + jids: &'_ [&'_ JobId], + ) -> Result<(), Error> { + let filter = MutationFilter::builder().jids(jids).build(); + self.mutate(MutationType::Requeue, target, Some(&filter)) + .await + } + /// Discard the jobs. /// + /// ***Warning!*** The `MUTATE` API is not supposed to be used as part of application logic, + /// you will want to use it for administration purposes only. + /// /// Will throw the jobs away without any chance for re-scheduling /// on the server side. If you want to still be able to process the jobs, /// use [`Client::kill`] instead. @@ -54,8 +77,28 @@ impl Client { .await } + /// Discard the jobs with the given ids. + /// + /// ***Warning!*** The `MUTATE` API is not supposed to be used as part of application logic, + /// you will want to use it for administration purposes only. + /// + /// Similar to [`Client::discard`], but will create a filter (see [`MutationFilter`]) + /// with the gived `jids` for you. 
+ pub async fn discard_by_ids<'a>( + &mut self, + target: MutationTarget, + jids: &'_ [&'_ JobId], + ) -> Result<(), Error> { + let filter = MutationFilter::builder().jids(jids).build(); + self.mutate(MutationType::Discard, target, Some(&filter)) + .await + } + /// Kil the jobs. /// + /// ***Warning!*** The `MUTATE` API is not supposed to be used as part of application logic, + /// you will want to use it for administration purposes only. + /// /// Moves the jobs from the target structure to the `dead` set, meaning Faktory /// will not touch it further unless you ask it to do so. You then can, for example, /// manually process those jobs via the Web UI or send another mutation command @@ -80,8 +123,27 @@ impl Client { .await } + /// Kill the jobs with the given ids. + /// + /// ***Warning!*** The `MUTATE` API is not supposed to be used as part of application logic, + /// you will want to use it for administration purposes only. + /// + /// Similar to [`Client::kill`], but will create a filter (see [`MutationFilter`]) + /// with the gived `jids` for you. + pub async fn kill_by_ids<'a>( + &mut self, + target: MutationTarget, + jids: &'_ [&'_ JobId], + ) -> Result<(), Error> { + let filter = MutationFilter::builder().jids(jids).build(); + self.mutate(MutationType::Kill, target, Some(&filter)).await + } + /// Purge the targeted structure. /// + /// ***Warning!*** The `MUTATE` API is not supposed to be used as part of application logic, + /// you will want to use it for administration purposes only. + /// /// Will have the same effect as [`Client::discard`] with an empty [`MutationFilter`], /// but is special cased by Faktory and so is performed faster. Can be though of as /// `TRUNCATE tablename` operation in the SQL world versus `DELETE FROM tablename`. 
diff --git a/src/proto/single/mutation.rs b/src/proto/single/mutation.rs index ae043cca..52091548 100644 --- a/src/proto/single/mutation.rs +++ b/src/proto/single/mutation.rs @@ -2,7 +2,7 @@ use crate::JobId; use derive_builder::Builder; #[cfg(doc)] -use crate::Job; +use crate::{Client, Job}; /// Mutation target set. #[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)] @@ -20,7 +20,17 @@ pub enum MutationTarget { Dead, } -/// Filter to help narrow down the mutation target. +// As of Faktory v1.9.2, not all the fields on the filter +// will be taken into account, rather EITHER `jids` OR optional `kind` +// plus optional `pattern`. +// See: https://github.com/contribsys/faktory/issues/489 +// +/// A filter to help narrow down the mutation target. +/// +/// As of Faktory version 1.9.2, if [`MutationFilter::pattern`] and/or [`MutationFilter::kind`] +/// is specified, the values in [`MutationFilter::jids`] will not be taken into account by the +/// server. If you want to filter by `jids`, make sure to leave other fields of the filter empty +/// or use dedicated methods like [`Client::requeue_by_ids`]. /// /// Example usage: /// ```no_run From 17cdb4cd8d2631a70faf86f93bc737aa8a2508e5 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Sat, 23 Nov 2024 12:27:49 +0400 Subject: [PATCH 18/23] Add test for Client::kill plus Client::requeue --- src/proto/client/mutation.rs | 12 +++--- tests/real/community.rs | 83 +++++++++++++++++++++++++++++++++--- 2 files changed, 82 insertions(+), 13 deletions(-) diff --git a/src/proto/client/mutation.rs b/src/proto/client/mutation.rs index 00a2ad20..04b1ca1a 100644 --- a/src/proto/client/mutation.rs +++ b/src/proto/client/mutation.rs @@ -33,10 +33,10 @@ impl Client { } /// Re-enqueue the jobs with the given ids. - /// + /// /// ***Warning!*** The `MUTATE` API is not supposed to be used as part of application logic, /// you will want to use it for administration purposes only. 
- /// + /// /// Similar to [`Client::requeue`], but will create a filter (see [`MutationFilter`]) /// with the gived `jids` for you. pub async fn requeue_by_ids<'a>( @@ -78,10 +78,10 @@ impl Client { } /// Discard the jobs with the given ids. - /// + /// /// ***Warning!*** The `MUTATE` API is not supposed to be used as part of application logic, /// you will want to use it for administration purposes only. - /// + /// /// Similar to [`Client::discard`], but will create a filter (see [`MutationFilter`]) /// with the gived `jids` for you. pub async fn discard_by_ids<'a>( @@ -124,10 +124,10 @@ impl Client { } /// Kill the jobs with the given ids. - /// + /// /// ***Warning!*** The `MUTATE` API is not supposed to be used as part of application logic, /// you will want to use it for administration purposes only. - /// + /// /// Similar to [`Client::kill`], but will create a filter (see [`MutationFilter`]) /// with the gived `jids` for you. pub async fn kill_by_ids<'a>( diff --git a/tests/real/community.rs b/tests/real/community.rs index 265ef75a..f365d635 100644 --- a/tests/real/community.rs +++ b/tests/real/community.rs @@ -889,13 +889,82 @@ async fn mutation_requeue_jobs() { } #[tokio::test(flavor = "multi_thread")] -async fn mutation_kill_vs_discard() { - // Plan: - // 1. Create and push a few jobs with Job::at(..) populated - // 2. Kill them: `scheduled` -> `dead` - // 3. Re-enqueue them: `dead` -> `enqueued` - // 4. Consume-fail them: `enqueued` -> `retries` - // 5. Discard them: `retries` -> void +async fn mutation_kill_and_requeue() { + skip_check!(); + + // prepare a client and clean up the queue + // to ensure there are no left-overs + let local = "mutation_kill_vs_discard"; + let mut client = Client::connect().await.unwrap(); + client + .requeue( + MutationTarget::Retries, + MutationFilter::builder().kind(local).build(), + ) + .await + .unwrap(); + + // enqueue a couple of jobs and ... 
+ client.queue_remove(&[local]).await.unwrap(); + let soon = Utc::now() + chrono::Duration::seconds(2); + client + .enqueue_many([ + Job::builder(local) + .args(vec![Value::from(1)]) + .queue(local) + .at(soon) + .build(), + Job::builder(local) + .args(vec![Value::from(2)]) + .queue(local) + .at(soon) + .build(), + ]) + .await + .unwrap(); + + // kill them ... + client + .kill( + MutationTarget::Scheduled, + MutationFilter::builder().kind(local).build(), + ) + .await + .unwrap(); + + // the two jobs were moved from `scheduled` to `dead`, + // and so the queue is empty + let njobs = client + .current_info() + .await + .unwrap() + .data + .queues + .get(local) + .map(|v| *v) + .unwrap_or_default(); + assert_eq!(njobs, 0); + + // let's now enqueue those jobs + client + .requeue( + MutationTarget::Dead, + MutationFilter::builder().kind(local).build(), + ) + .await + .unwrap(); + + // they transitioned from `dead` to being enqueued + let njobs = client + .current_info() + .await + .unwrap() + .data + .queues + .get(local) + .map(|v| *v) + .unwrap_or_default(); + assert_eq!(njobs, 2); } #[tokio::test(flavor = "multi_thread")] From 22a4d1816d727cc1ddb7951a741f987792a6c5ca Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Sat, 23 Nov 2024 12:35:35 +0400 Subject: [PATCH 19/23] Test Client::discard --- tests/real/community.rs | 74 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 72 insertions(+), 2 deletions(-) diff --git a/tests/real/community.rs b/tests/real/community.rs index f365d635..b5646db0 100644 --- a/tests/real/community.rs +++ b/tests/real/community.rs @@ -889,7 +889,7 @@ async fn mutation_requeue_jobs() { } #[tokio::test(flavor = "multi_thread")] -async fn mutation_kill_and_requeue() { +async fn mutation_kill_and_requeue_and_discard() { skip_check!(); // prepare a client and clean up the queue @@ -965,6 +965,76 @@ async fn mutation_kill_and_requeue() { .map(|v| *v) .unwrap_or_default(); assert_eq!(njobs, 2); + + // prepare a worker that will fail 
the job unconditionally + let mut worker = Worker::builder::() + .register_fn(local, move |_job| async move { + panic!("force fail this job"); + }) + .connect() + .await + .unwrap(); + + // cosume them (and immediately fail them) and make + // sure the queue is drained + assert!(worker.run_one(0, &[local]).await.unwrap()); + assert!(worker.run_one(0, &[local]).await.unwrap()); + assert!(!worker.run_one(0, &[local]).await.unwrap()); + let njobs = client + .current_info() + .await + .unwrap() + .data + .queues + .get(local) + .map(|v| *v) + .unwrap_or_default(); + assert_eq!(njobs, 0); // sanity check + + // so the jobs have transitioned from being enqueued + // to the `retries` set, and we can now completely discard them + client + .discard( + MutationTarget::Retries, + MutationFilter::builder().kind(local).build(), + ) + .await + .unwrap(); + + // Double-check + client + .requeue( + MutationTarget::Retries, + MutationFilter::builder().kind(local).build(), + ) + .await + .unwrap(); + client + .requeue( + MutationTarget::Dead, + MutationFilter::builder().kind(local).build(), + ) + .await + .unwrap(); + client + .requeue( + MutationTarget::Scheduled, + MutationFilter::builder().kind(local).build(), + ) + .await + .unwrap(); + + // Gone for good + let njobs = client + .current_info() + .await + .unwrap() + .data + .queues + .get(local) + .map(|v| *v) + .unwrap_or_default(); + assert_eq!(njobs, 0); } #[tokio::test(flavor = "multi_thread")] @@ -987,7 +1057,7 @@ async fn mutation_requeue_specific_jobs_only() { // prepare a worker that will fail the job unconditionally let mut worker = Worker::builder::() .register_fn(local, move |_job| async move { - panic!("Force fail this job"); + panic!("force fail this job"); }) .connect() .await From cded093e15c2144a805aaca7e4e3b9db5a14a0a0 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Sat, 23 Nov 2024 13:16:44 +0400 Subject: [PATCH 20/23] Fix typos --- src/proto/client/mutation.rs | 12 ++++++------ 
src/proto/single/mutation.rs | 6 +++--- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/proto/client/mutation.rs b/src/proto/client/mutation.rs index 04b1ca1a..580412da 100644 --- a/src/proto/client/mutation.rs +++ b/src/proto/client/mutation.rs @@ -38,7 +38,7 @@ impl Client { /// you will want to use it for administration purposes only. /// /// Similar to [`Client::requeue`], but will create a filter (see [`MutationFilter`]) - /// with the gived `jids` for you. + /// with the given `jids` for you. pub async fn requeue_by_ids<'a>( &mut self, target: MutationTarget, @@ -83,7 +83,7 @@ impl Client { /// you will want to use it for administration purposes only. /// /// Similar to [`Client::discard`], but will create a filter (see [`MutationFilter`]) - /// with the gived `jids` for you. + /// with the given `jids` for you. pub async fn discard_by_ids<'a>( &mut self, target: MutationTarget, @@ -129,7 +129,7 @@ impl Client { /// you will want to use it for administration purposes only. /// /// Similar to [`Client::kill`], but will create a filter (see [`MutationFilter`]) - /// with the gived `jids` for you. + /// with the given `jids` for you. pub async fn kill_by_ids<'a>( &mut self, target: MutationTarget, @@ -145,10 +145,10 @@ impl Client { /// you will want to use it for administration purposes only. /// /// Will have the same effect as [`Client::discard`] with an empty [`MutationFilter`], - /// but is special cased by Faktory and so is performed faster. Can be though of as + /// but is special cased by Faktory and so is performed faster. Can be thought of as /// `TRUNCATE tablename` operation in the SQL world versus `DELETE FROM tablename`. /// - /// E.g. to purged all the jobs that are pending in the `reties` set: + /// E.g. 
to purge all the jobs that are pending in the `reties` set: /// ```no_run /// # tokio_test::block_on(async { /// # use faktory::{Client, MutationTarget}; @@ -161,7 +161,7 @@ impl Client { } // For reference: https://github.com/contribsys/faktory/blob/10ccc2270dc2a1c95c3583f7c291a51b0292bb62/server/mutate.go#L35-L59 - // The faktory will pull the targeted set from Redis to it's memory, iterate over each stringified job matching + // The faktory will pull the targeted set from Redis to it's memory, iterate over each stringified job // looking for a substring "id":"..." or performing regexp search, then deserialize the matches into Jobs and // perform the action (e.g. requeue). async fn mutate<'a>( diff --git a/src/proto/single/mutation.rs b/src/proto/single/mutation.rs index 52091548..3caac0fe 100644 --- a/src/proto/single/mutation.rs +++ b/src/proto/single/mutation.rs @@ -60,7 +60,7 @@ pub struct MutationFilter<'a> { #[builder(default)] pub jids: Option<&'a [&'a JobId]>, - /// Match attern to use for filtering. + /// Match pattern to use for filtering. /// /// Faktory will pass this directly to Redis's `SCAN` command, /// so please see the [`SCAN` documentation](https://redis.io/docs/latest/commands/scan/) @@ -81,7 +81,7 @@ impl<'a> MutationFilter<'_> { /// Creates an empty filter. /// /// Sending a mutation command (e.g. [`Client::discard`]) with an empty - /// filter effectively means perform no filtering at all. + /// filter effectively means performing no filtering at all. pub fn empty() -> Self { Self { kind: None, @@ -90,7 +90,7 @@ impl<'a> MutationFilter<'_> { } } - /// Creates a new builder for a [`MutationFilter`] + /// Creates a new builder for a [`MutationFilter`]. 
pub fn builder() -> MutationFilterBuilder<'a> { MutationFilterBuilder::default() } From 7d9a94c5a23e9a821dacca35c069c105a263d777 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pavie=C5=82=20Michalkievi=C4=8D?= Date: Tue, 31 Dec 2024 15:01:47 +0400 Subject: [PATCH 21/23] Update src/proto/client/mutation.rs Co-authored-by: Jon Gjengset --- src/proto/client/mutation.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/proto/client/mutation.rs b/src/proto/client/mutation.rs index 580412da..82a9949e 100644 --- a/src/proto/client/mutation.rs +++ b/src/proto/client/mutation.rs @@ -94,7 +94,7 @@ impl Client { .await } - /// Kil the jobs. + /// Kill a set of jobs. /// /// ***Warning!*** The `MUTATE` API is not supposed to be used as part of application logic, /// you will want to use it for administration purposes only. From 8346f08427289aaaf40bbb5e390f8c7beb5e3327 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pavie=C5=82=20Michalkievi=C4=8D?= Date: Tue, 31 Dec 2024 15:04:24 +0400 Subject: [PATCH 22/23] Update src/proto/single/mutation.rs Co-authored-by: Jon Gjengset --- src/proto/single/mutation.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/proto/single/mutation.rs b/src/proto/single/mutation.rs index 3caac0fe..da0ebdc9 100644 --- a/src/proto/single/mutation.rs +++ b/src/proto/single/mutation.rs @@ -54,7 +54,7 @@ pub struct MutationFilter<'a> { #[builder(default)] pub kind: Option<&'a str>, - /// Ids of jobs to target. + /// [`JobId`]s to target. 
#[serde(skip_serializing_if = "Option::is_none")] #[builder(setter(custom))] #[builder(default)] From 206e5f7de21de49bd22abcaf7510e6faaf909bb5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pavie=C5=82=20Michalkievi=C4=8D?= Date: Tue, 31 Dec 2024 17:52:40 +0400 Subject: [PATCH 23/23] Job Failure (#89) * Make Failure public * Update changelog * Get more info from panicking * Check different ways a job can fail * Add test clean up * Account for re-queued jobs * Check job id * Collect all kinds of handler failure * Add Job::failure_message. Check all error messages * Add to CHANGELOG.md. Fix clippy warning * Update tests/real/community.rs Co-authored-by: Jon Gjengset * Update src/proto/single/mod.rs Co-authored-by: Jon Gjengset * Fix docs to Failure * Nuke Job::failure_message * Nuke Job::failure_message * Restore comment in test_panic_and_errors_in_handler test * Add to the tests docs * Do not make Failure::kind pub --------- Co-authored-by: Jon Gjengset --- CHANGELOG.md | 2 + src/lib.rs | 4 +- src/proto/mod.rs | 4 +- src/proto/single/mod.rs | 46 ++++++-- src/worker/mod.rs | 17 ++- tests/real/community.rs | 245 +++++++++++++++++++++++++++++++++++----- 6 files changed, 275 insertions(+), 43 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 862ab583..a43b567f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Support Faktory's `MUTATE` API ([#87]) +- Make `Failure` struct public ([#89]) ### Changed @@ -22,6 +23,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Security [#87]: https://github.com/jonhoo/faktory-rs/pull/87 +[#89]: https://github.com/jonhoo/faktory-rs/pull/89 ## [0.13.0] - 2024-10-27 diff --git a/src/lib.rs b/src/lib.rs index 02a88aec..5fc30fd5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -106,8 +106,8 @@ mod worker; pub use crate::error::Error; pub use crate::proto::{ - Client, Connection, DataSnapshot, 
FaktoryState, Job, JobBuilder, JobId, MutationFilter, - MutationFilterBuilder, MutationTarget, Reconnect, ServerSnapshot, WorkerId, + Client, Connection, DataSnapshot, Failure, FaktoryState, Job, JobBuilder, JobId, + MutationFilter, MutationFilterBuilder, MutationTarget, Reconnect, ServerSnapshot, WorkerId, }; pub use crate::worker::{JobRunner, StopDetails, StopReason, Worker, WorkerBuilder}; diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 24fe7533..7f16a061 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -12,8 +12,8 @@ pub use client::{Client, Connection}; mod single; pub use single::{ - DataSnapshot, FaktoryState, Job, JobBuilder, JobId, MutationFilter, MutationFilterBuilder, - MutationTarget, ServerSnapshot, WorkerId, + DataSnapshot, Failure, FaktoryState, Job, JobBuilder, JobId, MutationFilter, + MutationFilterBuilder, MutationTarget, ServerSnapshot, WorkerId, }; pub(crate) use single::{Ack, Fail, Info, Push, PushBulk, QueueAction, QueueControl}; diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index 2cbe4087..8cd13ebf 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -124,6 +124,8 @@ pub struct Job { /// Defaults to 25. #[serde(skip_serializing_if = "Option::is_none")] #[builder(default = "Some(JOB_DEFAULT_RETRY_COUNT)")] + // TODO: this should probably be a usize, see Failure::retry_count + // TODO: and Failure::retry_remaining. This can go to 0.14 release pub retry: Option, /// The priority of this job from 1-9 (9 is highest). @@ -206,19 +208,45 @@ impl JobBuilder { } } +/// Details on a job's failure. #[derive(Serialize, Deserialize, Debug, Clone)] +#[non_exhaustive] pub struct Failure { - retry_count: usize, - failed_at: String, - #[serde(skip_serializing_if = "Option::is_none")] - next_at: Option, + /// Number of times this job has been retried. + pub retry_count: usize, + + /// Number of remaining retry attempts. 
+ /// + /// This is the difference between how many times this job + /// _can_ be retried (see [`Job::retry`]) and the number of retry + /// attempts that have already been made (see [`Failure::retry_count`]). + #[serde(rename = "remaining")] + pub retry_remaining: usize, + + /// Last time this job failed. + pub failed_at: DateTime, + + /// When this job will be retried. + /// + /// This will be `None` if there are no retry + /// attempts (see [`Failure::retry_remaining`]) left. #[serde(skip_serializing_if = "Option::is_none")] - message: Option, + pub next_at: Option>, + + /// Error message, if any. #[serde(skip_serializing_if = "Option::is_none")] + pub message: Option, + + // This is Some("unknown") most of the time, and we are not making + // it public for now, see discussion: + // https://github.com/jonhoo/faktory-rs/pull/89#discussion_r1899423130 + /// Error kind, if known. #[serde(rename = "errtype")] - kind: Option, + pub(crate) kind: Option, + + /// Stack trace from last failure, if any. #[serde(skip_serializing_if = "Option::is_none")] - backtrace: Option>, + pub backtrace: Option>, } impl Job { @@ -263,8 +291,8 @@ impl Job { } /// Data about this job's most recent failure. 
- pub fn failure(&self) -> &Option { - &self.failure + pub fn failure(&self) -> Option<&Failure> { + self.failure.as_ref() } } diff --git a/src/worker/mod.rs b/src/worker/mod.rs index 2b6aab89..3547d438 100644 --- a/src/worker/mod.rs +++ b/src/worker/mod.rs @@ -360,7 +360,22 @@ impl Worker { let fail = match e { Failed::BadJobType(jt) => Fail::generic(jid, format!("No handler for {}", jt)), Failed::Application(e) => Fail::generic_with_backtrace(jid, e), - Failed::HandlerPanic(e) => Fail::generic_with_backtrace(jid, e), + Failed::HandlerPanic(e) => { + if e.is_cancelled() { + Fail::generic(jid, "job processing was cancelled") + } else if e.is_panic() { + let panic_obj = e.into_panic(); + if panic_obj.is::() { + Fail::generic(jid, *panic_obj.downcast::().unwrap()) + } else if panic_obj.is::<&'static str>() { + Fail::generic(jid, *panic_obj.downcast::<&'static str>().unwrap()) + } else { + Fail::generic(jid, "job processing panicked") + } + } else { + Fail::generic_with_backtrace(jid, e) + } + } }; self.worker_states.register_failure(worker, fail.clone()); self.c.issue(&fail).await?.read_ok().await?; diff --git a/tests/real/community.rs b/tests/real/community.rs index b5646db0..9035749a 100644 --- a/tests/real/community.rs +++ b/tests/real/community.rs @@ -1,12 +1,17 @@ -use crate::{assert_gte, skip_check}; +use crate::{assert_gt, assert_gte, assert_lt, skip_check}; use chrono::Utc; use faktory::{ - Client, Job, JobBuilder, JobId, MutationFilter, MutationTarget, StopReason, Worker, + Client, Job, JobBuilder, JobId, JobRunner, MutationFilter, MutationTarget, StopReason, Worker, WorkerBuilder, WorkerId, }; +use rand::Rng; use serde_json::Value; +use std::collections::HashMap; +use std::panic::panic_any; +use std::sync::Arc; use std::time::Duration; use std::{io, sync}; +use tokio::sync::mpsc::error::SendError; use tokio::time::{self as tokio_time}; use tokio_util::sync::CancellationToken; @@ -678,7 +683,7 @@ async fn test_jobs_created_with_builder() { use 
std::future::Future; use std::pin::Pin; -use tokio::sync::mpsc; +use tokio::sync::mpsc::{self, Sender}; fn process_hard_task( sender: sync::Arc>, ) -> Box< @@ -795,46 +800,195 @@ async fn test_jobs_with_blocking_handlers() { } #[tokio::test(flavor = "multi_thread")] -async fn test_panic_in_handler() { +async fn test_panic_and_errors_in_handler() { skip_check!(); - let local = "test_panic_in_handler"; + let job_kind_vs_error_msg = HashMap::from([ + ("panic_SYNC_handler_str", "panic_SYNC_handler_str"), + ("panic_SYNC_handler_String", "panic_SYNC_handler_String"), + ("panic_SYNC_handler_int", "job processing panicked"), + ("error_from_SYNC_handler", "error_from_SYNC_handler"), + ("panic_ASYNC_handler_str", "panic_ASYNC_handler_str"), + ("panic_ASYNC_handler_String", "panic_ASYNC_handler_String"), + ("panic_ASYNC_handler_int", "job processing panicked"), + ("error_from_ASYNC_handler", "error_from_ASYNC_handler"), + ( + "no_handler_registered_for_this_jobtype_initially", + "No handler for no_handler_registered_for_this_jobtype_initially", + ), + ]); + let njobs = job_kind_vs_error_msg.keys().len(); + + // clean up is needed when re-using the same Faktory container, since the + // Faktory server could have re-scheduled (or might be doing it right now) + // the failed jobs from the previous test run; to keep things clean, we are + // force-rescheduling and immediately dropping any remainders + let local = "test_panic_and_errors_in_handler"; + let mut c = Client::connect().await.unwrap(); + let pattern = format!(r#"*\"args\":\[\"{}\"\]*"#, local); + c.requeue( + MutationTarget::Retries, + MutationFilter::builder().pattern(pattern.as_str()).build(), + ) + .await + .unwrap(); + c.queue_remove(&[local]).await.unwrap(); let mut w = Worker::builder::() - .register_blocking_fn("panic_SYNC_handler", |_j| { - panic!("Panic inside sync the handler..."); + .register_blocking_fn("panic_SYNC_handler_str", |_j| { + panic!("panic_SYNC_handler_str"); + }) + 
.register_blocking_fn("panic_SYNC_handler_String", |_j| { + panic_any("panic_SYNC_handler_String".to_string()); + }) + .register_blocking_fn("panic_SYNC_handler_int", |_j| { + panic_any(0); + }) + .register_blocking_fn("error_from_SYNC_handler", |_j| { + Err::<(), io::Error>(io::Error::new( + io::ErrorKind::Other, + "error_from_SYNC_handler", + )) + }) + // async handlers + .register_fn("panic_ASYNC_handler_str", |_j| async move { + panic!("panic_ASYNC_handler_str"); + }) + .register_fn("panic_ASYNC_handler_String", |_j| async move { + panic_any("panic_ASYNC_handler_String".to_string()); + }) + .register_fn("panic_ASYNC_handler_int", |_j| async move { + panic_any(0); }) - .register_fn("panic_ASYNC_handler", |_j| async move { - panic!("Panic inside async handler..."); + .register_fn("error_from_ASYNC_handler", |_j| async move { + Err::<(), io::Error>(io::Error::new( + io::ErrorKind::Other, + "error_from_ASYNC_handler", + )) }) .connect() .await .unwrap(); - let mut c = Client::connect().await.unwrap(); - - c.enqueue(Job::builder("panic_SYNC_handler").queue(local).build()) - .await - .unwrap(); - + // let's enqueue jobs first time and ... + c.enqueue_many( + job_kind_vs_error_msg + .keys() + .map(|&jkind| Job::builder(jkind).queue(local).args([local]).build()) + .collect::>(), + ) + .await + .unwrap(); + + // ... 
consume all the jobs from the queue and _fail_ them + // "in different ways" (see our worker setup above); + // // we _did_ consume and process the job, the processing result itself though // was a failure; however, a panic in the handler was "intercepted" and communicated - // to the Faktory server via the FAIL command; - // note how the test run is not interrupted with a panic - assert!(w.run_one(0, &[local]).await.unwrap()); + // to the Faktory server via the FAIL command, the error message and the backtrace, if any, + // will then be available to a Web UI user or to the worker that consumes the retried job + // (if `Job::retry` is Some and > 0 and there are `Failure::retry_remaining` attempts left) + // in this job's `failure` field; see how we are consuming the retried jobs later + // in this test and examine the failure details; + // + // also note how the test run is not interrupted here with a panic + for _ in 0..njobs { + assert!(w.run_one(0, &[local]).await.unwrap()); + } + + // let's now make sure all the jobs are re-enqueued + c.requeue( + MutationTarget::Retries, + MutationFilter::builder().pattern(pattern.as_str()).build(), + ) + .await + .unwrap(); + + // now, let's create a worker who will only send jobs to the + // test's main thread to make some assertions; + struct JobHandler { + chan: Arc>, + } + impl JobHandler { + pub fn new(chan: Arc>) -> Self { + Self { chan } + } + } + #[async_trait::async_trait] + impl JobRunner for JobHandler { + type Error = SendError; + async fn run(&self, job: Job) -> Result<(), Self::Error> { + self.chan.send(job).await + } + } + let (tx, mut rx) = tokio::sync::mpsc::channel(njobs); + let tx = sync::Arc::new(tx); - c.enqueue(Job::builder("panic_ASYNC_handler").queue(local).build()) + // unlike the previous worker, this one is not failing the jobs, + // it is rather helping us to inspect them + let mut w = Worker::builder() + .register("panic_SYNC_handler_str", JobHandler::new(tx.clone())) + 
.register("panic_SYNC_handler_String", JobHandler::new(tx.clone())) + .register("panic_SYNC_handler_int", JobHandler::new(tx.clone())) + .register("error_from_SYNC_handler", JobHandler::new(tx.clone())) + .register("panic_ASYNC_handler_str", JobHandler::new(tx.clone())) + .register("panic_ASYNC_handler_String", JobHandler::new(tx.clone())) + .register("panic_ASYNC_handler_int", JobHandler::new(tx.clone())) + .register("error_from_ASYNC_handler", JobHandler::new(tx.clone())) + .register( + "no_handler_registered_for_this_jobtype_initially", + JobHandler::new(tx.clone()), + ) + .connect() .await .unwrap(); - // same for async handler, note how the test run is not interrupted with a panic - assert!(!w.is_terminated()); - assert!(w.run_one(0, &[local]).await.unwrap()); + for _ in 0..njobs { + assert!(w.run_one(0, &[local]).await.unwrap()); // reminder: we've requeued the failed jobs + } + + // Let's await till the worker sends the jobs to us. + // + // Note that if a tokio task inside `Worker::run_job` in cancelled(1), we may not receive a job + // via the channel and so `rx.recv_many` will just hang (and so the entire test run), + // hence a timeout we are adding here. + // + // (1)If you are curious, this can be easily reproduced inside the `Callback::Async(_)` arm + // of `Worker::run_job`, by swapping this line: + // ```rust + // spawn(processing_task).await + // ``` + // for something like: + // ```rust + // let handle = spawn(processing_task); + // handle.abort(); + // handle.await + // ``` + // and then running this test. 
+ let mut jobs: Vec = Vec::with_capacity(njobs); + let nreceived = + tokio::time::timeout(Duration::from_secs(5), rx.recv_many(jobs.as_mut(), njobs)) + .await + .expect("all jobs to be recieved within the timeout period"); + + // all the jobs are now in the test's main thread + assert_eq!(nreceived, njobs); + + // let's verify that errors messages in each job's `Failure` are as expected + for job in jobs { + let error_message_got = job.failure().as_ref().unwrap().message.as_ref().unwrap(); + let error_message_expected = *job_kind_vs_error_msg.get(job.kind()).unwrap(); + assert_eq!(error_message_got, error_message_expected); + } } #[tokio::test(flavor = "multi_thread")] async fn mutation_requeue_jobs() { skip_check!(); + let test_started_at = Utc::now(); + let max_retries = rand::thread_rng().gen_range(2..25); + let panic_message = "Failure should be recorded"; // prepare a client and clean up the queue // to ensure there are no left-overs @@ -852,14 +1006,17 @@ async fn mutation_requeue_jobs() { // prepare a worker that will fail the job unconditionally let mut worker = Worker::builder::() .register_fn(local, move |_job| async move { - panic!("Failure should be recorded"); + panic_any(panic_message); }) .connect() .await .unwrap(); // enqueue a job - let job = JobBuilder::new(local).queue(local).build(); + let job = JobBuilder::new(local) + .queue(local) + .retry(max_retries) + .build(); let job_id = job.id().clone(); client.enqueue(job).await.unwrap(); @@ -872,7 +1029,7 @@ async fn mutation_requeue_jobs() { let had_one = worker.run_one(0, &[local]).await.unwrap(); assert!(!had_one); - // ... we can force it + // ... we can force it, so let's requeue the job and ... client .requeue( MutationTarget::Retries, @@ -881,11 +1038,41 @@ async fn mutation_requeue_jobs() { .await .unwrap(); - // the job has been re-enqueued and we consumed it again - let had_one = worker.run_one(0, &[local]).await.unwrap(); - assert!(had_one); + // ... 
this time, instead of failing the job, let's + // create a new worker that will just send the job + // to the test thread so that we can inspect and + // assert on the failure from the first run + let (tx, rx) = sync::mpsc::channel(); + let tx = sync::Arc::new(sync::Mutex::new(tx)); + let mut w = WorkerBuilder::default() + .hostname("tester".to_string()) + .wid(WorkerId::new(local)) + .register_fn(local, move |j| { + let tx = sync::Arc::clone(&tx); + Box::pin(async move { + tx.lock().unwrap().send(j).unwrap(); + Ok::<(), io::Error>(()) + }) + }) + .connect() + .await + .unwrap(); + assert!(w.run_one(0, &[local]).await.unwrap()); + let job = rx.recv().unwrap(); - // TODO: Examine the job's failure (will need a dedicated PR) + assert_eq!(job.id(), &job_id); // sanity check + + let failure_info = job.failure().unwrap(); + assert_eq!(failure_info.retry_count, 0); + assert_eq!( + failure_info.retry_remaining, + max_retries as usize - failure_info.retry_count + ); + assert_lt!(failure_info.failed_at, Utc::now()); + assert_gt!(failure_info.failed_at, test_started_at); + assert!(failure_info.next_at.is_some()); + assert_eq!(failure_info.message.as_ref().unwrap(), panic_message); + assert!(failure_info.backtrace.is_none()); }

#[tokio::test(flavor = "multi_thread")]