Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mutate API #87

Open
wants to merge 23 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Support Faktory's `MUTATE` API ([#87])
- Make `Failure` struct public ([#89])

### Changed

### Deprecated
Expand All @@ -19,16 +22,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Security

[#87]: https://github.com/jonhoo/faktory-rs/pull/87
[#89]: https://github.com/jonhoo/faktory-rs/pull/89

## [0.13.0] - 2024-10-27

### Added

- `Error::Stream` for underlying 'native_tls' and 'rustls' errors ([#49])
- rustls, native_tls: `Error::Stream` for underlying `native_tls` and `rustls` errors ([#49])
- Shutdown signal via `WorkerBuilder::with_graceful_shutdown` ([#57])
- Shutdown timeout via `WorkerBuilder::shutdown_timeout` ([#57])
- `Client` method for pausing, resuming, and removing queues ([#59])
- `Client::current_info` and `FaktoryState` struct ([#63])
- TLS configurations options to `WorkerBuilder` ([#74])
- rustls, native_tls: TLS configurations options to `WorkerBuilder` ([#74])

### Changed

Expand Down Expand Up @@ -57,13 +63,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- `JobRunner` trait and `ConsumerBuilder::register_runner` ([#51])
- Support for enqueuing numerous jobs with `Producer::enqueue_many` ([#54])
- Faktory Enterprise Edition: Batch jobs (`Batch`, `BatchId`, `BatchStatus`) ([#48])
- Faktory Enterprise Edition: Setting and getting a job's progress ([#48])
- ent: Batch jobs (`Batch`, `BatchId`, `BatchStatus`) ([#48])
- ent: Setting and getting a job's progress ([#48])

[#48]: https://github.com/jonhoo/faktory-rs/pull/48
[#51]: https://github.com/jonhoo/faktory-rs/pull/51
[#54]: https://github.com/jonhoo/faktory-rs/pull/54

[unreleased]: https://github.com/jonhoo/faktory-rs/compare/v0.13.0...HEAD
[0.13.0]: https://github.com/jonhoo/faktory-rs/compare/v0.12.5...v0.13.0
[0.12.5]: https://github.com/jonhoo/faktory-rs/compare/v0.12.4...v0.12.5
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "faktory"
version = "0.13.0"
version = "0.13.1-rc0"
authors = ["Jon Gjengset <[email protected]>"]
edition = "2021"
license = "MIT OR Apache-2.0"
Expand Down
15 changes: 12 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,26 @@ test/doc:

.PHONY: test/e2e
test/e2e:
FAKTORY_URL=tcp://${FAKTORY_HOST}:${FAKTORY_PORT} cargo test --locked --all-features --all-targets
FAKTORY_URL=tcp://${FAKTORY_HOST}:${FAKTORY_PORT} \
cargo test --locked --all-features --all-targets -- \
--nocapture $(pattern)

.PHONY: test/e2e/ignored
test/e2e/ignored:
FAKTORY_URL=tcp://${FAKTORY_HOST}:${FAKTORY_PORT} \
cargo test --locked --all-features --all-targets -- \
--nocapture --include-ignored queue_control_actions_wildcard

.PHONY: test/e2e/tls
test/e2e/tls:
FAKTORY_URL_SECURE=tcp://:${FAKTORY_PASSWORD}@${FAKTORY_HOST}:${FAKTORY_PORT_SECURE} \
FAKTORY_URL=tcp://:${FAKTORY_PASSWORD}@${FAKTORY_HOST}:${FAKTORY_PORT} \
cargo test --locked --features native_tls,rustls --test tls -- --nocapture
cargo test --locked --features native_tls,rustls --test tls -- \
--nocapture $(pattern)

.PHONY: test/load
test/load:
cargo run --release --features binaries
cargo run --release --features binaries $(jobs) $(threads)

.PHONY: test/perf
test/perf:
Expand Down
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ mod worker;
pub use crate::error::Error;

pub use crate::proto::{
Client, Connection, DataSnapshot, FaktoryState, Job, JobBuilder, JobId, Reconnect,
ServerSnapshot, WorkerId,
Client, Connection, DataSnapshot, Failure, FaktoryState, Job, JobBuilder, JobId,
MutationFilter, MutationFilterBuilder, MutationTarget, Reconnect, ServerSnapshot, WorkerId,
};

pub use crate::worker::{JobRunner, StopDetails, StopReason, Worker, WorkerBuilder};
Expand Down
2 changes: 2 additions & 0 deletions src/proto/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ mod conn;
pub(crate) use conn::BoxedConnection;
pub use conn::Connection;

mod mutation;

pub(crate) const EXPECTED_PROTOCOL_VERSION: usize = 2;

fn check_protocols_match(ver: usize) -> Result<(), Error> {
Expand Down
182 changes: 182 additions & 0 deletions src/proto/client/mutation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
use crate::{
proto::single::{MutationAction, MutationType},
Client, Error, JobId, MutationFilter, MutationTarget,
};
use std::borrow::Borrow;

impl Client {
/// Re-enqueue the jobs.
///
/// ***Warning!*** The `MUTATE` API is not supposed to be used as part of application logic,
/// you will want to use it for administration purposes only.
///
/// This method will immediately move the jobs from the targeted set (see [`MutationTarget`])
/// to their queues. This will apply to the jobs satisfying the [`filter`](crate::MutationFilter).
Comment on lines +13 to +14
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm having a hard time following what this operation actually does. For example, with MutationTarget::Scheduled, what does it mean for the jobs to "immediately be moved to their queues"? They're already in queues?

///
/// ```no_run
/// # tokio_test::block_on(async {
/// # use faktory::{JobId, Client, MutationTarget, MutationFilter};
/// # let mut client = Client::connect().await.unwrap();
/// let job_id1 = JobId::new("3sgE_qwtqw1501");
/// let job_id2 = JobId::new("3sgE_qwtqw1502");
/// let failed_ids = [&job_id1, &job_id2];
/// let filter = MutationFilter::builder().jids(failed_ids.as_slice()).build();
/// client.requeue(MutationTarget::Retries, &filter).await.unwrap();
Comment on lines +20 to +24
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This, too, I have a hard time following — this is assuming that two jobs have already failed (I think), and it tries to re-enqueue them from the "retries" set? Why not from the dead set? And where do they get scheduled back — the queue they were originally on?

/// # });
/// ```
pub async fn requeue<'a, F>(&mut self, target: MutationTarget, filter: F) -> Result<(), Error>
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The argument name target makes me think that the jobs will be moved to this, but I don't think that's the case. Isn't it more like source or candidate_job_set or something?

where
F: Borrow<MutationFilter<'a>>,
{
self.mutate(MutationType::Requeue, target, Some(filter.borrow()))
.await
}

/// Re-enqueue the jobs with the given ids.
///
/// ***Warning!*** The `MUTATE` API is not supposed to be used as part of application logic,
/// you will want to use it for administration purposes only.
///
/// Similar to [`Client::requeue`], but will create a filter (see [`MutationFilter`])
/// with the given `jids` for you.
pub async fn requeue_by_ids<'a>(

Check warning on line 42 in src/proto/client/mutation.rs

View workflow job for this annotation

GitHub Actions / clippy

[clippy] src/proto/client/mutation.rs#L42

warning: this lifetime isn't used in the function definition --> src/proto/client/mutation.rs:42:33 | 42 | pub async fn requeue_by_ids<'a>( | ^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#extra_unused_lifetimes = note: `#[warn(clippy::extra_unused_lifetimes)]` on by default
Raw output
src/proto/client/mutation.rs:42:33:w:warning: this lifetime isn't used in the function definition
  --> src/proto/client/mutation.rs:42:33
   |
42 |     pub async fn requeue_by_ids<'a>(
   |                                 ^^
   |
   = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#extra_unused_lifetimes
   = note: `#[warn(clippy::extra_unused_lifetimes)]` on by default


__END__
&mut self,
target: MutationTarget,
jids: &'_ [&'_ JobId],
) -> Result<(), Error> {
let filter = MutationFilter::builder().jids(jids).build();
self.mutate(MutationType::Requeue, target, Some(&filter))
.await
}

Check warning on line 50 in src/proto/client/mutation.rs

View check run for this annotation

Codecov / codecov/patch

src/proto/client/mutation.rs#L42-L50

Added lines #L42 - L50 were not covered by tests

/// Discard the jobs.
///
/// ***Warning!*** The `MUTATE` API is not supposed to be used as part of application logic,
/// you will want to use it for administration purposes only.
///
/// Will throw the jobs away without any chance for re-scheduling
/// on the server side. If you want to still be able to process the jobs,
/// use [`Client::kill`] instead.
///
/// E.g. to discard the currently enqueued jobs having "fizz" argument:
/// ```no_run
/// # tokio_test::block_on(async {
/// # use faktory::{Client, MutationTarget, MutationFilter};
/// # let mut client = Client::connect().await.unwrap();
/// let filter = MutationFilter::builder()
/// .pattern(r#"*\"args\":\[\"fizz\"\]*"#)
/// .build();
/// client.discard(MutationTarget::Scheduled, &filter).await.unwrap();
/// # });
/// ```
pub async fn discard<'a, F>(&mut self, target: MutationTarget, filter: F) -> Result<(), Error>
where
F: Borrow<MutationFilter<'a>>,
{
self.mutate(MutationType::Discard, target, Some(filter.borrow()))
.await
}

/// Discard the jobs with the given ids.
///
/// ***Warning!*** The `MUTATE` API is not supposed to be used as part of application logic,
/// you will want to use it for administration purposes only.
///
/// Similar to [`Client::discard`], but will create a filter (see [`MutationFilter`])
/// with the given `jids` for you.
pub async fn discard_by_ids<'a>(

Check warning on line 87 in src/proto/client/mutation.rs

View workflow job for this annotation

GitHub Actions / clippy

[clippy] src/proto/client/mutation.rs#L87

warning: this lifetime isn't used in the function definition --> src/proto/client/mutation.rs:87:33 | 87 | pub async fn discard_by_ids<'a>( | ^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#extra_unused_lifetimes
Raw output
src/proto/client/mutation.rs:87:33:w:warning: this lifetime isn't used in the function definition
  --> src/proto/client/mutation.rs:87:33
   |
87 |     pub async fn discard_by_ids<'a>(
   |                                 ^^
   |
   = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#extra_unused_lifetimes


__END__
&mut self,
target: MutationTarget,
jids: &'_ [&'_ JobId],
) -> Result<(), Error> {
let filter = MutationFilter::builder().jids(jids).build();
self.mutate(MutationType::Discard, target, Some(&filter))
.await
}

Check warning on line 95 in src/proto/client/mutation.rs

View check run for this annotation

Codecov / codecov/patch

src/proto/client/mutation.rs#L87-L95

Added lines #L87 - L95 were not covered by tests

/// Kill a set of jobs.
///
/// ***Warning!*** The `MUTATE` API is not supposed to be used as part of application logic,
/// you will want to use it for administration purposes only.
///
/// Moves the jobs from the target structure to the `dead` set, meaning Faktory
/// will not touch it further unless you ask it to do so. You then can, for example,
/// manually process those jobs via the Web UI or send another mutation command
/// targeting [`MutationTarget::Dead`] set.
///
/// E.g. to kill the currently enqueued jobs with "bill" argument:
/// ```no_run
/// # tokio_test::block_on(async {
/// # use faktory::{Client, MutationTarget, MutationFilter};
/// # let mut client = Client::connect().await.unwrap();
/// let filter = MutationFilter::builder()
/// .pattern(r#"*\"args\":\[\"bill\"\]*"#)
/// .build();
/// client.kill(MutationTarget::Scheduled, &filter).await.unwrap();
/// # });
/// ```
pub async fn kill<'a, F>(&mut self, target: MutationTarget, filter: F) -> Result<(), Error>
where
F: Borrow<MutationFilter<'a>>,
{
self.mutate(MutationType::Kill, target, Some(filter.borrow()))
.await
}

/// Kill the jobs with the given ids.
///
/// ***Warning!*** The `MUTATE` API is not supposed to be used as part of application logic,
/// you will want to use it for administration purposes only.
///
/// Similar to [`Client::kill`], but will create a filter (see [`MutationFilter`])
/// with the given `jids` for you.
pub async fn kill_by_ids<'a>(

Check warning on line 133 in src/proto/client/mutation.rs

View workflow job for this annotation

GitHub Actions / clippy

[clippy] src/proto/client/mutation.rs#L133

warning: this lifetime isn't used in the function definition --> src/proto/client/mutation.rs:133:30 | 133 | pub async fn kill_by_ids<'a>( | ^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#extra_unused_lifetimes
Raw output
src/proto/client/mutation.rs:133:30:w:warning: this lifetime isn't used in the function definition
   --> src/proto/client/mutation.rs:133:30
    |
133 |     pub async fn kill_by_ids<'a>(
    |                              ^^
    |
    = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#extra_unused_lifetimes


__END__
&mut self,
target: MutationTarget,
jids: &'_ [&'_ JobId],
) -> Result<(), Error> {
let filter = MutationFilter::builder().jids(jids).build();
self.mutate(MutationType::Kill, target, Some(&filter)).await
}

Check warning on line 140 in src/proto/client/mutation.rs

View check run for this annotation

Codecov / codecov/patch

src/proto/client/mutation.rs#L133-L140

Added lines #L133 - L140 were not covered by tests

/// Purge the targeted structure.
///
/// ***Warning!*** The `MUTATE` API is not supposed to be used as part of application logic,
/// you will want to use it for administration purposes only.
///
/// Will have the same effect as [`Client::discard`] with an empty [`MutationFilter`],
/// but is special cased by Faktory and so is performed faster. Can be thought of as
/// `TRUNCATE tablename` operation in the SQL world versus `DELETE FROM tablename`.
///
/// E.g. to purge all the jobs that are pending in the `reties` set:
/// ```no_run
/// # tokio_test::block_on(async {
/// # use faktory::{Client, MutationTarget};
/// # let mut client = Client::connect().await.unwrap();
/// client.clear(MutationTarget::Retries).await.unwrap();
/// # });
/// ```
pub async fn clear(&mut self, target: MutationTarget) -> Result<(), Error> {
self.mutate(MutationType::Clear, target, None).await
}

Check warning on line 161 in src/proto/client/mutation.rs

View check run for this annotation

Codecov / codecov/patch

src/proto/client/mutation.rs#L159-L161

Added lines #L159 - L161 were not covered by tests

// For reference: https://github.com/contribsys/faktory/blob/10ccc2270dc2a1c95c3583f7c291a51b0292bb62/server/mutate.go#L35-L59
// The faktory will pull the targeted set from Redis to it's memory, iterate over each stringified job
// looking for a substring "id":"..." or performing regexp search, then deserialize the matches into Jobs and
// perform the action (e.g. requeue).
async fn mutate<'a>(

Check warning on line 167 in src/proto/client/mutation.rs

View workflow job for this annotation

GitHub Actions / clippy

[clippy] src/proto/client/mutation.rs#L167

warning: this lifetime isn't used in the function definition --> src/proto/client/mutation.rs:167:21 | 167 | async fn mutate<'a>( | ^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#extra_unused_lifetimes
Raw output
src/proto/client/mutation.rs:167:21:w:warning: this lifetime isn't used in the function definition
   --> src/proto/client/mutation.rs:167:21
    |
167 |     async fn mutate<'a>(
    |                     ^^
    |
    = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#extra_unused_lifetimes


__END__
&mut self,
mtype: MutationType,
mtarget: MutationTarget,
mfilter: Option<&'_ MutationFilter<'_>>,
) -> Result<(), Error> {
self.issue(&MutationAction {
cmd: mtype,
target: mtarget,
filter: mfilter,
})
.await?
.read_ok()
.await
}
}
5 changes: 4 additions & 1 deletion src/proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ pub use client::{Client, Connection};

mod single;

pub use single::{DataSnapshot, FaktoryState, Job, JobBuilder, JobId, ServerSnapshot, WorkerId};
pub use single::{
DataSnapshot, Failure, FaktoryState, Job, JobBuilder, JobId, MutationFilter,
MutationFilterBuilder, MutationTarget, ServerSnapshot, WorkerId,
};

pub(crate) use single::{Ack, Fail, Info, Push, PushBulk, QueueAction, QueueControl};

Expand Down
Loading
Loading