Skip to content

Commit

Permalink
Unique and expiring jobs (Enterprise Faktory) (#45)
Browse files Browse the repository at this point in the history
  • Loading branch information
rustworthy authored Jan 7, 2024
1 parent 03775db commit a8a3ebc
Show file tree
Hide file tree
Showing 8 changed files with 586 additions and 1 deletion.
39 changes: 39 additions & 0 deletions .github/workflows/ent.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# This is a CI workflow that runs the test against Enterprise Edition of Faktory.
# The binary (for macos only) is avalable for download for testing purposes with each Faktory release.
permissions:
contents: read
on:
push:
branches:
- main
pull_request:
concurrency:
group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}
cancel-in-progress: true
name: enterprise
jobs:
test:
runs-on: macos-latest
env:
FAKTORY_VERSION: 1.8.0
steps:
- uses: actions/checkout@v4
- name: Install redis
run: brew install redis
- name: Download Faktory binary
run: |
wget -O faktory.tbz https://github.com/contribsys/faktory/releases/download/v${{ env.FAKTORY_VERSION }}/faktory-ent_${{ env.FAKTORY_VERSION }}.macos.amd64.tbz
tar xfv faktory.tbz
cp ./faktory /usr/local/bin
- name: Launch Faktory in background
run: faktory &
- name: Install stable
uses: dtolnay/rust-toolchain@stable
- name: cargo generate-lockfile
if: hashFiles('Cargo.lock') == ''
run: cargo generate-lockfile
- name: Run tests
env:
FAKTORY_URL: tcp://127.0.0.1:7419
FAKTORY_ENT: true
run: cargo test --locked --features ent --all-targets
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ categories = ["api-bindings", "asynchronous", "network-programming"]
default = []
tls = ["native-tls"]
binaries = ["clap"]
ent = []

[dependencies]
serde_json = "1.0"
Expand Down
10 changes: 10 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ pub enum Protocol {
desc: String,
},

/// The server reported a unique constraint violation.
#[cfg(feature = "ent")]
#[error("server reported unique constraint violation: {msg}")]
UniqueConstraintViolation {
/// The error message given by the server.
msg: String,
},

