Skip to content

Commit

Permalink
Batches and trackable jobs (#48)
Browse files Browse the repository at this point in the history
  • Loading branch information
rustworthy authored Feb 18, 2024
1 parent 0bd2ab0 commit cdf15d3
Show file tree
Hide file tree
Showing 18 changed files with 1,761 additions and 164 deletions.
17 changes: 0 additions & 17 deletions .github/workflows/check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,6 @@ jobs:
components: rustfmt
- name: cargo fmt --check
run: cargo fmt --check
# This is currently a dedicated job due to the rustfmt's `group_imports` configuration
# option being available on the nightly channel only as of February 2024.
# Once stabilized, can be merged with the `stable / fmt` job in this workflow.
# See: https://github.com/rust-lang/rustfmt/issues/5083
imports:
runs-on: ubuntu-latest
name: nightly / fmt (import grouping)
steps:
- uses: actions/checkout@v4
with:
submodules: true
- name: Install nightly
uses: dtolnay/rust-toolchain@nightly
with:
components: rustfmt
- name: cargo +nightly fmt -- --config group_imports=one --check
run: cargo +nightly fmt -- --config group_imports=one --check
clippy:
runs-on: ubuntu-latest
name: ${{ matrix.toolchain }} / clippy
Expand Down
18 changes: 9 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 0 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ check:
cargo fmt --check
cargo clippy
cargo d --no-deps --all-features
cargo +nightly fmt -- --config group_imports=one --check

.PHONY: doc
doc:
Expand Down Expand Up @@ -36,10 +35,6 @@ faktory/tls:
faktory/tls/kill:
docker compose -f docker/compose.yml down

.PHONY: sort
sort:
cargo +nightly fmt -- --config group_imports=one

.PHONY: test
test:
cargo t --locked --all-features --all-targets
Expand Down
12 changes: 6 additions & 6 deletions src/consumer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::error::Error;
use crate::proto::{self, Client, ClientOptions, HeartbeatStatus, Reconnect};
use crate::proto::{Ack, Fail, Job};
use crate::proto::{
self, parse_provided_or_from_env, Ack, Client, ClientOptions, Fail, HeartbeatStatus, Job,
Reconnect,
};
use fnv::FnvHashMap;
use std::error::Error as StdError;
use std::io::prelude::*;
Expand Down Expand Up @@ -212,10 +214,7 @@ impl<E> ConsumerBuilder<E> {
///
/// If `url` is given, but does not specify a port, it defaults to 7419.
pub fn connect(self, url: Option<&str>) -> Result<Consumer<TcpStream, E>, Error> {
let url = match url {
Some(url) => proto::url_parse(url),
None => proto::url_parse(&proto::get_env_url()),
}?;
let url = parse_provided_or_from_env(url)?;
let stream = TcpStream::connect(proto::host_from_url(&url))?;
Self::connect_with(self, stream, url.password().map(|p| p.to_string()))
}
Expand All @@ -227,6 +226,7 @@ impl<E> ConsumerBuilder<E> {
pwd: Option<String>,
) -> Result<Consumer<S, E>, Error> {
self.opts.password = pwd;
self.opts.is_worker = true;
Ok(Consumer::new(
Client::new(stream, self.opts)?,
self.workers,
Expand Down
24 changes: 18 additions & 6 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,19 +65,31 @@
#[macro_use]
extern crate serde_derive;

mod consumer;
pub mod error;

mod consumer;
mod producer;
mod proto;

#[cfg(feature = "tls")]
#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
mod tls;
pub use crate::consumer::{Consumer, ConsumerBuilder};
pub use crate::error::Error;
pub use crate::producer::Producer;
pub use crate::proto::Reconnect;
pub use crate::proto::{Job, JobBuilder};
pub use crate::proto::{Client, Job, JobBuilder, Reconnect};

#[cfg(feature = "ent")]
#[cfg_attr(docsrs, doc(cfg(feature = "ent")))]
/// Constructs only available with the enterprise version of Faktory.
pub mod ent {
pub use crate::proto::{
Batch, BatchBuilder, BatchHandle, BatchStatus, CallbackState, JobState, Progress,
ProgressUpdate, ProgressUpdateBuilder,
};
}

#[cfg(feature = "tls")]
#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
mod tls;

#[cfg(feature = "tls")]
#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
pub use tls::TlsStream;
42 changes: 32 additions & 10 deletions src/producer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use crate::error::Error;
use crate::proto::{self, Client, Info, Job, Push, PushBulk, QueueAction, QueueControl};
use crate::proto::{Client, Info, Job, Push, PushBulk, QueueAction, QueueControl};
use std::collections::HashMap;
use std::io::prelude::*;
use std::net::TcpStream;

#[cfg(feature = "ent")]
use crate::proto::{Batch, BatchHandle, CommitBatch, OpenBatch};

/// `Producer` is used to enqueue new jobs that will in turn be processed by Faktory workers.
///
/// # Connecting to Faktory
Expand Down Expand Up @@ -83,21 +86,16 @@ impl Producer<TcpStream> {
///
/// If `url` is given, but does not specify a port, it defaults to 7419.
pub fn connect(url: Option<&str>) -> Result<Self, Error> {
let url = match url {
Some(url) => proto::url_parse(url),
None => proto::url_parse(&proto::get_env_url()),
}?;
let stream = TcpStream::connect(proto::host_from_url(&url))?;
Self::connect_with(stream, url.password().map(|p| p.to_string()))
let c = Client::connect(url)?;
Ok(Producer { c })
}
}

impl<S: Read + Write> Producer<S> {
/// Connect to a Faktory server with a non-standard stream.
pub fn connect_with(stream: S, pwd: Option<String>) -> Result<Producer<S>, Error> {
Ok(Producer {
c: Client::new_producer(stream, pwd)?,
})
let c = Client::connect_with(stream, pwd)?;
Ok(Producer { c })
}

/// Enqueue the given job on the Faktory server.
Expand Down Expand Up @@ -162,6 +160,30 @@ impl<S: Read + Write> Producer<S> {
.issue(&QueueControl::new(QueueAction::Resume, queues))?
.await_ok()
}

/// Initiate a new batch of jobs.
#[cfg(feature = "ent")]
#[cfg_attr(docsrs, doc(cfg(feature = "ent")))]
pub fn start_batch(&mut self, batch: Batch) -> Result<BatchHandle<'_, S>, Error> {
let bid = self.c.issue(&batch)?.read_bid()?;
Ok(BatchHandle::new(bid, self))
}

/// Open an already existing batch of jobs.
///
/// This will not error if a batch with the provided `bid` does not exist,
/// rather `Ok(None)` will be returned.
#[cfg(feature = "ent")]
#[cfg_attr(docsrs, doc(cfg(feature = "ent")))]
pub fn open_batch(&mut self, bid: String) -> Result<Option<BatchHandle<'_, S>>, Error> {
let bid = self.c.issue(&OpenBatch::from(bid))?.maybe_bid()?;
Ok(bid.map(|bid| BatchHandle::new(bid, self)))
}

#[cfg(feature = "ent")]
pub(crate) fn commit_batch(&mut self, bid: String) -> Result<(), Error> {
self.c.issue(&CommitBatch::from(bid))?.await_ok()
}
}

#[cfg(test)]
Expand Down
66 changes: 66 additions & 0 deletions src/proto/batch/cmd.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use crate::ent::Batch;
use crate::proto::single::FaktoryCommand;
use crate::Error;
use std::io::Write;

impl FaktoryCommand for Batch {
fn issue<W: Write>(&self, w: &mut W) -> Result<(), Error> {
w.write_all(b"BATCH NEW ")?;
serde_json::to_writer(&mut *w, self).map_err(Error::Serialization)?;
Ok(w.write_all(b"\r\n")?)
}
}

// ----------------------------------------------

pub struct CommitBatch(String);

impl From<String> for CommitBatch {
fn from(value: String) -> Self {
CommitBatch(value)
}
}

impl FaktoryCommand for CommitBatch {
fn issue<W: Write>(&self, w: &mut W) -> Result<(), Error> {
w.write_all(b"BATCH COMMIT ")?;
w.write_all(self.0.as_bytes())?;
Ok(w.write_all(b"\r\n")?)
}
}

// ----------------------------------------------

pub struct GetBatchStatus(String);

impl From<String> for GetBatchStatus {
fn from(value: String) -> Self {
GetBatchStatus(value)
}
}

impl FaktoryCommand for GetBatchStatus {
fn issue<W: Write>(&self, w: &mut W) -> Result<(), Error> {
w.write_all(b"BATCH STATUS ")?;
w.write_all(self.0.as_bytes())?;
Ok(w.write_all(b"\r\n")?)
}
}

// ----------------------------------------------

pub struct OpenBatch(String);

impl From<String> for OpenBatch {
fn from(value: String) -> Self {
OpenBatch(value)
}
}

impl FaktoryCommand for OpenBatch {
fn issue<W: Write>(&self, w: &mut W) -> Result<(), Error> {
w.write_all(b"BATCH OPEN ")?;
w.write_all(self.0.as_bytes())?;
Ok(w.write_all(b"\r\n")?)
}
}
Loading

0 comments on commit cdf15d3

Please sign in to comment.