-
Notifications
You must be signed in to change notification settings - Fork 16
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Mutate API #87
base: main
Are you sure you want to change the base?
Mutate API #87
Changes from all commits
5a96b3a
eec5bba
2cf61a8
f61d4c0
51bcb8a
ee3d90d
72ee004
1e8839a
44334ba
fce242b
d3639f0
e24c960
48fb985
ac9da93
85993d3
54eecc3
a883f57
17cdb4c
22a4d18
cded093
7d9a94c
8346f08
206e5f7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,6 @@ | ||
[package] | ||
name = "faktory" | ||
version = "0.13.0" | ||
version = "0.13.1-rc0" | ||
authors = ["Jon Gjengset <[email protected]>"] | ||
edition = "2021" | ||
license = "MIT OR Apache-2.0" | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,182 @@ | ||
use crate::{ | ||
proto::single::{MutationAction, MutationType}, | ||
Client, Error, JobId, MutationFilter, MutationTarget, | ||
}; | ||
use std::borrow::Borrow; | ||
|
||
impl Client { | ||
/// Re-enqueue the jobs. | ||
/// | ||
/// ***Warning!*** The `MUTATE` API is not supposed to be used as part of application logic, | ||
/// you will want to use it for administration purposes only. | ||
/// | ||
/// This method will immediately move the jobs from the targeted set (see [`MutationTarget`]) | ||
/// to their queues. This will apply to the jobs satisfying the [`filter`](crate::MutationFilter). | ||
/// | ||
/// ```no_run | ||
/// # tokio_test::block_on(async { | ||
/// # use faktory::{JobId, Client, MutationTarget, MutationFilter}; | ||
/// # let mut client = Client::connect().await.unwrap(); | ||
/// let job_id1 = JobId::new("3sgE_qwtqw1501"); | ||
/// let job_id2 = JobId::new("3sgE_qwtqw1502"); | ||
/// let failed_ids = [&job_id1, &job_id2]; | ||
/// let filter = MutationFilter::builder().jids(failed_ids.as_slice()).build(); | ||
/// client.requeue(MutationTarget::Retries, &filter).await.unwrap(); | ||
Comment on lines
+20
to
+24
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This, too, I have a hard time following — this is assuming that two jobs have already failed (I think), and it tries to re-enqueue them from the "retries" set? Why not from the dead set? And where do they get scheduled back — the queue they were originally on? |
||
/// # }); | ||
/// ``` | ||
pub async fn requeue<'a, F>(&mut self, target: MutationTarget, filter: F) -> Result<(), Error> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The argument name |
||
where | ||
F: Borrow<MutationFilter<'a>>, | ||
{ | ||
self.mutate(MutationType::Requeue, target, Some(filter.borrow())) | ||
.await | ||
} | ||
|
||
/// Re-enqueue the jobs with the given ids. | ||
/// | ||
/// ***Warning!*** The `MUTATE` API is not supposed to be used as part of application logic, | ||
/// you will want to use it for administration purposes only. | ||
/// | ||
/// Similar to [`Client::requeue`], but will create a filter (see [`MutationFilter`]) | ||
/// with the given `jids` for you. | ||
pub async fn requeue_by_ids<'a>( | ||
Check warning on line 42 in src/proto/client/mutation.rs GitHub Actions / clippy[clippy] src/proto/client/mutation.rs#L42
Raw output
|
||
&mut self, | ||
target: MutationTarget, | ||
jids: &'_ [&'_ JobId], | ||
) -> Result<(), Error> { | ||
let filter = MutationFilter::builder().jids(jids).build(); | ||
self.mutate(MutationType::Requeue, target, Some(&filter)) | ||
.await | ||
} | ||
|
||
/// Discard the jobs. | ||
/// | ||
/// ***Warning!*** The `MUTATE` API is not supposed to be used as part of application logic, | ||
/// you will want to use it for administration purposes only. | ||
/// | ||
/// Will throw the jobs away without any chance for re-scheduling | ||
/// on the server side. If you want to still be able to process the jobs, | ||
/// use [`Client::kill`] instead. | ||
/// | ||
/// E.g. to discard the currently enqueued jobs having "fizz" argument: | ||
/// ```no_run | ||
/// # tokio_test::block_on(async { | ||
/// # use faktory::{Client, MutationTarget, MutationFilter}; | ||
/// # let mut client = Client::connect().await.unwrap(); | ||
/// let filter = MutationFilter::builder() | ||
/// .pattern(r#"*\"args\":\[\"fizz\"\]*"#) | ||
/// .build(); | ||
/// client.discard(MutationTarget::Scheduled, &filter).await.unwrap(); | ||
/// # }); | ||
/// ``` | ||
pub async fn discard<'a, F>(&mut self, target: MutationTarget, filter: F) -> Result<(), Error> | ||
where | ||
F: Borrow<MutationFilter<'a>>, | ||
{ | ||
self.mutate(MutationType::Discard, target, Some(filter.borrow())) | ||
.await | ||
} | ||
|
||
/// Discard the jobs with the given ids. | ||
/// | ||
/// ***Warning!*** The `MUTATE` API is not supposed to be used as part of application logic, | ||
/// you will want to use it for administration purposes only. | ||
/// | ||
/// Similar to [`Client::discard`], but will create a filter (see [`MutationFilter`]) | ||
/// with the given `jids` for you. | ||
pub async fn discard_by_ids<'a>( | ||
Check warning on line 87 in src/proto/client/mutation.rs GitHub Actions / clippy[clippy] src/proto/client/mutation.rs#L87
Raw output
|
||
&mut self, | ||
target: MutationTarget, | ||
jids: &'_ [&'_ JobId], | ||
) -> Result<(), Error> { | ||
let filter = MutationFilter::builder().jids(jids).build(); | ||
self.mutate(MutationType::Discard, target, Some(&filter)) | ||
.await | ||
} | ||
|
||
/// Kill a set of jobs. | ||
/// | ||
/// ***Warning!*** The `MUTATE` API is not supposed to be used as part of application logic, | ||
/// you will want to use it for administration purposes only. | ||
/// | ||
/// Moves the jobs from the target structure to the `dead` set, meaning Faktory | ||
/// will not touch it further unless you ask it to do so. You then can, for example, | ||
/// manually process those jobs via the Web UI or send another mutation command | ||
/// targeting [`MutationTarget::Dead`] set. | ||
/// | ||
/// E.g. to kill the currently enqueued jobs with "bill" argument: | ||
/// ```no_run | ||
/// # tokio_test::block_on(async { | ||
/// # use faktory::{Client, MutationTarget, MutationFilter}; | ||
/// # let mut client = Client::connect().await.unwrap(); | ||
/// let filter = MutationFilter::builder() | ||
/// .pattern(r#"*\"args\":\[\"bill\"\]*"#) | ||
/// .build(); | ||
/// client.kill(MutationTarget::Scheduled, &filter).await.unwrap(); | ||
/// # }); | ||
/// ``` | ||
pub async fn kill<'a, F>(&mut self, target: MutationTarget, filter: F) -> Result<(), Error> | ||
where | ||
F: Borrow<MutationFilter<'a>>, | ||
{ | ||
self.mutate(MutationType::Kill, target, Some(filter.borrow())) | ||
.await | ||
} | ||
|
||
/// Kill the jobs with the given ids. | ||
/// | ||
/// ***Warning!*** The `MUTATE` API is not supposed to be used as part of application logic, | ||
/// you will want to use it for administration purposes only. | ||
/// | ||
/// Similar to [`Client::kill`], but will create a filter (see [`MutationFilter`]) | ||
/// with the given `jids` for you. | ||
pub async fn kill_by_ids<'a>( | ||
Check warning on line 133 in src/proto/client/mutation.rs GitHub Actions / clippy[clippy] src/proto/client/mutation.rs#L133
Raw output
|
||
&mut self, | ||
target: MutationTarget, | ||
jids: &'_ [&'_ JobId], | ||
) -> Result<(), Error> { | ||
let filter = MutationFilter::builder().jids(jids).build(); | ||
self.mutate(MutationType::Kill, target, Some(&filter)).await | ||
} | ||
|
||
/// Purge the targeted structure. | ||
/// | ||
/// ***Warning!*** The `MUTATE` API is not supposed to be used as part of application logic, | ||
/// you will want to use it for administration purposes only. | ||
/// | ||
/// Will have the same effect as [`Client::discard`] with an empty [`MutationFilter`], | ||
/// but is special cased by Faktory and so is performed faster. Can be thought of as | ||
/// `TRUNCATE tablename` operation in the SQL world versus `DELETE FROM tablename`. | ||
/// | ||
/// E.g. to purge all the jobs that are pending in the `reties` set: | ||
/// ```no_run | ||
/// # tokio_test::block_on(async { | ||
/// # use faktory::{Client, MutationTarget}; | ||
/// # let mut client = Client::connect().await.unwrap(); | ||
/// client.clear(MutationTarget::Retries).await.unwrap(); | ||
/// # }); | ||
/// ``` | ||
pub async fn clear(&mut self, target: MutationTarget) -> Result<(), Error> { | ||
self.mutate(MutationType::Clear, target, None).await | ||
} | ||
|
||
// For reference: https://github.com/contribsys/faktory/blob/10ccc2270dc2a1c95c3583f7c291a51b0292bb62/server/mutate.go#L35-L59 | ||
// The faktory will pull the targeted set from Redis to it's memory, iterate over each stringified job | ||
// looking for a substring "id":"..." or performing regexp search, then deserialize the matches into Jobs and | ||
// perform the action (e.g. requeue). | ||
async fn mutate<'a>( | ||
Check warning on line 167 in src/proto/client/mutation.rs GitHub Actions / clippy[clippy] src/proto/client/mutation.rs#L167
Raw output
|
||
&mut self, | ||
mtype: MutationType, | ||
mtarget: MutationTarget, | ||
mfilter: Option<&'_ MutationFilter<'_>>, | ||
) -> Result<(), Error> { | ||
self.issue(&MutationAction { | ||
cmd: mtype, | ||
target: mtarget, | ||
filter: mfilter, | ||
}) | ||
.await? | ||
.read_ok() | ||
.await | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm having a hard time following what this operation actually does. For example, with
MutationTarget::Scheduled
, what does it mean for the jobs to "immediately be moved to their queues"? They're already in queues?