Skip to content

Commit

Permalink
Send 'END' to Faktory once dropping client
Browse files Browse the repository at this point in the history
  • Loading branch information
rustworthy committed Jan 18, 2024
1 parent 5ea2a8b commit 5feea0c
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 41 deletions.
6 changes: 3 additions & 3 deletions src/async/consumer/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
Error, Job,
};

use super::{AsyncConsumer, CallbacksRegistry, Client};
use super::{AsyncClient, AsyncConsumer, CallbacksRegistry};

/// Convenience wrapper for building a Faktory worker.
///
Expand Down Expand Up @@ -85,7 +85,7 @@ impl<E: 'static> AsyncConsumerBuilder<E> {
) -> Result<AsyncConsumer<S, E>, Error> {
self.opts.password = pwd;
Ok(AsyncConsumer::new(
Client::new(stream, self.opts).await?,
AsyncClient::new(stream, self.opts).await?,
self.workers_count,
self.callbacks,

Check warning on line 90 in src/async/consumer/builder.rs

View check run for this annotation

Codecov / codecov/patch

src/async/consumer/builder.rs#L81-L90

Added lines #L81 - L90 were not covered by tests
)
Expand All @@ -105,7 +105,7 @@ impl<E: 'static> AsyncConsumerBuilder<E> {
}?;

Check warning on line 105 in src/async/consumer/builder.rs

View check run for this annotation

Codecov / codecov/patch

src/async/consumer/builder.rs#L105

Added line #L105 was not covered by tests
let stream = TokioStream::connect(host_from_url(&url)).await?;
let buffered = BufStream::new(stream);
let client = Client::new(buffered, self.opts).await?;
let client = AsyncClient::new(buffered, self.opts).await?;
Ok(AsyncConsumer::new(client, self.workers_count, self.callbacks).await)
}
}
8 changes: 4 additions & 4 deletions src/async/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ mod registries;
pub use builder::AsyncConsumerBuilder;
use registries::{CallbacksRegistry, StatesRegistry};

use super::proto::{AsyncReconnect, Client};
use super::proto::{AsyncClient, AsyncReconnect};