/// The server responded with an error.
#[error("an internal server error occurred: {msg}")]
Internal {
Expand Down Expand Up @@ -139,6 +147,8 @@ impl Protocol {
match code {
Some("ERR") => Protocol::Internal { msg: error },
Some("MALFORMED") => Protocol::Malformed { desc: error },
#[cfg(feature = "ent")]
Some("NOTUNIQUE") => Protocol::UniqueConstraintViolation { msg: error },
Some(c) => Protocol::Internal {
msg: format!("{} {}", c, error),
},
Expand Down
141 changes: 141 additions & 0 deletions src/proto/single/ent.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
use chrono::{DateTime, Utc};

use crate::JobBuilder;

impl JobBuilder {
/// When Faktory should expire this job.
///
/// Faktory Enterprise allows for expiring jobs. This is setter for `expires_at`
/// field in the job's custom data.
/// ```
/// # use faktory::JobBuilder;
/// # use chrono::{Duration, Utc};
/// let _job = JobBuilder::new("order")
/// .args(vec!["ISBN-13:9781718501850"])
/// .expires_at(Utc::now() + Duration::hours(1))
/// .build();
/// ```
pub fn expires_at(&mut self, dt: DateTime<Utc>) -> &mut Self {
self.add_to_custom_data(
"expires_at",
dt.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true),
)
}

/// In what period of time from now (UTC) the Faktory should expire this job.
///
/// Under the hood, the method will call `Utc::now` and add the provided `ttl` duration.
/// You can use this setter when you have a duration rather than some exact date and time,
/// expected by [`expires_at`](struct.JobBuilder.html#method.expires_at) setter.
/// Example usage:
/// ```
/// # use faktory::JobBuilder;
/// # use chrono::Duration;
/// let _job = JobBuilder::new("order")
/// .args(vec!["ISBN-13:9781718501850"])
/// .expires_in(Duration::weeks(1))
/// .build();
/// ```
pub fn expires_in(&mut self, ttl: chrono::Duration) -> &mut Self {
self.expires_at(Utc::now() + ttl)
}

/// How long the Faktory will not accept duplicates of this job.
///
/// The job will be considered unique for the kind-args-queue combination. The uniqueness is best-effort,
/// rather than a guarantee. Check out the Enterprise Faktory [docs](https://github.com/contribsys/faktory/wiki/Ent-Unique-Jobs)
/// for details on how scheduling, retries, and other features live together with `unique_for`.
///
/// If you've already created and pushed a unique job (job "A") to the Faktory server and now have got another one
/// of same kind, with the same args and destined for the same queue (job "B") and you would like - for some reason - to
/// bypass the unique constraint, simply leave `unique_for` field on the job's custom hash empty, i.e. do not use this setter.
/// In this case, the Faktory server will accept job "B", though technically this job "B" is a duplicate.
pub fn unique_for(&mut self, secs: usize) -> &mut Self {
self.add_to_custom_data("unique_for", secs)
}

/// Remove unique lock for this job right before the job starts executing.
///
/// Another job with the same kind-args-queue combination will be accepted by the Faktory server
/// after the period specified in [`unique_for`](struct.JobBuilder.html#method.unique_for) has finished
/// _or_ after this job has been been consumed (i.e. its execution has ***started***).
pub fn unique_until_start(&mut self) -> &mut Self {
self.add_to_custom_data("unique_until", "start")
}

/// Do not remove unique lock for this job until it successfully finishes.
///
/// Sets `unique_until` on the Job's custom hash to `success`, which is Faktory's default.
/// Another job with the same kind-args-queue combination will be accepted by the Faktory server
/// after the period specified in [`unique_for`](struct.JobBuilder.html#method.unique_for) has finished
/// _or_ after this job has been been ***successfully*** processed.
pub fn unique_until_success(&mut self) -> &mut Self {
self.add_to_custom_data("unique_until", "success")
}
}

#[cfg(test)]
mod test {
use chrono::{DateTime, Utc};

use crate::JobBuilder;

fn half_stuff() -> JobBuilder {
let mut job = JobBuilder::new("order");
job.args(vec!["ISBN-13:9781718501850"]);
job
}

// Returns date and time string in the format expected by Faktory.
// Serializes date and time into a string as per RFC 3338 and ISO 8601
// with nanoseconds precision and 'Z' literal for the timzone column.
fn to_iso_string(dt: DateTime<Utc>) -> String {
dt.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true)
}

#[test]
fn test_expiration_feature_for_enterprise_faktory() {
let five_min = chrono::Duration::seconds(300);
let exp_at = Utc::now() + five_min;
let job1 = half_stuff().expires_at(exp_at).build();
let stored = job1.custom.get("expires_at").unwrap();
assert_eq!(stored, &serde_json::Value::from(to_iso_string(exp_at)));

let job2 = half_stuff().expires_in(five_min).build();
assert!(job2.custom.get("expires_at").is_some());
}

#[test]
fn test_uniqueness_faeture_for_enterprise_faktory() {
let job = half_stuff().unique_for(60).unique_until_start().build();
let stored_unique_for = job.custom.get("unique_for").unwrap();
let stored_unique_until = job.custom.get("unique_until").unwrap();
assert_eq!(stored_unique_for, &serde_json::Value::from(60));
assert_eq!(stored_unique_until, &serde_json::Value::from("start"));

let job = half_stuff().unique_for(60).unique_until_success().build();

let stored_unique_until = job.custom.get("unique_until").unwrap();
assert_eq!(stored_unique_until, &serde_json::Value::from("success"));
}

#[test]
fn test_same_purpose_setters_applied_simultaneously() {
let expires_at1 = Utc::now() + chrono::Duration::seconds(300);
let expires_at2 = Utc::now() + chrono::Duration::seconds(300);
let job = half_stuff()
.unique_for(60)
.add_to_custom_data("unique_for", 600)
.unique_for(40)
.add_to_custom_data("expires_at", to_iso_string(expires_at1))
.expires_at(expires_at2)
.build();
let stored_unique_for = job.custom.get("unique_for").unwrap();
assert_eq!(stored_unique_for, &serde_json::Value::from(40));
let stored_expires_at = job.custom.get("expires_at").unwrap();
assert_eq!(
stored_expires_at,
&serde_json::Value::from(to_iso_string(expires_at2))
)
}
}
29 changes: 28 additions & 1 deletion src/proto/single/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ mod cmd;
mod resp;
mod utils;

#[cfg(feature = "ent")]
mod ent;

use crate::error::Error;

pub use self::cmd::*;
Expand Down Expand Up @@ -145,7 +148,7 @@ pub struct Job {
}

impl JobBuilder {
/// Create a new builder for a [`Job`]
/// Creates a new builder for a [`Job`]
pub fn new(kind: impl Into<String>) -> JobBuilder {
JobBuilder {
kind: Some(kind.into()),
Expand All @@ -162,6 +165,17 @@ impl JobBuilder {
self
}

/// Sets arbitrary key-value pairs to this job's custom data hash.
pub fn add_to_custom_data(
&mut self,
k: impl Into<String>,
v: impl Into<serde_json::Value>,
) -> &mut Self {
let custom = self.custom.get_or_insert_with(HashMap::new);
custom.insert(k.into(), v.into());
self
}

/// Builds a new [`Job`] from the parameters of this builder.
pub fn build(&self) -> Job {
self.try_build()
Expand Down Expand Up @@ -304,4 +318,17 @@ mod test {
assert_ne!(job2.jid, job3.jid);
assert_ne!(job2.created_at, job3.created_at);
}

#[test]
fn test_arbitrary_custom_data_setter() {
let job = JobBuilder::new("order")
.args(vec!["ISBN-13:9781718501850"])
.add_to_custom_data("arbitrary_key", "arbitrary_value")
.build();

assert_eq!(
job.custom.get("arbitrary_key").unwrap(),
&serde_json::Value::from("arbitrary_value")
);
}
}
File renamed without changes.
Loading

0 comments on commit a8a3ebc

Please sign in to comment.