diff --git a/CHANGELOG.md b/CHANGELOG.md index 49a168f9..a43b567f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Support Faktory's `MUTATE` API ([#87]) +- Make `Failure` struct public ([#89]) + ### Changed ### Deprecated @@ -19,16 +22,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Security +[#87]: https://github.com/jonhoo/faktory-rs/pull/87 +[#89]: https://github.com/jonhoo/faktory-rs/pull/89 + ## [0.13.0] - 2024-10-27 ### Added -- `Error::Stream` for underlying 'native_tls' and 'rustls' errors ([#49]) +- rustls, native_tls: `Error::Stream` for underlying `native_tls` and `rustls` errors ([#49]) - Shutdown signal via `WorkerBuilder::with_graceful_shutdown` ([#57]) - Shutdown timeout via `WorkerBuilder::shutdown_timeout` ([#57]) - `Client` method for pausing, resuming, and removing queues ([#59]) - `Client::current_info` and `FaktoryState` struct ([#63]) -- TLS configurations options to `WorkerBuilder` ([#74]) +- rustls, native_tls: TLS configurations options to `WorkerBuilder` ([#74]) ### Changed @@ -57,13 +63,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `JobRunner` trait and `ConsumerBuilder::register_runner` ([#51]) - Support for enqueuing numerous jobs with `Producer::enqueue_many` ([#54]) -- Faktory Enterprise Edition: Batch jobs (`Batch`, `BatchId`, `BatchStatus`) ([#48]) -- Faktory Enterprise Edition: Setting and getting a job's progress ([#48]) +- ent: Batch jobs (`Batch`, `BatchId`, `BatchStatus`) ([#48]) +- ent: Setting and getting a job's progress ([#48]) [#48]: https://github.com/jonhoo/faktory-rs/pull/48 [#51]: https://github.com/jonhoo/faktory-rs/pull/51 [#54]: https://github.com/jonhoo/faktory-rs/pull/54 - [unreleased]: https://github.com/jonhoo/faktory-rs/compare/v0.13.0...HEAD [0.13.0]: https://github.com/jonhoo/faktory-rs/compare/v0.12.5...v0.13.0 [0.12.5]: https://github.com/jonhoo/faktory-rs/compare/v0.12.4...v0.12.5 diff --git a/Cargo.lock b/Cargo.lock index 83399ae6..c4677a71 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -434,7 +434,7 @@ dependencies = [ [[package]] name = "faktory" -version = "0.13.0" +version = "0.13.1-rc0" dependencies = [ "async-trait", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 0027c75b..ac229f4d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "faktory" -version = "0.13.0" +version = "0.13.1-rc0" authors = ["Jon Gjengset "] edition = "2021" license = "MIT OR Apache-2.0" diff --git a/Makefile b/Makefile index c5eee45d..b2a3d318 100644 --- a/Makefile +++ b/Makefile @@ -54,17 +54,26 @@ test/doc: .PHONY: test/e2e test/e2e: - FAKTORY_URL=tcp://${FAKTORY_HOST}:${FAKTORY_PORT} cargo test --locked --all-features --all-targets + FAKTORY_URL=tcp://${FAKTORY_HOST}:${FAKTORY_PORT} \ + cargo test --locked --all-features --all-targets -- \ + --nocapture $(pattern) + +.PHONY: test/e2e/ignored +test/e2e/ignored: + FAKTORY_URL=tcp://${FAKTORY_HOST}:${FAKTORY_PORT} \ + cargo test --locked --all-features --all-targets -- \ + --nocapture --include-ignored queue_control_actions_wildcard .PHONY: test/e2e/tls test/e2e/tls: FAKTORY_URL_SECURE=tcp://:${FAKTORY_PASSWORD}@${FAKTORY_HOST}:${FAKTORY_PORT_SECURE} \ FAKTORY_URL=tcp://:${FAKTORY_PASSWORD}@${FAKTORY_HOST}:${FAKTORY_PORT} \ - cargo test --locked --features native_tls,rustls --test tls -- --nocapture + cargo test --locked --features native_tls,rustls --test tls -- \ + --nocapture 
$(pattern) .PHONY: test/load test/load: - cargo run --release --features binaries + cargo run --release --features binaries $(jobs) $(threads) .PHONY: test/perf test/perf: diff --git a/src/lib.rs b/src/lib.rs index 28d8887a..5fc30fd5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -106,8 +106,8 @@ mod worker; pub use crate::error::Error; pub use crate::proto::{ - Client, Connection, DataSnapshot, FaktoryState, Job, JobBuilder, JobId, Reconnect, - ServerSnapshot, WorkerId, + Client, Connection, DataSnapshot, Failure, FaktoryState, Job, JobBuilder, JobId, + MutationFilter, MutationFilterBuilder, MutationTarget, Reconnect, ServerSnapshot, WorkerId, }; pub use crate::worker::{JobRunner, StopDetails, StopReason, Worker, WorkerBuilder}; diff --git a/src/proto/client/mod.rs b/src/proto/client/mod.rs index 95e44c9b..e1bb4ba1 100644 --- a/src/proto/client/mod.rs +++ b/src/proto/client/mod.rs @@ -21,6 +21,8 @@ mod conn; pub(crate) use conn::BoxedConnection; pub use conn::Connection; +mod mutation; + pub(crate) const EXPECTED_PROTOCOL_VERSION: usize = 2; fn check_protocols_match(ver: usize) -> Result<(), Error> { diff --git a/src/proto/client/mutation.rs b/src/proto/client/mutation.rs new file mode 100644 index 00000000..82a9949e --- /dev/null +++ b/src/proto/client/mutation.rs @@ -0,0 +1,182 @@ +use crate::{ + proto::single::{MutationAction, MutationType}, + Client, Error, JobId, MutationFilter, MutationTarget, +}; +use std::borrow::Borrow; + +impl Client { + /// Re-enqueue the jobs. + /// + /// ***Warning!*** The `MUTATE` API is not supposed to be used as part of application logic; + /// you will want to use it for administration purposes only. + /// + /// This method will immediately move the jobs satisfying the [`filter`](crate::MutationFilter) + /// from the targeted set (see [`MutationTarget`]) back to their queues. + /// + /// ```no_run + /// # tokio_test::block_on(async { + /// # use faktory::{JobId, Client, MutationTarget, MutationFilter}; + /// # let mut client = Client::connect().await.unwrap(); + /// let job_id1 = JobId::new("3sgE_qwtqw1501"); + /// let job_id2 = JobId::new("3sgE_qwtqw1502"); + /// let failed_ids = [&job_id1, &job_id2]; + /// let filter = MutationFilter::builder().jids(failed_ids.as_slice()).build(); + /// client.requeue(MutationTarget::Retries, &filter).await.unwrap(); + /// # }); + /// ``` + pub async fn requeue<'a, F>(&mut self, target: MutationTarget, filter: F) -> Result<(), Error> + where + F: Borrow<MutationFilter<'a>>, + { + self.mutate(MutationType::Requeue, target, Some(filter.borrow())) + .await + } + + /// Re-enqueue the jobs with the given ids. + /// + /// ***Warning!*** The `MUTATE` API is not supposed to be used as part of application logic; + /// you will want to use it for administration purposes only. + /// + /// Similar to [`Client::requeue`], but will create a filter (see [`MutationFilter`]) + /// with the given `jids` for you. + pub async fn requeue_by_ids<'a>( + &mut self, + target: MutationTarget, + jids: &'_ [&'_ JobId], + ) -> Result<(), Error> { + let filter = MutationFilter::builder().jids(jids).build(); + self.mutate(MutationType::Requeue, target, Some(&filter)) + .await + } + + /// Discard the jobs. + /// + /// ***Warning!*** The `MUTATE` API is not supposed to be used as part of application logic; + /// you will want to use it for administration purposes only. + /// + /// Will throw the jobs away without any chance of re-scheduling + /// on the server side.
If you still want to be able to process the jobs, + /// use [`Client::kill`] instead. + /// + /// E.g. to discard the currently enqueued jobs with "fizz" among their arguments: + /// ```no_run + /// # tokio_test::block_on(async { + /// # use faktory::{Client, MutationTarget, MutationFilter}; + /// # let mut client = Client::connect().await.unwrap(); + /// let filter = MutationFilter::builder() + /// .pattern(r#"*\"args\":\[\"fizz\"\]*"#) + /// .build(); + /// client.discard(MutationTarget::Scheduled, &filter).await.unwrap(); + /// # }); + /// ``` + pub async fn discard<'a, F>(&mut self, target: MutationTarget, filter: F) -> Result<(), Error> + where + F: Borrow<MutationFilter<'a>>, + { + self.mutate(MutationType::Discard, target, Some(filter.borrow())) + .await + } + + /// Discard the jobs with the given ids. + /// + /// ***Warning!*** The `MUTATE` API is not supposed to be used as part of application logic; + /// you will want to use it for administration purposes only. + /// + /// Similar to [`Client::discard`], but will create a filter (see [`MutationFilter`]) + /// with the given `jids` for you. + pub async fn discard_by_ids<'a>( + &mut self, + target: MutationTarget, + jids: &'_ [&'_ JobId], + ) -> Result<(), Error> { + let filter = MutationFilter::builder().jids(jids).build(); + self.mutate(MutationType::Discard, target, Some(&filter)) + .await + } + + /// Kill a set of jobs. + /// + /// ***Warning!*** The `MUTATE` API is not supposed to be used as part of application logic; + /// you will want to use it for administration purposes only. + /// + /// Moves the jobs from the target structure to the `dead` set, meaning Faktory + /// will not touch them further unless you ask it to do so. You can then, for example, + /// manually process those jobs via the Web UI or send another mutation command + /// targeting the [`MutationTarget::Dead`] set. + /// + /// E.g. to kill the currently enqueued jobs with "bill" among their arguments: + /// ```no_run + /// # tokio_test::block_on(async { + /// # use faktory::{Client, MutationTarget, MutationFilter}; + /// # let mut client = Client::connect().await.unwrap(); + /// let filter = MutationFilter::builder() + /// .pattern(r#"*\"args\":\[\"bill\"\]*"#) + /// .build(); + /// client.kill(MutationTarget::Scheduled, &filter).await.unwrap(); + /// # }); + /// ``` + pub async fn kill<'a, F>(&mut self, target: MutationTarget, filter: F) -> Result<(), Error> + where + F: Borrow<MutationFilter<'a>>, + { + self.mutate(MutationType::Kill, target, Some(filter.borrow())) + .await + } + + /// Kill the jobs with the given ids. + /// + /// ***Warning!*** The `MUTATE` API is not supposed to be used as part of application logic; + /// you will want to use it for administration purposes only. + /// + /// Similar to [`Client::kill`], but will create a filter (see [`MutationFilter`]) + /// with the given `jids` for you. + pub async fn kill_by_ids<'a>( + &mut self, + target: MutationTarget, + jids: &'_ [&'_ JobId], + ) -> Result<(), Error> { + let filter = MutationFilter::builder().jids(jids).build(); + self.mutate(MutationType::Kill, target, Some(&filter)).await + } + + /// Purge the targeted structure. + /// + /// ***Warning!*** The `MUTATE` API is not supposed to be used as part of application logic; + /// you will want to use it for administration purposes only. + /// + /// Will have the same effect as [`Client::discard`] with an empty [`MutationFilter`], + /// but is special-cased by Faktory and so is performed faster. It can be thought of as a + /// `TRUNCATE tablename` operation in the SQL world versus `DELETE FROM tablename`.
+ /// + /// E.g. to purge all the jobs that are pending in the `retries` set: + /// ```no_run + /// # tokio_test::block_on(async { + /// # use faktory::{Client, MutationTarget}; + /// # let mut client = Client::connect().await.unwrap(); + /// client.clear(MutationTarget::Retries).await.unwrap(); + /// # }); + /// ``` + pub async fn clear(&mut self, target: MutationTarget) -> Result<(), Error> { + self.mutate(MutationType::Clear, target, None).await + } + + // For reference: https://github.com/contribsys/faktory/blob/10ccc2270dc2a1c95c3583f7c291a51b0292bb62/server/mutate.go#L35-L59 + // The Faktory server will pull the targeted set from Redis into its memory, iterate over each stringified job + // looking for the substring "id":"..." or performing a regexp search, then deserialize the matches into Jobs and + // perform the action (e.g. requeue). + async fn mutate<'a>( + &mut self, + mtype: MutationType, + mtarget: MutationTarget, + mfilter: Option<&'_ MutationFilter<'_>>, + ) -> Result<(), Error> { + self.issue(&MutationAction { + cmd: mtype, + target: mtarget, + filter: mfilter, + }) + .await? + .read_ok() + .await + } +} diff --git a/src/proto/mod.rs b/src/proto/mod.rs index a0304ccb..7f16a061 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -11,7 +11,10 @@ pub use client::{Client, Connection}; mod single; -pub use single::{DataSnapshot, FaktoryState, Job, JobBuilder, JobId, ServerSnapshot, WorkerId}; +pub use single::{ + DataSnapshot, Failure, FaktoryState, Job, JobBuilder, JobId, MutationFilter, + MutationFilterBuilder, MutationTarget, ServerSnapshot, WorkerId, +}; pub(crate) use single::{Ack, Fail, Info, Push, PushBulk, QueueAction, QueueControl}; diff --git a/src/proto/single/cmd.rs b/src/proto/single/cmd.rs index 13efb0ab..1a983eda 100644 --- a/src/proto/single/cmd.rs +++ b/src/proto/single/cmd.rs @@ -3,12 +3,24 @@ use crate::proto::{Job, JobId, WorkerId}; use std::error::Error as StdError; use tokio::io::{AsyncWrite, AsyncWriteExt}; +use super::{MutationFilter, MutationTarget}; + #[async_trait::async_trait] pub trait FaktoryCommand { async fn issue(&self, w: &mut W) -> Result<(), Error>; } macro_rules! self_to_cmd { + ($struct:ident) => { + #[async_trait::async_trait] + impl FaktoryCommand for $struct { + async fn issue(&self, w: &mut W) -> Result<(), Error> { + let cmd = stringify!($struct).to_uppercase(); + w.write_all(cmd.as_bytes()).await?; + Ok(w.write_all(b"\r\n").await?) + } + } + }; ($struct:ident, $cmd:expr) => { #[async_trait::async_trait] impl FaktoryCommand for $struct { async fn issue(&self, w: &mut W) -> Result<(), Error> { @@ -21,6 +33,30 @@ macro_rules! self_to_cmd { } } }; + ($struct:ident<$lt:lifetime>, $cmd:expr) => { + #[async_trait::async_trait] + impl FaktoryCommand for $struct<$lt> { + async fn issue(&self, w: &mut W) -> Result<(), Error> { + w.write_all($cmd.as_bytes()).await?; + w.write_all(b" ").await?; + let r = serde_json::to_vec(self).map_err(Error::Serialization)?; + w.write_all(&r).await?; + Ok(w.write_all(b"\r\n").await?) + } + } + }; + ($struct:ident, $cmd:expr, $field:tt) => { + #[async_trait::async_trait] + impl FaktoryCommand for $struct { + async fn issue(&self, w: &mut W) -> Result<(), Error> { + w.write_all($cmd.as_bytes()).await?; + w.write_all(b" ").await?; + let r = serde_json::to_vec(&self.$field).map_err(Error::Serialization)?; + w.write_all(&r).await?; + Ok(w.write_all(b"\r\n").await?) + } + } + }; } /// Write queues as part of a command.
They are written with a leading space @@ -42,12 +78,7 @@ where pub(crate) struct Info; -#[async_trait::async_trait] -impl FaktoryCommand for Info { - async fn issue(&self, w: &mut W) -> Result<(), Error> { - Ok(w.write_all(b"INFO\r\n").await?) - } -} +self_to_cmd!(Info); // -------------------- ACK ---------------------- @@ -138,12 +169,7 @@ self_to_cmd!(Fail, "FAIL"); pub(crate) struct End; -#[async_trait::async_trait] -impl FaktoryCommand for End { - async fn issue(&self, w: &mut W) -> Result<(), Error> { - Ok(w.write_all(b"END\r\n").await?) - } -} +self_to_cmd!(End); // --------------------- FETCH -------------------- @@ -230,29 +256,13 @@ self_to_cmd!(Hello, "HELLO"); pub(crate) struct Push(Job); -use std::ops::Deref; -impl Deref for Push { - type Target = Job; - fn deref(&self) -> &Self::Target { - &self.0 - } -} - impl From for Push { fn from(j: Job) -> Self { Push(j) } } -#[async_trait::async_trait] -impl FaktoryCommand for Push { - async fn issue(&self, w: &mut W) -> Result<(), Error> { - w.write_all(b"PUSH ").await?; - let r = serde_json::to_vec(&**self).map_err(Error::Serialization)?; - w.write_all(&r).await?; - Ok(w.write_all(b"\r\n").await?) - } -} +self_to_cmd!(Push, "PUSH", 0); // ---------------------- PUSHB ------------------- @@ -264,15 +274,7 @@ impl From> for PushBulk { } } -#[async_trait::async_trait] -impl FaktoryCommand for PushBulk { - async fn issue(&self, w: &mut W) -> Result<(), Error> { - w.write_all(b"PUSHB ").await?; - let r = serde_json::to_vec(&self.0).map_err(Error::Serialization)?; - w.write_all(&r).await?; - Ok(w.write_all(b"\r\n").await?) - } -} +self_to_cmd!(PushBulk, "PUSHB", 0); // ---------------------- QUEUE ------------------- @@ -311,3 +313,39 @@ impl<'a, S: AsRef> QueueControl<'a, S> { Self { action, queues } } } + +// ---------------------- MUTATE ------------------- + +#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)] +#[serde(rename_all = "lowercase")] +#[non_exhaustive] +pub(crate) enum MutationType { + #[default] + Kill, + Requeue, + Discard, + Clear, +} + +fn filter_is_empty(f: &Option<&MutationFilter<'_>>) -> bool { + // Rust 1.82 has got the genious `Option::is_none_or`, + // and `Option::is_some_and` which will allow us to: + // ```rust + // f.is_none_or(|f| f.is_empty()) + // ``` + // As of crate version `0.13.1-rc0`, we've got `1.70` as MSRV. + match f { + None => true, + Some(f) => f.is_empty(), + } +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize)] +pub(crate) struct MutationAction<'a> { + pub(crate) cmd: MutationType, + pub(crate) target: MutationTarget, + #[serde(skip_serializing_if = "filter_is_empty")] + pub(crate) filter: Option<&'a MutationFilter<'a>>, +} + +self_to_cmd!(MutationAction<'_>, "MUTATE"); diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index 031b08cd..8cd13ebf 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -7,11 +7,13 @@ use tokio::io::{AsyncBufRead, AsyncWrite, AsyncWriteExt}; mod cmd; mod id; +mod mutation; mod resp; mod utils; pub use cmd::*; pub use id::{JobId, WorkerId}; +pub use mutation::{MutationFilter, MutationFilterBuilder, MutationTarget}; pub use resp::*; #[cfg(feature = "ent")] @@ -122,6 +124,8 @@ pub struct Job { /// Defaults to 25. #[serde(skip_serializing_if = "Option::is_none")] #[builder(default = "Some(JOB_DEFAULT_RETRY_COUNT)")] + // TODO: this should probably be a usize, see Failure::retry_count + // TODO: and Failure::retry_remaining. This can go to 0.14 release pub retry: Option, /// The priority of this job from 1-9 (9 is highest). 
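The `MUTATE` command assembled by the `self_to_cmd!(MutationAction<'_>, "MUTATE")` arm above is the verb followed by a single JSON payload, matching the examples in Faktory's Mutate-API wiki. Below is a minimal, crate-internal unit-test sketch (not part of this diff) of what that payload looks like for a kill-by-jid action; the module and test names are hypothetical, and it assumes the test sits next to the `pub(crate)` structs in `cmd.rs` so they are in scope:

```rust
#[cfg(test)]
mod mutate_wire_format {
    use super::{MutationAction, MutationType};
    use crate::{JobId, MutationFilter, MutationTarget};

    #[test]
    fn kill_by_jid_payload() {
        let jid = JobId::new("abcdefgh");
        let jids = [&jid];
        // `kind` is renamed to "jobtype" and `pattern` to "regexp" on the wire.
        let filter = MutationFilter::builder()
            .kind("QuickbooksSyncJob")
            .jids(jids.as_slice())
            .build();
        let action = MutationAction {
            cmd: MutationType::Kill,
            target: MutationTarget::Retries,
            filter: Some(&filter),
        };
        // The client writes `MUTATE `, then this JSON, then a trailing CRLF.
        assert_eq!(
            serde_json::to_string(&action).unwrap(),
            r#"{"cmd":"kill","target":"retries","filter":{"jobtype":"QuickbooksSyncJob","jids":["abcdefgh"]}}"#
        );
    }
}
```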
@@ -204,19 +208,45 @@ impl JobBuilder { } } +/// Details on a job's failure. #[derive(Serialize, Deserialize, Debug, Clone)] +#[non_exhaustive] pub struct Failure { - retry_count: usize, - failed_at: String, - #[serde(skip_serializing_if = "Option::is_none")] - next_at: Option, + /// Number of times this job has been retried. + pub retry_count: usize, + + /// Number of remaining retry attempts. + /// + /// This is the difference between how many times this job + /// _can_ be retried (see [`Job::retry`]) and the number of retry + /// attempts that have already been made (see [`Failure::retry_count`]). + #[serde(rename = "remaining")] + pub retry_remaining: usize, + + /// Last time this job failed. + pub failed_at: DateTime, + + /// When this job will be retried. + /// + /// This will be `None` if there are no retry + /// attempts (see [`Failure::retry_remaining`]) left. #[serde(skip_serializing_if = "Option::is_none")] - message: Option, + pub next_at: Option>, + + /// Error message, if any. #[serde(skip_serializing_if = "Option::is_none")] + pub message: Option, + + // This is Some("unknown") most of the time, and we are not making + // it public for now, see discussion: + // https://github.com/jonhoo/faktory-rs/pull/89#discussion_r1899423130 + /// Error kind, if known. #[serde(rename = "errtype")] - kind: Option, + pub(crate) kind: Option, + + /// Stack trace from last failure, if any. #[serde(skip_serializing_if = "Option::is_none")] - backtrace: Option>, + pub backtrace: Option>, } impl Job { @@ -261,8 +291,8 @@ impl Job { } /// Data about this job's most recent failure. - pub fn failure(&self) -> &Option { - &self.failure + pub fn failure(&self) -> Option<&Failure> { + self.failure.as_ref() } } diff --git a/src/proto/single/mutation.rs b/src/proto/single/mutation.rs new file mode 100644 index 00000000..da0ebdc9 --- /dev/null +++ b/src/proto/single/mutation.rs @@ -0,0 +1,112 @@ +use crate::JobId; +use derive_builder::Builder; + +#[cfg(doc)] +use crate::{Client, Job}; + +/// Mutation target set. +#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)] +#[serde(rename_all = "lowercase")] +#[non_exhaustive] +pub enum MutationTarget { + /// A set of currently enqueued jobs. + #[default] + Scheduled, + + /// A set of jobs that should be retried. + Retries, + + /// A set of failed jobs that will not be retried. + Dead, +} + +// As of Faktory v1.9.2, not all the fields on the filter +// will be taken into account, rather EITHER `jids` OR optional `kind` +// plus optional `pattern`. +// See: https://github.com/contribsys/faktory/issues/489 +// +/// A filter to help narrow down the mutation target. +/// +/// As of Faktory version 1.9.2, if [`MutationFilter::pattern`] and/or [`MutationFilter::kind`] +/// is specified, the values in [`MutationFilter::jids`] will not be taken into account by the +/// server. If you want to filter by `jids`, make sure to leave other fields of the filter empty +/// or use dedicated methods like [`Client::requeue_by_ids`]. 
+/// +/// Example usage: +/// ```no_run +/// # tokio_test::block_on(async { +/// # use faktory::{Client, MutationTarget, MutationFilter}; +/// # let mut client = Client::connect().await.unwrap(); +/// let filter = MutationFilter::builder() +/// .kind("jobtype_here") +/// .pattern(r#"*\"args\":\[\"fizz\"\]*"#) +/// .build(); +/// client.requeue(MutationTarget::Retries, &filter).await.unwrap(); +/// # }) +/// ``` +#[derive(Builder, Clone, Debug, PartialEq, Eq, Serialize)] +#[builder(setter(into), build_fn(name = "try_build", private), pattern = "owned")] +#[non_exhaustive] +pub struct MutationFilter<'a> { + /// A job's [`kind`](crate::Job::kind). + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(rename = "jobtype")] + #[builder(default)] + pub kind: Option<&'a str>, + + /// [`JobId`]s to target. + #[serde(skip_serializing_if = "Option::is_none")] + #[builder(setter(custom))] + #[builder(default)] + pub jids: Option<&'a [&'a JobId]>, + + /// Match pattern to use for filtering. + /// + /// Faktory will pass this directly to Redis's `SCAN` command, + /// so please see the [`SCAN` documentation](https://redis.io/docs/latest/commands/scan/) + /// for further details. + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(rename = "regexp")] + #[builder(default)] + pub pattern: Option<&'a str>, +} + +impl MutationFilter<'_> { + pub(crate) fn is_empty(&self) -> bool { + self.jids.is_none() && self.kind.is_none() && self.pattern.is_none() + } +} + +impl<'a> MutationFilter<'_> { + /// Creates an empty filter. + /// + /// Sending a mutation command (e.g. [`Client::discard`]) with an empty + /// filter effectively means performing no filtering at all. + pub fn empty() -> Self { + Self { + kind: None, + jids: None, + pattern: None, + } + } + + /// Creates a new builder for a [`MutationFilter`]. + pub fn builder() -> MutationFilterBuilder<'a> { + MutationFilterBuilder::default() + } +} + +impl<'a> MutationFilterBuilder<'a> { + /// Ids of jobs to target. + pub fn jids(mut self, value: &'a [&JobId]) -> Self { + self.jids = Some(value).into(); + self + } +} + +impl<'a> MutationFilterBuilder<'a> { + /// Builds a new [`MutationFilter`] from the parameters of this builder. 
+ pub fn build(self) -> MutationFilter<'a> { + self.try_build().expect("infallible") + } +} diff --git a/src/worker/mod.rs b/src/worker/mod.rs index 2b6aab89..3547d438 100644 --- a/src/worker/mod.rs +++ b/src/worker/mod.rs @@ -360,7 +360,22 @@ impl Worker { let fail = match e { Failed::BadJobType(jt) => Fail::generic(jid, format!("No handler for {}", jt)), Failed::Application(e) => Fail::generic_with_backtrace(jid, e), - Failed::HandlerPanic(e) => Fail::generic_with_backtrace(jid, e), + Failed::HandlerPanic(e) => { + if e.is_cancelled() { + Fail::generic(jid, "job processing was cancelled") + } else if e.is_panic() { + let panic_obj = e.into_panic(); + if panic_obj.is::() { + Fail::generic(jid, *panic_obj.downcast::().unwrap()) + } else if panic_obj.is::<&'static str>() { + Fail::generic(jid, *panic_obj.downcast::<&'static str>().unwrap()) + } else { + Fail::generic(jid, "job processing panicked") + } + } else { + Fail::generic_with_backtrace(jid, e) + } + } }; self.worker_states.register_failure(worker, fail.clone()); self.c.issue(&fail).await?.read_ok().await?; diff --git a/tests/real/community.rs b/tests/real/community.rs index 0dfd6363..9035749a 100644 --- a/tests/real/community.rs +++ b/tests/real/community.rs @@ -1,8 +1,18 @@ -use crate::{assert_gte, skip_check}; -use faktory::{Client, Job, JobBuilder, JobId, StopReason, Worker, WorkerBuilder, WorkerId}; +use crate::{assert_gt, assert_gte, assert_lt, skip_check}; +use chrono::Utc; +use faktory::{ + Client, Job, JobBuilder, JobId, JobRunner, MutationFilter, MutationTarget, StopReason, Worker, + WorkerBuilder, WorkerId, +}; +use rand::Rng; use serde_json::Value; -use std::{io, sync, time::Duration}; -use tokio::time as tokio_time; +use std::collections::HashMap; +use std::panic::panic_any; +use std::sync::Arc; +use std::time::Duration; +use std::{io, sync}; +use tokio::sync::mpsc::error::SendError; +use tokio::time::{self as tokio_time}; use tokio_util::sync::CancellationToken; #[tokio::test(flavor = "multi_thread")] @@ -380,6 +390,26 @@ async fn queue_control_actions_wildcard() { let local_1 = "queue_control_wildcard_1"; let local_2 = "queue_control_wildcard_2"; + // prepare a client and remove any left-overs + // from the previous test run + let mut client = Client::connect().await.unwrap(); + client + .requeue( + MutationTarget::Retries, + MutationFilter::builder().kind(local_1).build(), + ) + .await + .unwrap(); + client + .requeue( + MutationTarget::Retries, + MutationFilter::builder().kind(local_2).build(), + ) + .await + .unwrap(); + client.queue_remove(&[local_1]).await.unwrap(); + client.queue_remove(&[local_2]).await.unwrap(); + let (tx, rx) = sync::mpsc::channel(); let tx_1 = sync::Arc::new(sync::Mutex::new(tx)); let tx_2 = sync::Arc::clone(&tx_1); @@ -399,8 +429,6 @@ async fn queue_control_actions_wildcard() { .await .unwrap(); - let mut client = Client::connect().await.unwrap(); - // enqueue two jobs on each queue client .enqueue_many([ @@ -450,6 +478,55 @@ async fn queue_control_actions_wildcard() { // our queue are not even mentioned in the server report: assert!(queues.get(local_1).is_none()); assert!(queues.get(local_2).is_none()); + + // let's also test here one bit from the Faktory MUTATION API, + // which affects the entire target set; + // for this, let's enqueue a few jobs that are not supposed to be + // consumed immediately, rather in a few minutes; this they these + // jobs will get into the `scheduled` set + let soon = Utc::now() + chrono::Duration::seconds(2); + client + .enqueue_many([ + 
Job::builder(local_1) + .args(vec![Value::from(1)]) + .queue(local_1) + .at(soon) + .build(), + Job::builder(local_1) + .args(vec![Value::from(1)]) + .queue(local_1) + .at(soon) + .build(), + Job::builder(local_2) + .args(vec![Value::from(1)]) + .queue(local_2) + .at(soon) + .build(), + Job::builder(local_2) + .args(vec![Value::from(1)]) + .queue(local_2) + .at(soon) + .build(), + ]) + .await + .unwrap(); + + // now, let's just clear all the scheduled jobs + client.clear(MutationTarget::Scheduled).await.unwrap(); + + tokio_time::sleep(Duration::from_secs(2)).await; + + // the queue is empty + assert!(!worker.run_one(0, &[local_1]).await.unwrap()); + + // even if we force-schedule those jobs + client + .requeue(MutationTarget::Scheduled, MutationFilter::empty()) + .await + .unwrap(); + + // still empty, meaing the jobs have been purged for good + assert!(!worker.run_one(0, &[local_1]).await.unwrap()); } #[tokio::test(flavor = "multi_thread")] @@ -606,7 +683,7 @@ async fn test_jobs_created_with_builder() { use std::future::Future; use std::pin::Pin; -use tokio::sync::mpsc; +use tokio::sync::mpsc::{self, Sender}; fn process_hard_task( sender: sync::Arc>, ) -> Box< @@ -723,39 +800,561 @@ async fn test_jobs_with_blocking_handlers() { } #[tokio::test(flavor = "multi_thread")] -async fn test_panic_in_handler() { +async fn test_panic_and_errors_in_handler() { skip_check!(); - let local = "test_panic_in_handler"; + let job_kind_vs_error_msg = HashMap::from([ + ("panic_SYNC_handler_str", "panic_SYNC_handler_str"), + ("panic_SYNC_handler_String", "panic_SYNC_handler_String"), + ("panic_SYNC_handler_int", "job processing panicked"), + ("error_from_SYNC_handler", "error_from_SYNC_handler"), + ("panic_ASYNC_handler_str", "panic_ASYNC_handler_str"), + ("panic_ASYNC_handler_String", "panic_ASYNC_handler_String"), + ("panic_ASYNC_handler_int", "job processing panicked"), + ("error_from_ASYNC_handler", "error_from_ASYNC_handler"), + ( + "no_handler_registered_for_this_jobtype_initially", + "No handler for no_handler_registered_for_this_jobtype_initially", + ), + ]); + let njobs = job_kind_vs_error_msg.keys().len(); + + // clean up is needed when re-using the same Faktory container, since the + // Faktory server could have re-scheduled (or might be doing it right now) + // the failed jobs from the previous test run; to keep things clean, we are + // force-rescheduling and immediatey dropping any remainders + let local = "test_panic_and_errors_in_handler"; + let mut c = Client::connect().await.unwrap(); + let pattern = format!(r#"*\"args\":\[\"{}\"\]*"#, local); + c.requeue( + MutationTarget::Retries, + MutationFilter::builder().pattern(pattern.as_str()).build(), + ) + .await + .unwrap(); + c.queue_remove(&[local]).await.unwrap(); let mut w = Worker::builder::() - .register_blocking_fn("panic_SYNC_handler", |_j| { - panic!("Panic inside sync the handler..."); + // sync handlers + .register_blocking_fn("panic_SYNC_handler_str", |_j| { + panic!("panic_SYNC_handler_str"); + }) + .register_blocking_fn("panic_SYNC_handler_String", |_j| { + panic_any("panic_SYNC_handler_String".to_string()); + }) + .register_blocking_fn("panic_SYNC_handler_int", |_j| { + panic_any(0); + }) + .register_blocking_fn("error_from_SYNC_handler", |_j| { + Err::<(), io::Error>(io::Error::new( + io::ErrorKind::Other, + "error_from_SYNC_handler", + )) + }) + // async handlers + .register_fn("panic_ASYNC_handler_str", |_j| async move { + panic!("panic_ASYNC_handler_str"); + }) + .register_fn("panic_ASYNC_handler_String", |_j| async move { + 
panic_any("panic_ASYNC_handler_String".to_string()); }) - .register_fn("panic_ASYNC_handler", |_j| async move { - panic!("Panic inside async handler..."); + .register_fn("panic_ASYNC_handler_int", |_j| async move { + panic_any(0); + }) + .register_fn("error_from_ASYNC_handler", |_j| async move { + Err::<(), io::Error>(io::Error::new( + io::ErrorKind::Other, + "error_from_ASYNC_handler", + )) }) .connect() .await .unwrap(); - let mut c = Client::connect().await.unwrap(); + // let's enqueue jobs first time and ... + c.enqueue_many( + job_kind_vs_error_msg + .keys() + .map(|&jkind| Job::builder(jkind).queue(local).args([local]).build()) + .collect::>(), + ) + .await + .unwrap(); + + // ... consume all the jobs from the queue and _fail_ them + // "in different ways" (see our worker setup above); + // + // we _did_ consume and process the job, the processing result itself though + // was a failure; however, a panic in the handler was "intercepted" and communicated + // to the Faktory server via the FAIL command, the error message and the backtrace, if any, + // will then be available to a Web UI user or to the worker consumes the retried job + // (if `Job::retry` is Some and > 0 and there are `Failure::retry_remaining` attempts left) + // in this job's `failure` field; see how we are consuming the retried jobs later + // in this test and examin the failure details; + // + // also note how the test run is not interrupted here with a panic + for _ in 0..njobs { + assert!(w.run_one(0, &[local]).await.unwrap()); + } + + // let's now make sure all the jobs are re-enqueued + c.requeue( + MutationTarget::Retries, + MutationFilter::builder().pattern(pattern.as_str()).build(), + ) + .await + .unwrap(); + + // now, let's create a worker who will only send jobs to the + // test's main thread to make some assertions; + struct JobHandler { + chan: Arc>, + } + impl JobHandler { + pub fn new(chan: Arc>) -> Self { + Self { chan } + } + } + #[async_trait::async_trait] + impl JobRunner for JobHandler { + type Error = SendError; + async fn run(&self, job: Job) -> Result<(), Self::Error> { + self.chan.send(job).await + } + } + let (tx, mut rx) = tokio::sync::mpsc::channel(njobs); + let tx = sync::Arc::new(tx); - c.enqueue(Job::builder("panic_SYNC_handler").queue(local).build()) + // unlike the previus worker, this one is not failing the jobs, + // it is rather helping us to inspect them + let mut w = Worker::builder() + .register("panic_SYNC_handler_str", JobHandler::new(tx.clone())) + .register("panic_SYNC_handler_String", JobHandler::new(tx.clone())) + .register("panic_SYNC_handler_int", JobHandler::new(tx.clone())) + .register("error_from_SYNC_handler", JobHandler::new(tx.clone())) + .register("panic_ASYNC_handler_str", JobHandler::new(tx.clone())) + .register("panic_ASYNC_handler_String", JobHandler::new(tx.clone())) + .register("panic_ASYNC_handler_int", JobHandler::new(tx.clone())) + .register("error_from_ASYNC_handler", JobHandler::new(tx.clone())) + .register( + "no_handler_registered_for_this_jobtype_initially", + JobHandler::new(tx.clone()), + ) + .connect() .await .unwrap(); - // we _did_ consume and process the job, the processing result itself though - // was a failure; however, a panic in the handler was "intercepted" and communicated - // to the Faktory server via the FAIL command; - // note how the test run is not interrupted with a panic - assert!(w.run_one(0, &[local]).await.unwrap()); + for _ in 0..njobs { + assert!(w.run_one(0, &[local]).await.unwrap()); // reminder: we've requeued the failed 
jobs + } + + // Let's await till the worker sends the jobs to us. + // + // Note that if a tokio task inside `Worker::run_job` in cancelled(1), we may not receive a job + // via the channel and so `rx.recv_many` will just hang (and so the entire test run), + // hence a timeout we are adding here. + // + // (1)If you are curious, this can be easily reproduced inside the `Callback::Async(_)` arm + // of `Worker::run_job`, by swapping this line: + // ```rust + // spawn(processing_task).await + // ``` + // for something like: + // ```rust + // let handle = spawn(processing_task); + // handle.abort(); + // handle.await + // ``` + // and then running this test. + let mut jobs: Vec = Vec::with_capacity(njobs); + let nreceived = + tokio::time::timeout(Duration::from_secs(5), rx.recv_many(jobs.as_mut(), njobs)) + .await + .expect("all jobs to be recieved within the timeout period"); + + // all the jobs are now in the test's main thread + assert_eq!(nreceived, njobs); + + // let's verify that errors messages in each job's `Failure` are as expected + for job in jobs { + let error_message_got = job.failure().as_ref().unwrap().message.as_ref().unwrap(); + let error_message_expected = *job_kind_vs_error_msg.get(job.kind()).unwrap(); + assert_eq!(error_message_got, error_message_expected); + } +} + +#[tokio::test(flavor = "multi_thread")] +async fn mutation_requeue_jobs() { + skip_check!(); + let test_started_at = Utc::now(); + let max_retries = rand::thread_rng().gen_range(2..25); + let panic_message = "Failure should be recorded"; + + // prepare a client and clean up the queue + // to ensure there are no left-overs + let local = "mutation_requeue_jobs"; + let mut client = Client::connect().await.unwrap(); + client + .requeue( + MutationTarget::Retries, + MutationFilter::builder().kind(local).build(), + ) + .await + .unwrap(); + client.queue_remove(&[local]).await.unwrap(); + + // prepare a worker that will fail the job unconditionally + let mut worker = Worker::builder::() + .register_fn(local, move |_job| async move { + panic_any(panic_message); + }) + .connect() + .await + .unwrap(); + + // enqueue a job + let job = JobBuilder::new(local) + .queue(local) + .retry(max_retries) + .build(); + let job_id = job.id().clone(); + client.enqueue(job).await.unwrap(); + + // consume and fail it + let had_one = worker.run_one(0, &[local]).await.unwrap(); + assert!(had_one); - c.enqueue(Job::builder("panic_ASYNC_handler").queue(local).build()) + // the job is now in `retries` set and is due to + // be rescheduled by the Faktory server after a while, but ... + let had_one = worker.run_one(0, &[local]).await.unwrap(); + assert!(!had_one); + + // ... we can force it, so let's requeue the job and ... + client + .requeue( + MutationTarget::Retries, + MutationFilter::builder().jids(&[&job_id]).build(), + ) .await .unwrap(); - // same for async handler, note how the test run is not interrupted with a panic - assert!(!w.is_terminated()); + // ... 
this time, instead of failing the job this time, let's + // create a new woker that will just send the job + // to the test thread so that we can inspect and + // assert on the failure from the first run + let (tx, rx) = sync::mpsc::channel(); + let tx = sync::Arc::new(sync::Mutex::new(tx)); + let mut w = WorkerBuilder::default() + .hostname("tester".to_string()) + .wid(WorkerId::new(local)) + .register_fn(local, move |j| { + let tx = sync::Arc::clone(&tx); + Box::pin(async move { + tx.lock().unwrap().send(j).unwrap(); + Ok::<(), io::Error>(()) + }) + }) + .connect() + .await + .unwrap(); assert!(w.run_one(0, &[local]).await.unwrap()); + let job = rx.recv().unwrap(); + + assert_eq!(job.id(), &job_id); // sanity check + + let failure_info = job.failure().unwrap(); + assert_eq!(failure_info.retry_count, 0); + assert_eq!( + failure_info.retry_remaining, + max_retries as usize - failure_info.retry_count + ); + assert_lt!(failure_info.failed_at, Utc::now()); + assert_gt!(failure_info.failed_at, test_started_at); + assert!(failure_info.next_at.is_some()); + assert_eq!(failure_info.message.as_ref().unwrap(), panic_message); + assert!(failure_info.backtrace.is_none()); +} + +#[tokio::test(flavor = "multi_thread")] +async fn mutation_kill_and_requeue_and_discard() { + skip_check!(); + + // prepare a client and clean up the queue + // to ensure there are no left-overs + let local = "mutation_kill_vs_discard"; + let mut client = Client::connect().await.unwrap(); + client + .requeue( + MutationTarget::Retries, + MutationFilter::builder().kind(local).build(), + ) + .await + .unwrap(); + + // enqueue a couple of jobs and ... + client.queue_remove(&[local]).await.unwrap(); + let soon = Utc::now() + chrono::Duration::seconds(2); + client + .enqueue_many([ + Job::builder(local) + .args(vec![Value::from(1)]) + .queue(local) + .at(soon) + .build(), + Job::builder(local) + .args(vec![Value::from(2)]) + .queue(local) + .at(soon) + .build(), + ]) + .await + .unwrap(); + + // kill them ... 
+ client + .kill( + MutationTarget::Scheduled, + MutationFilter::builder().kind(local).build(), + ) + .await + .unwrap(); + + // the two jobs were moved from `scheduled` to `dead`, + // and so the queue is empty + let njobs = client + .current_info() + .await + .unwrap() + .data + .queues + .get(local) + .map(|v| *v) + .unwrap_or_default(); + assert_eq!(njobs, 0); + + // let's now enqueue those jobs + client + .requeue( + MutationTarget::Dead, + MutationFilter::builder().kind(local).build(), + ) + .await + .unwrap(); + + // they transitioned from `dead` to being enqueued + let njobs = client + .current_info() + .await + .unwrap() + .data + .queues + .get(local) + .map(|v| *v) + .unwrap_or_default(); + assert_eq!(njobs, 2); + + // prepare a worker that will fail the job unconditionally + let mut worker = Worker::builder::() + .register_fn(local, move |_job| async move { + panic!("force fail this job"); + }) + .connect() + .await + .unwrap(); + + // cosume them (and immediately fail them) and make + // sure the queue is drained + assert!(worker.run_one(0, &[local]).await.unwrap()); + assert!(worker.run_one(0, &[local]).await.unwrap()); + assert!(!worker.run_one(0, &[local]).await.unwrap()); + let njobs = client + .current_info() + .await + .unwrap() + .data + .queues + .get(local) + .map(|v| *v) + .unwrap_or_default(); + assert_eq!(njobs, 0); // sanity check + + // so the jobs have transitioned from being enqueued + // to the `retries` set, and we can now completely discard them + client + .discard( + MutationTarget::Retries, + MutationFilter::builder().kind(local).build(), + ) + .await + .unwrap(); + + // Double-check + client + .requeue( + MutationTarget::Retries, + MutationFilter::builder().kind(local).build(), + ) + .await + .unwrap(); + client + .requeue( + MutationTarget::Dead, + MutationFilter::builder().kind(local).build(), + ) + .await + .unwrap(); + client + .requeue( + MutationTarget::Scheduled, + MutationFilter::builder().kind(local).build(), + ) + .await + .unwrap(); + + // Gone for good + let njobs = client + .current_info() + .await + .unwrap() + .data + .queues + .get(local) + .map(|v| *v) + .unwrap_or_default(); + assert_eq!(njobs, 0); +} + +#[tokio::test(flavor = "multi_thread")] +async fn mutation_requeue_specific_jobs_only() { + skip_check!(); + + // prepare a client and clean up the queue + // to ensure there are no left-overs + let local = "mutation_requeue_specific_jobs_only"; + let mut client = Client::connect().await.unwrap(); + client + .requeue( + MutationTarget::Retries, + MutationFilter::builder().kind(local).build(), + ) + .await + .unwrap(); + client.queue_remove(&[local]).await.unwrap(); + + // prepare a worker that will fail the job unconditionally + let mut worker = Worker::builder::() + .register_fn(local, move |_job| async move { + panic!("force fail this job"); + }) + .connect() + .await + .unwrap(); + + // enqueue two jobs + let job1 = JobBuilder::new(local) + .retry(10) + .queue(local) + .args(["fizz"]) + .build(); + let job_id1 = job1.id().clone(); + let job2 = JobBuilder::new(local) + .retry(10) + .queue(local) + .args(["buzz"]) + .build(); + let job_id2 = job2.id().clone(); + assert_ne!(job_id1, job_id2); // sanity check + client.enqueue_many([job1, job2]).await.unwrap(); + + // cosume them (and immediately fail) and make + // sure the queue is drained + assert!(worker.run_one(0, &[local]).await.unwrap()); + assert!(worker.run_one(0, &[local]).await.unwrap()); + assert!(!worker.run_one(0, &[local]).await.unwrap()); + + // now let's only requeue one 
job of kind 'mutation_requeue_specific_jobs_only' + // following this example from the Faktory docs: + // + // MUTATE {"cmd":"kill","target":"retries","filter":{"jobtype":"QuickbooksSyncJob", "jids":["123456789", "abcdefgh"]}} + // (see: https://github.com/contribsys/faktory/wiki/Mutate-API#examples) + // + // NB! we only want a single job (with job_id1) to be immediately re-enqueued, but ... + client + .requeue( + MutationTarget::Retries, + MutationFilter::builder() + .jids(&[&job_id1]) + .kind(local) + .build(), + ) + .await + .unwrap(); + + // ... looks like _all_ the jobs of that jobtype are re-queued + // see: https://github.com/contribsys/faktory/blob/10ccc2270dc2a1c95c3583f7c291a51b0292bb62/server/mutate.go#L96-L98 + assert!(worker.run_one(0, &[local]).await.unwrap()); + assert!(worker.run_one(0, &[local]).await.unwrap()); + + // let's prove that there _is_ still a way to only requeue one + // specific job, and to do so let's verify the queue is drained + // again (since we've just failed the two jobs again): + assert!(!worker.run_one(0, &[local]).await.unwrap()); + + // now, let's requeue a job _without_ specifying a job kind, rather + // only `jids` in the filter: + client + .requeue( + MutationTarget::Retries, + MutationFilter::builder().jids(&[&job_id1]).build(), + ) + .await + .unwrap(); + + // we can now see that only one job has been re-scheduled, just like we wanted + // see: https://github.com/contribsys/faktory/blob/10ccc2270dc2a1c95c3583f7c291a51b0292bb62/server/mutate.go#L100-L110 + assert!(worker.run_one(0, &[local]).await.unwrap()); + assert!(!worker.run_one(0, &[local]).await.unwrap()); // drained + + // and - for completeness - let's requeue the jobs using + // the combination of `kind` + `pattern` (jobtype and regexp in Faktory's terms); + // let's first make sure to force-reschedule the jobs: + client + .requeue(MutationTarget::Retries, MutationFilter::empty()) + .await + .unwrap(); + assert!(worker.run_one(0, &[local]).await.unwrap()); + assert!(worker.run_one(0, &[local]).await.unwrap()); + assert!(!worker.run_one(0, &[local]).await.unwrap()); // drained + + // now let's use `kind` + `pattern` combo to only immediately + // reschedule the job with "fizz" in the `args`, but not the + // one with "buzz": + client + .requeue( + MutationTarget::Retries, + MutationFilter::builder() + .kind(local) + .pattern(r#"*\"args\":\[\"fizz\"\]*"#) + .build(), + ) + .await + .unwrap(); + + // and indeed only one job has been re-scheduled, + // see: https://github.com/contribsys/faktory/blob/b4a93227a3323ab4b1365b0c37c2fac4f9588cc8/server/mutate.go#L83-L94 + assert!(worker.run_one(0, &[local]).await.unwrap()); + assert!(!worker.run_one(0, &[local]).await.unwrap()); // drained + + // just for sanity's sake, let's re-queue the "buzz" one: + client + .requeue( + MutationTarget::Retries, + MutationFilter::builder() + .pattern(r#"*\"args\":\[\"buzz\"\]*"#) + .build(), + ) + .await + .unwrap(); + assert!(worker.run_one(0, &[local]).await.unwrap()); + assert!(!worker.run_one(0, &[local]).await.unwrap()); // drained }
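As a closing illustration of the new public surface, here is a hedged sketch of a small maintenance routine built on the `MUTATE` helpers added in this PR; the `sweep_kind` helper and the `"stale_kind"` job type are hypothetical, and the snippet assumes a reachable Faktory server (the usual `FAKTORY_URL` setup) plus `tokio` with the `macros` and `rt-multi-thread` features for the `#[tokio::main]` entry point:

```rust
use faktory::{Client, Error, MutationFilter, MutationTarget};

/// Administration-only sketch: immediately re-enqueue every retriable job
/// of one kind, and drop whatever has already landed in the `dead` set.
async fn sweep_kind(client: &mut Client, kind: &str) -> Result<(), Error> {
    let filter = MutationFilter::builder().kind(kind).build();

    // Move matching jobs from the `retries` set straight back to their queues,
    // instead of waiting for Faktory's own retry schedule.
    client.requeue(MutationTarget::Retries, &filter).await?;

    // Jobs of this kind that have exhausted their retries are discarded for good.
    client.discard(MutationTarget::Dead, &filter).await?;

    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    let mut client = Client::connect().await?;
    sweep_kind(&mut client, "stale_kind").await
}
```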