/// Asynchronous version of the [`Consumer`](struct.Consumer.html).
pub struct AsyncConsumer<S: AsyncBufReadExt + AsyncWriteExt + Send, E> {
c: Client<S>,
pub struct AsyncConsumer<S: AsyncBufReadExt + AsyncWriteExt + Send + Unpin, E> {
c: AsyncClient<S>,
worker_states: Arc<StatesRegistry>,
callbacks: Arc<CallbacksRegistry<E>>,
terminated: bool,
Expand All @@ -34,7 +34,7 @@ impl<S: AsyncBufReadExt + AsyncWriteExt + Send + Unpin + AsyncReconnect, E> Asyn
}

impl<S: AsyncBufReadExt + AsyncWriteExt + Send + Unpin, E> AsyncConsumer<S, E> {
async fn new(c: Client<S>, workers_count: usize, callbacks: CallbacksRegistry<E>) -> Self {
async fn new(c: AsyncClient<S>, workers_count: usize, callbacks: CallbacksRegistry<E>) -> Self {
AsyncConsumer {
c,
callbacks: Arc::new(callbacks),
Expand Down
6 changes: 3 additions & 3 deletions src/async/producer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@ use crate::{
Error,
};

use super::proto::Client;
use super::proto::AsyncClient;

/// `Producer` is used to enqueue new jobs that will in turn be processed by Faktory workers.
pub struct AsyncProducer<S: AsyncBufReadExt + AsyncWriteExt + Send + Unpin> {
c: Client<S>,
c: AsyncClient<S>,
}

impl<S: AsyncBufReadExt + AsyncWriteExt + Send + Unpin> AsyncProducer<S> {
/// Connect to a Faktory server with a non-standard stream.
pub async fn connect_with(stream: S, pwd: Option<String>) -> Result<AsyncProducer<S>, Error> {
Ok(AsyncProducer {
c: Client::new_producer(stream, pwd).await?,
c: AsyncClient::new_producer(stream, pwd).await?,
})
}

Expand Down
46 changes: 31 additions & 15 deletions src/async/proto/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use crate::proto::{
ClientOptions, Fetch, Heartbeat, HeartbeatStatus, Hello, EXPECTED_PROTOCOL_VERSION,
ClientOptions, End, Fetch, Heartbeat, HeartbeatStatus, Hello, EXPECTED_PROTOCOL_VERSION,
};
use crate::{error, Error, Job};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
use tokio::net::TcpStream as TokioStream;

mod single;

pub struct Client<S: AsyncBufReadExt + AsyncWriteExt + Send> {
pub struct AsyncClient<S: AsyncBufReadExt + AsyncWriteExt + Send + Unpin> {
stream: S,
opts: ClientOptions,
}
Expand All @@ -26,13 +26,13 @@ impl AsyncReconnect for TokioStream {
}

Check warning on line 26 in src/async/proto/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/async/proto/mod.rs#L24-L26

Added lines #L24 - L26 were not covered by tests
}

impl<S> Client<S>
impl<S> AsyncClient<S>
where
S: AsyncBufReadExt + AsyncWriteExt + Unpin + Send + AsyncReconnect,
{
pub(crate) async fn connect_again(&mut self) -> Result<Self, Error> {
let s = self.stream.reconnect().await?;
Client::new(s, self.opts.clone()).await
AsyncClient::new(s, self.opts.clone()).await
}

Check warning on line 36 in src/async/proto/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/async/proto/mod.rs#L33-L36

Added lines #L33 - L36 were not covered by tests

pub(crate) async fn reconnect(&mut self) -> Result<(), Error> {
Expand All @@ -41,7 +41,20 @@ where
}

Check warning on line 41 in src/async/proto/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/async/proto/mod.rs#L38-L41

Added lines #L38 - L41 were not covered by tests
}

impl<S> Client<S>
impl<S> Drop for AsyncClient<S>
where
S: AsyncBufReadExt + AsyncWriteExt + Unpin + Send,
{
fn drop(&mut self) {
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async {
single::write_command(&mut self.stream, &End).await.unwrap();
})
});
}
}

impl<S> AsyncClient<S>
where
S: AsyncBufReadExt + AsyncWriteExt + Unpin + Send,
{
Expand Down Expand Up @@ -94,16 +107,19 @@ where
Ok(())
}

pub(crate) async fn new(stream: S, opts: ClientOptions) -> Result<Client<S>, Error> {
let mut c = Client { stream, opts };
pub(crate) async fn new(stream: S, opts: ClientOptions) -> Result<AsyncClient<S>, Error> {
let mut c = AsyncClient { stream, opts };
c.init().await?;
Ok(c)
}

pub(crate) async fn new_producer(stream: S, pwd: Option<String>) -> Result<Client<S>, Error> {
pub(crate) async fn new_producer(
stream: S,
pwd: Option<String>,
) -> Result<AsyncClient<S>, Error> {
let mut opts = ClientOptions::default_for_producer();
opts.password = pwd;
Client::new(stream, opts).await
AsyncClient::new(stream, opts).await
}

pub(crate) async fn issue<FC: single::AsyncFaktoryCommand>(
Expand Down Expand Up @@ -147,7 +163,7 @@ where
}

Check warning on line 163 in src/async/proto/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/async/proto/mod.rs#L163

Added line #L163 was not covered by tests
}

pub struct ReadToken<'a, S: AsyncBufReadExt + AsyncWriteExt + Unpin + Send>(&'a mut Client<S>);
pub struct ReadToken<'a, S: AsyncBufReadExt + AsyncWriteExt + Unpin + Send>(&'a mut AsyncClient<S>);

impl<'a, S: AsyncBufReadExt + AsyncWriteExt + Unpin + Send> ReadToken<'a, S> {
pub(crate) async fn read_ok(self) -> Result<(), Error> {
Expand All @@ -163,22 +179,22 @@ impl<'a, S: AsyncBufReadExt + AsyncWriteExt + Unpin + Send> ReadToken<'a, S> {
}
#[cfg(test)]
mod test {
use super::{single, Client};
use super::{single, AsyncClient};
use crate::proto::{ClientOptions, Hello, Push, EXPECTED_PROTOCOL_VERSION};
use crate::JobBuilder;
use tokio::io::BufStream;
use tokio::net::TcpStream;

async fn get_connected_client() -> Option<Client<BufStream<TcpStream>>> {
async fn get_connected_client() -> Option<AsyncClient<BufStream<TcpStream>>> {
if std::env::var_os("FAKTORY_URL").is_none() {
return None;

Check warning on line 190 in src/async/proto/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/async/proto/mod.rs#L190

Added line #L190 was not covered by tests
}
let stream = BufStream::new(TcpStream::connect("127.0.0.1:7419").await.unwrap());
let opts = ClientOptions::default();
Some(Client { stream, opts })
Some(AsyncClient { stream, opts })
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn test_client_runs_handshake_with_server_after_connect() {
if let Some(mut c) = get_connected_client().await {
let hi = single::read_hi(&mut c.stream).await.unwrap();
Expand All @@ -190,7 +206,7 @@ mod test {
};

Check warning on line 206 in src/async/proto/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/async/proto/mod.rs#L206

Added line #L206 was not covered by tests
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn test_client_receives_ok_from_server_after_job_push() {
if let Some(mut c) = get_connected_client().await {
c.init().await.unwrap();
Expand Down
21 changes: 14 additions & 7 deletions src/async/proto/single/cmd.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use tokio::io::AsyncWriteExt;

use crate::{
proto::{Ack, Fail, Fetch, Heartbeat, Hello, Info, Push, QueueAction, QueueControl},
proto::{Ack, End, Fail, Fetch, Heartbeat, Hello, Info, Push, QueueAction, QueueControl},
Error,
};

Expand Down Expand Up @@ -62,11 +62,11 @@ where
}

macro_rules! self_to_cmd {
($struct:ident) => {
($struct:ident, $cmd:expr) => {
#[async_trait::async_trait]
impl AsyncFaktoryCommand for $struct {
async fn issue<W: AsyncWriteExt + Unpin + Send>(&self, w: &mut W) -> Result<(), Error> {
let c = format!("{} ", stringify!($struct).to_uppercase());
let c = format!("{} ", $cmd);
w.write_all(c.as_bytes()).await?;
let r = serde_json::to_vec(self).map_err(Error::Serialization)?;
w.write_all(&r).await?;
Expand All @@ -76,7 +76,14 @@ macro_rules! self_to_cmd {
};
}

self_to_cmd!(Hello);
self_to_cmd!(Ack);
self_to_cmd!(Fail);
self_to_cmd!(Heartbeat);
self_to_cmd!(Hello, "HELLO");
self_to_cmd!(Ack, "ACK");
self_to_cmd!(Fail, "FAIL");
self_to_cmd!(Heartbeat, "BEAT");

Check warning on line 82 in src/async/proto/single/cmd.rs

View check run for this annotation

Codecov / codecov/patch

src/async/proto/single/cmd.rs#L82

Added line #L82 was not covered by tests

#[async_trait::async_trait]
impl AsyncFaktoryCommand for End {
async fn issue<W: AsyncWriteExt + Unpin + Send>(&self, w: &mut W) -> Result<(), Error> {
Ok(w.write_all(b"END\r\n").await?)
}
}
4 changes: 3 additions & 1 deletion src/bin/async_loadtest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ async fn main() {
jobs, threads
);

Check warning on line 38 in src/bin/async_loadtest.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/async_loadtest.rs#L13-L38

Added lines #L13 - L38 were not covered by tests

// ensure that we can actually connect to the server
// ensure that we can actually connect to the server;
// will create a client, run a handshake with Faktory,
// and drop the cliet immediately afterwards;
if let Err(e) = AsyncProducer::connect(None).await {
println!("{}", e);
process::exit(1);
Expand Down
2 changes: 1 addition & 1 deletion src/proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ mod single;

// commands that users can issue
pub use self::single::{
bad, Ack, Fail, Fetch, Heartbeat, Hello, Info, Job, JobBuilder, Push, QueueAction,
bad, Ack, End, Fail, Fetch, Heartbeat, Hello, Info, Job, JobBuilder, Push, QueueAction,
QueueControl, RawResponse,
};

Expand Down
14 changes: 7 additions & 7 deletions tests/real/async/community.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ use serde_json::Value;

use crate::skip_check;

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn async_hello_p() {
skip_check!();
let p = AsyncProducer::connect(None).await.unwrap();
drop(p);
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn async_enqueue_job() {
skip_check!();
let mut p = AsyncProducer::connect(None).await.unwrap();
Expand All @@ -36,7 +36,7 @@ async fn process_order(j: Job) -> Result<(), std::io::Error> {
Ok(())
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn async_roundtrip() {
skip_check!();
let jid = String::from("x-job-id-0123456782");
Expand Down Expand Up @@ -67,7 +67,7 @@ async fn async_roundtrip() {
assert!(drained);
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn async_multi() {
skip_check!();
let local = "multi_async";
Expand Down Expand Up @@ -108,7 +108,7 @@ async fn async_multi() {
assert_eq!(job.args(), &[Value::from(2), Value::from("bar")]);
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn async_fail() {
skip_check!();
let local = "fail_async";
Expand Down Expand Up @@ -147,7 +147,7 @@ async fn async_fail() {
// Faktory Enterprise allows tracking the jobs status.
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn async_queue() {
skip_check!();
let local = "pause";
Expand Down Expand Up @@ -192,7 +192,7 @@ async fn assert_args_not_empty(j: Job) -> io::Result<()> {
Ok(eprintln!("{:?}", j))
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn async_test_jobs_created_with_builder() {
skip_check!();

Expand Down

0 comments on commit 5feea0c

Please sign in to comment.