From dd744bcfac17f64b435a1985a2885ca5aad012fe Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Thu, 19 Sep 2024 22:50:12 +0400 Subject: [PATCH 1/6] Use ::connect() vs ::connect_to(&str) --- README.md | 4 +-- examples/run.rs | 4 +-- examples/run_to_completion.rs | 2 +- src/bin/loadtest.rs | 6 ++-- src/lib.rs | 4 +-- src/proto/batch/mod.rs | 8 ++--- src/proto/client/mod.rs | 28 ++++++++++------ src/proto/single/resp.rs | 2 +- src/proto/utils.rs | 4 --- src/tls/native_tls.rs | 19 ++++++++--- src/tls/rustls.rs | 25 ++++++++++----- src/worker/builder.rs | 28 +++++++++++----- src/worker/mod.rs | 4 +-- src/worker/runner.rs | 2 +- tests/real/community.rs | 50 ++++++++++++++--------------- tests/real/enterprise.rs | 60 +++++++++++++++++------------------ 16 files changed, 144 insertions(+), 106 deletions(-) diff --git a/README.md b/README.md index 0fc5ae72..18b4b58b 100644 --- a/README.md +++ b/README.md @@ -42,7 +42,7 @@ If you want to **submit** jobs to Faktory, use `Client`. ```rust use faktory::{Client, Job}; -let mut c = Client::connect(None).await.unwrap(); +let mut c = Client::connect().await.unwrap(); c.enqueue(Job::new("foobar", vec!["z"])).await.unwrap(); ``` @@ -84,7 +84,7 @@ let mut w = Worker::builder() Ok::<(), io::Error>(()) }) .with_rustls() // available on `rustls` feature only - .connect(None) + .connect() .await .unwrap(); diff --git a/examples/run.rs b/examples/run.rs index f29b285b..48e479b6 100644 --- a/examples/run.rs +++ b/examples/run.rs @@ -49,7 +49,7 @@ async fn main() { let tx = Arc::new(tx); // create a producing client - let mut c = Client::connect(None) + let mut c = Client::connect() .await .expect("client successfully connected"); @@ -61,7 +61,7 @@ async fn main() { // create a worker let mut w = Worker::builder() .register("job_type", JobHandler::new(Arc::clone(&tx))) - .connect(None) + .connect() .await .expect("Connected to server"); diff --git a/examples/run_to_completion.rs b/examples/run_to_completion.rs index e9fb8994..5dd94d8e 100644 --- a/examples/run_to_completion.rs +++ b/examples/run_to_completion.rs @@ -16,7 +16,7 @@ async fn main() { println!("{:?}", j); Ok::<(), IOError>(()) }) - .connect(None) + .connect() .await .expect("Connected to server") .run_to_completion(&["default"]) diff --git a/src/bin/loadtest.rs b/src/bin/loadtest.rs index 60d17d67..869d98b5 100644 --- a/src/bin/loadtest.rs +++ b/src/bin/loadtest.rs @@ -41,7 +41,7 @@ async fn main() { // ensure that we can actually connect to the server; // will create a client, run a handshake with Faktory, // and drop the cliet immediately afterwards; - if let Err(e) = Client::connect(None).await { + if let Err(e) = Client::connect().await { println!("{}", e); process::exit(1); } @@ -57,7 +57,7 @@ async fn main() { let popped = sync::Arc::clone(&popped); set.spawn(async move { // make producer and consumer - let mut p = Client::connect(None).await.unwrap(); + let mut p = Client::connect().await.unwrap(); let mut worker = WorkerBuilder::default() .register_fn("SomeJob", |_| { Box::pin(async move { @@ -69,7 +69,7 @@ async fn main() { } }) }) - .connect(None) + .connect() .await .unwrap(); let mut rng = rand::rngs::OsRng; diff --git a/src/lib.rs b/src/lib.rs index 30ed5c10..28d8887a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -36,7 +36,7 @@ //! ```no_run //! # tokio_test::block_on(async { //! use faktory::{Client, Job}; -//! let mut client = Client::connect(None).await.unwrap(); +//! let mut client = Client::connect().await.unwrap(); //! client.enqueue(Job::new("foobar", vec!["z"])).await.unwrap(); //! //! let (enqueued_count, errors) = client.enqueue_many([Job::new("foobar", vec!["z"]), Job::new("foobar", vec!["z"])]).await.unwrap(); @@ -82,7 +82,7 @@ //! Ok::<(), io::Error>(()) //! }) //! .with_rustls() // available on `rustls` feature only -//! .connect(None) +//! .connect() //! .await //! .unwrap(); //! diff --git a/src/proto/batch/mod.rs b/src/proto/batch/mod.rs index a375d18c..7de64e85 100644 --- a/src/proto/batch/mod.rs +++ b/src/proto/batch/mod.rs @@ -34,7 +34,7 @@ pub(crate) use cmd::{CommitBatch, GetBatchStatus, OpenBatch}; /// # use faktory::Error; /// use faktory::{Client, Job, ent::Batch}; /// -/// let mut cl = Client::connect(None).await?; +/// let mut cl = Client::connect().await?; /// let job1 = Job::builder("job_type").build(); /// let job2 = Job::builder("job_type").build(); /// let job_cb = Job::builder("callback_job_type").build(); @@ -57,7 +57,7 @@ pub(crate) use cmd::{CommitBatch, GetBatchStatus, OpenBatch}; /// # tokio_test::block_on(async { /// # use faktory::{Client, Job, Error}; /// # use faktory::ent::Batch; -/// # let mut cl = Client::connect(None).await?; +/// # let mut cl = Client::connect().await?; /// let parent_job1 = Job::builder("job_type").build(); /// let parent_job2 = Job::builder("another_job_type").build(); /// let parent_cb = Job::builder("callback_job_type").build(); @@ -97,7 +97,7 @@ pub(crate) use cmd::{CommitBatch, GetBatchStatus, OpenBatch}; /// # use faktory::{Job, Client}; /// # use faktory::ent::{Batch, CallbackState}; /// # tokio_test::block_on(async { -/// let mut cl = Client::connect(None).await?; +/// let mut cl = Client::connect().await?; /// let job = Job::builder("job_type").build(); /// let cb_job = Job::builder("callback_job_type").build(); /// let b = Batch::builder() @@ -109,7 +109,7 @@ pub(crate) use cmd::{CommitBatch, GetBatchStatus, OpenBatch}; /// b.add(job).await?; /// b.commit().await?; /// -/// let mut t = Client::connect(None).await?; +/// let mut t = Client::connect().await?; /// let s = t.get_batch_status(bid).await?.unwrap(); /// assert_eq!(s.total, 1); /// assert_eq!(s.pending, 1); diff --git a/src/proto/client/mod.rs b/src/proto/client/mod.rs index 35e19caf..95e44c9b 100644 --- a/src/proto/client/mod.rs +++ b/src/proto/client/mod.rs @@ -5,6 +5,7 @@ mod ent; #[cfg(doc)] use crate::proto::{BatchStatus, Progress, ProgressUpdate}; +use super::utils::{get_env_url, url_parse}; use super::{single, Info, Push, QueueAction, QueueControl}; use super::{utils, PushBulk}; use crate::error::{self, Error}; @@ -73,7 +74,7 @@ fn check_protocols_match(ver: usize) -> Result<(), Error> { /// ```no_run /// # tokio_test::block_on(async { /// use faktory::Client; -/// let p = Client::connect(None).await.unwrap(); +/// let p = Client::connect().await.unwrap(); /// # }); /// ``` /// @@ -82,7 +83,7 @@ fn check_protocols_match(ver: usize) -> Result<(), Error> { /// ```no_run /// # tokio_test::block_on(async { /// use faktory::Client; -/// let p = Client::connect(Some("tcp://:hunter2@localhost:7439")).await.unwrap(); +/// let p = Client::connect_to("tcp://:hunter2@localhost:7439").await.unwrap(); /// # }) /// ``` /// @@ -91,7 +92,7 @@ fn check_protocols_match(ver: usize) -> Result<(), Error> { /// ```no_run /// # tokio_test::block_on(async { /// # use faktory::Client; -/// # let mut client = Client::connect(None).await.unwrap(); +/// # let mut client = Client::connect().await.unwrap(); /// use faktory::Job; /// client.enqueue(Job::new("foobar", vec!["z"])).await.unwrap(); /// # }); @@ -108,7 +109,7 @@ fn check_protocols_match(ver: usize) -> Result<(), Error> { /// # tokio_test::block_on(async { /// use faktory::{Client, JobId, ent::JobState}; /// let job_id = JobId::new("W8qyVle9vXzUWQOf"); -/// let mut cl = Client::connect(None).await?; +/// let mut cl = Client::connect().await?; /// if let Some(progress) = cl.get_progress(job_id).await? { /// if let JobState::Success = progress.state { /// # /* @@ -126,7 +127,7 @@ fn check_protocols_match(ver: usize) -> Result<(), Error> { /// # tokio_test::block_on(async { /// use faktory::{Client, JobId, ent::ProgressUpdate}; /// let jid = JobId::new("W8qyVle9vXzUWQOf"); -/// let mut cl = Client::connect(None).await?; +/// let mut cl = Client::connect().await?; /// let progress = ProgressUpdate::builder(jid) /// .desc("Almost done...".to_owned()) /// .percent(99) @@ -142,7 +143,7 @@ fn check_protocols_match(ver: usize) -> Result<(), Error> { /// # tokio_test::block_on(async { /// use faktory::{Client, ent::BatchId}; /// let bid = BatchId::new("W8qyVle9vXzUWQOg"); -/// let mut cl = Client::connect(None).await?; +/// let mut cl = Client::connect().await?; /// if let Some(status) = cl.get_batch_status(bid).await? { /// println!("This batch created at {}", status.created_at); /// } @@ -203,7 +204,7 @@ impl Client { impl Client { /// Create new [`Client`] and connect to a Faktory server. /// - /// If `url` is not given, will use the standard Faktory environment variables. Specifically, + /// Will use the standard Faktory environment variables. Specifically, /// `FAKTORY_PROVIDER` is read to get the name of the environment variable to get the address /// from (defaults to `FAKTORY_URL`), and then that environment variable is read to get the /// server address. If the latter environment variable is not defined, the connection will be @@ -212,8 +213,17 @@ impl Client { /// ```text /// tcp://localhost:7419 /// ``` - pub async fn connect(url: Option<&str>) -> Result { - let url = utils::parse_provided_or_from_env(url)?; + pub async fn connect() -> Result { + let url = get_env_url(); + Self::connect_to(&url).await + } + + /// Create new [`Client`] and connect to a Faktory server using specified address. + /// + /// If the address of the Faktory server is present in the environment, + /// you may want to simply use [`Client::connect`]. + pub async fn connect_to(addr: &str) -> Result { + let url = url_parse(addr)?; let stream = TokioStream::connect(utils::host_from_url(&url)).await?; let buffered_stream = BufStream::new(stream); Self::connect_with(buffered_stream, url.password().map(|p| p.to_string())).await diff --git a/src/proto/single/resp.rs b/src/proto/single/resp.rs index 05c470ea..047d3649 100644 --- a/src/proto/single/resp.rs +++ b/src/proto/single/resp.rs @@ -196,7 +196,7 @@ pub struct ServerSnapshot { /// # tokio_test::block_on(async { /// use faktory::Client; /// -/// let mut client = Client::connect(None).await.unwrap(); +/// let mut client = Client::connect().await.unwrap(); /// let _server_state = client.current_info().await.unwrap(); /// # }); /// ``` diff --git a/src/proto/utils.rs b/src/proto/utils.rs index f2ac84db..33178552 100644 --- a/src/proto/utils.rs +++ b/src/proto/utils.rs @@ -27,10 +27,6 @@ pub(crate) fn url_parse(url: &str) -> Result { Ok(url) } -pub(crate) fn parse_provided_or_from_env(url: Option<&str>) -> Result { - url_parse(url.unwrap_or(&get_env_url())) -} - #[cfg(test)] mod tests { use super::*; diff --git a/src/tls/native_tls.rs b/src/tls/native_tls.rs index 45cc0bf1..71f6da3d 100644 --- a/src/tls/native_tls.rs +++ b/src/tls/native_tls.rs @@ -24,7 +24,7 @@ use tokio_native_tls::{native_tls::TlsConnector, TlsConnector as AsyncTlsConnect /// use faktory::native_tls::TlsStream; /// use tokio::io::BufStream; /// -/// let stream = TlsStream::connect(None).await.unwrap(); +/// let stream = TlsStream::connect().await.unwrap(); /// let buffered = BufStream::new(stream); /// let cl = Client::connect_with(buffered, None).await.unwrap(); /// # drop(cl); @@ -40,7 +40,7 @@ pub struct TlsStream { } impl TlsStream { - /// Create a new TLS connection over TCP. + /// Create a new TLS connection to Faktory over TCP. /// /// If `url` is not given, will use the standard Faktory environment variables. Specifically, /// `FAKTORY_PROVIDER` is read to get the name of the environment variable to get the address @@ -53,12 +53,23 @@ impl TlsStream { /// ``` /// /// If `url` is given, but does not specify a port, it defaults to 7419. - pub async fn connect(url: Option<&str>) -> Result { + pub async fn connect() -> Result { TlsStream::with_connector( TlsConnector::builder() .build() .map_err(error::Stream::NativeTls)?, - url, + None, + ) + .await + } + + /// Create a new TLS connection to Faktory over TCP using specified address. + pub async fn connect_to(addr: &str) -> Result { + TlsStream::with_connector( + TlsConnector::builder() + .build() + .map_err(error::Stream::NativeTls)?, + Some(addr), ) .await } diff --git a/src/tls/rustls.rs b/src/tls/rustls.rs index ac596a64..878bf720 100644 --- a/src/tls/rustls.rs +++ b/src/tls/rustls.rs @@ -25,7 +25,7 @@ use tokio_rustls::TlsConnector; /// use faktory::rustls::TlsStream; /// use tokio::io::BufStream; /// -/// let stream = TlsStream::connect(None).await.unwrap(); +/// let stream = TlsStream::connect().await.unwrap(); /// let buffered = BufStream::new(stream); /// let cl = Client::connect_with(buffered, None).await.unwrap(); /// # drop(cl); @@ -41,7 +41,7 @@ pub struct TlsStream { } impl TlsStream { - /// Create a new TLS connection over TCP. + /// Create a new TLS connection to Faktory over TCP. /// /// If `url` is not given, will use the standard Faktory environment variables. Specifically, /// `FAKTORY_PROVIDER` is read to get the name of the environment variable to get the address @@ -59,19 +59,28 @@ impl TlsStream { /// authentication_. Use [`with_client_config`](TlsStream::with_client_config) /// or [`with_connector`](TlsStream::with_connector) for customized /// `ClientConfig` and `TlsConnector` accordingly. - pub async fn connect(url: Option<&str>) -> Result { - let conf = ClientConfig::builder() + pub async fn connect() -> Result { + let config = ClientConfig::builder() .with_root_certificates(RootCertStore::empty()) .with_no_client_auth(); - let con = TlsConnector::from(Arc::new(conf)); - TlsStream::with_connector(con, url).await + let connnector = TlsConnector::from(Arc::new(config)); + TlsStream::with_connector(connnector, None).await + } + + /// Create a new TLS connection to Faktory over TCP using specified address. + pub async fn connect_to(addr: &str) -> Result { + let config = ClientConfig::builder() + .with_root_certificates(RootCertStore::empty()) + .with_no_client_auth(); + let connector = TlsConnector::from(Arc::new(config)); + TlsStream::with_connector(connector, Some(addr)).await } /// Create a new TLS connection over TCP using native certificates. /// /// Unlike [`TlsStream::connect`], creates a root certificates store populated /// with the certificates loaded from a platform-native certificate store. - pub async fn connect_with_native_certs(url: Option<&str>) -> Result { + pub async fn connect_with_native_certs_to(addr: &str) -> Result { let mut store = RootCertStore::empty(); for cert in rustls_native_certs::load_native_certs()? { store.add(cert).map_err(io::Error::other)?; @@ -79,7 +88,7 @@ impl TlsStream { let config = ClientConfig::builder() .with_root_certificates(store) .with_no_client_auth(); - TlsStream::with_connector(TlsConnector::from(Arc::new(config)), url).await + TlsStream::with_connector(TlsConnector::from(Arc::new(config)), Some(addr)).await } /// Create a new TLS connection over TCP using a non-default TLS configuration. diff --git a/src/worker/builder.rs b/src/worker/builder.rs index 7ff18767..f05e2e50 100644 --- a/src/worker/builder.rs +++ b/src/worker/builder.rs @@ -1,6 +1,9 @@ use super::{runner::Closure, CallbacksRegistry, Client, ShutdownSignal, Worker}; use crate::{ - proto::{utils, ClientOptions}, + proto::{ + utils::{self, get_env_url, url_parse}, + ClientOptions, + }, Error, Job, JobRunner, Reconnect, WorkerId, }; use std::future::Future; @@ -129,7 +132,7 @@ impl WorkerBuilder { /// use tokio_util::sync::CancellationToken; /// use tokio::time::sleep; /// - /// Client::connect(None) + /// Client::connect() /// .await /// .unwrap() /// .enqueue(Job::new("foobar", vec!["z"])) @@ -145,7 +148,7 @@ impl WorkerBuilder { /// let mut w = Worker::builder() /// .with_graceful_shutdown(signal) /// .register_fn("job_type", move |_| async { Ok::<(), std::io::Error>(()) }) - /// .connect(None) + /// .connect() /// .await /// .unwrap(); /// @@ -265,7 +268,7 @@ impl WorkerBuilder { /// Make the traffic between this worker and Faktory encrypted with [`rustls`](https://github.com/rustls/rustls). /// - /// Internally, will use [`TlsStream::connect_with_native_certs`](crate::rustls::TlsStream::connect_with_native_certs) + /// Internally, will use [`TlsStream::connect_with_native_certs_to`](crate::rustls::TlsStream::connect_with_native_certs_to) /// to establish a TLS stream to the Faktory server. /// /// Note that if you use this method on the builder, but eventually use [`WorkerBuilder::connect_with`] @@ -316,8 +319,17 @@ impl WorkerBuilder { /// ``` /// /// If `url` is given, but does not specify a port, it defaults to 7419. - pub async fn connect(self, url: Option<&str>) -> Result, Error> { - let parsed_url = utils::parse_provided_or_from_env(url)?; + pub async fn connect(self) -> Result, Error> { + let url = get_env_url(); + self.connect_to(&url).await + } + + /// Connect to a Faktory server using specified address. + /// + /// If the address of the Faktory server is present in the environment, + /// you may want to simply use [`WorkerBuilder::connect`]. + pub async fn connect_to(self, addr: &str) -> Result, Error> { + let parsed_url = url_parse(addr)?; let password = parsed_url.password().map(|p| p.to_string()); match self.tls_kind { TlsKind::None => { @@ -328,13 +340,13 @@ impl WorkerBuilder { } #[cfg(feature = "rustls")] TlsKind::Rust => { - let stream = crate::rustls::TlsStream::connect_with_native_certs(url).await?; + let stream = crate::rustls::TlsStream::connect_with_native_certs_to(addr).await?; let buffered = BufStream::new(stream); self.connect_with(buffered, password).await } #[cfg(feature = "native_tls")] TlsKind::Native => { - let stream = crate::native_tls::TlsStream::connect(url).await?; + let stream = crate::native_tls::TlsStream::connect_to(addr).await?; let buffered = BufStream::new(stream); self.connect_with(buffered, password).await } diff --git a/src/worker/mod.rs b/src/worker/mod.rs index 0f734792..38398409 100644 --- a/src/worker/mod.rs +++ b/src/worker/mod.rs @@ -125,7 +125,7 @@ type CallbacksRegistry = FnvHashMap>; /// /// let mut w = Worker::builder() /// .register_fn("foo", process_job) -/// .connect(None) +/// .connect() /// .await /// .unwrap(); /// @@ -153,7 +153,7 @@ type CallbacksRegistry = FnvHashMap>; /// println!("{:?}", job); /// Ok::<(), io::Error>(()) /// }) -/// .connect(None) +/// .connect() /// .await /// .unwrap(); /// }); diff --git a/src/worker/runner.rs b/src/worker/runner.rs index caf7030a..1cb77769 100644 --- a/src/worker/runner.rs +++ b/src/worker/runner.rs @@ -38,7 +38,7 @@ use std::future::Future; /// /// let mut w = WorkerBuilder::default() /// .register("foo", handler) -/// .connect(None) +/// .connect() /// .await /// .unwrap(); /// diff --git a/tests/real/community.rs b/tests/real/community.rs index 5eb25063..b9332985 100644 --- a/tests/real/community.rs +++ b/tests/real/community.rs @@ -8,7 +8,7 @@ use tokio_util::sync::CancellationToken; #[tokio::test(flavor = "multi_thread")] async fn hello_client() { skip_check!(); - let p = Client::connect(None).await.unwrap(); + let p = Client::connect().await.unwrap(); drop(p); } @@ -19,7 +19,7 @@ async fn hello_worker() { .hostname("tester".to_string()) .labels(vec!["foo".to_string(), "bar".to_string()]) .register_fn("never_called", |_| async move { unreachable!() }) - .connect(None) + .connect() .await .unwrap(); drop(w); @@ -28,7 +28,7 @@ async fn hello_worker() { #[tokio::test(flavor = "multi_thread")] async fn enqueue_job() { skip_check!(); - let mut p = Client::connect(None).await.unwrap(); + let mut p = Client::connect().await.unwrap(); p.enqueue(JobBuilder::new("order").build()).await.unwrap(); } @@ -50,11 +50,11 @@ async fn roundtrip() { Ok::<(), io::Error>(()) }) .register_fn("image", |_| async move { unreachable!() }) - .connect(None) + .connect() .await .unwrap(); - let mut client = Client::connect(None).await.unwrap(); + let mut client = Client::connect().await.unwrap(); client .enqueue( JobBuilder::new("order") @@ -81,12 +81,12 @@ async fn server_state() { // prepare a worker let mut w = WorkerBuilder::default() .register_fn(local, move |_| async move { Ok::<(), io::Error>(()) }) - .connect(None) + .connect() .await .unwrap(); // prepare a producing client - let mut client = Client::connect(None).await.unwrap(); + let mut client = Client::connect().await.unwrap(); // examine server state before pushing anything let server_state = client.current_info().await.unwrap(); @@ -201,11 +201,11 @@ async fn multi() { Ok::<(), io::Error>(()) }) }) - .connect(None) + .connect() .await .unwrap(); - let mut p = Client::connect(None).await.unwrap(); + let mut p = Client::connect().await.unwrap(); p.enqueue(Job::new(local, vec![Value::from(1), Value::from("foo")]).on_queue(local)) .await .unwrap(); @@ -244,11 +244,11 @@ async fn fail() { Err(io::Error::new(io::ErrorKind::Other, "nope")) }) }) - .connect(None) + .connect() .await .unwrap(); - let mut p = Client::connect(None).await.unwrap(); + let mut p = Client::connect().await.unwrap(); // note that *enqueueing* the jobs didn't fail! p.enqueue(Job::new(local, vec![Value::from(1), Value::from("foo")]).on_queue(local)) @@ -286,11 +286,11 @@ async fn queue_control_actions() { let tx = sync::Arc::clone(&tx_2); Box::pin(async move { tx.lock().unwrap().send(true) }) }) - .connect(None) + .connect() .await .unwrap(); - let mut client = Client::connect(None).await.unwrap(); + let mut client = Client::connect().await.unwrap(); // enqueue three jobs client @@ -395,11 +395,11 @@ async fn queue_control_actions_wildcard() { let tx = sync::Arc::clone(&tx_2); Box::pin(async move { tx.lock().unwrap().send(true) }) }) - .connect(None) + .connect() .await .unwrap(); - let mut client = Client::connect(None).await.unwrap(); + let mut client = Client::connect().await.unwrap(); // enqueue two jobs on each queue client @@ -461,7 +461,7 @@ async fn test_jobs_pushed_in_bulk() { let local_3 = "test_jobs_pushed_in_bulk_3"; let local_4 = "test_jobs_pushed_in_bulk_4"; - let mut p = Client::connect(None).await.unwrap(); + let mut p = Client::connect().await.unwrap(); let (enqueued_count, errors) = p .enqueue_many(vec![ Job::builder("common").queue(local_1).build(), @@ -529,7 +529,7 @@ async fn test_jobs_pushed_in_bulk() { Ok::<(), io::Error>(()) }) .register_fn("broken", move |_job| async { Ok::<(), io::Error>(()) }) - .connect(None) + .connect() .await .unwrap(); @@ -558,11 +558,11 @@ async fn test_jobs_created_with_builder() { skip_check!(); // prepare a client and a worker: - let mut cl = Client::connect(None).await.unwrap(); + let mut cl = Client::connect().await.unwrap(); let mut w = Worker::builder() .register_fn("rebuild_index", assert_args_empty) .register_fn("register_order", assert_args_not_empty) - .connect(None) + .connect() .await .unwrap(); @@ -635,7 +635,7 @@ async fn test_shutdown_signals_handling() { let shutdown_timeout = Duration::from_millis(500); // get a client and a job to enqueue - let mut cl = Client::connect(None).await.unwrap(); + let mut cl = Client::connect().await.unwrap(); let j = JobBuilder::new(jkind) .queue(qname) // task will be being processed for at least 1 second @@ -657,7 +657,7 @@ async fn test_shutdown_signals_handling() { .with_graceful_shutdown(signal) .shutdown_timeout(shutdown_timeout) .register_fn(jkind, process_hard_task(tx)) - .connect(None) + .connect() .await .unwrap(); @@ -702,11 +702,11 @@ async fn test_jobs_with_blocking_handlers() { "general_workload", |_j| async move { Ok::<(), io::Error>(()) }, ) - .connect(None) + .connect() .await .unwrap(); - Client::connect(None) + Client::connect() .await .unwrap() .enqueue_many([ @@ -735,11 +735,11 @@ async fn test_panic_in_handler() { .register_fn("panic_ASYNC_handler", |_j| async move { panic!("Panic inside async handler..."); }) - .connect(None) + .connect() .await .unwrap(); - let mut c = Client::connect(None).await.unwrap(); + let mut c = Client::connect().await.unwrap(); c.enqueue(Job::builder("panic_SYNC_handler").queue(local).build()) .await diff --git a/tests/real/enterprise.rs b/tests/real/enterprise.rs index 1aaf2995..9bba419d 100644 --- a/tests/real/enterprise.rs +++ b/tests/real/enterprise.rs @@ -41,10 +41,10 @@ async fn ent_expiring_job() { let local = "ent_expiring_job"; // prepare a client and a worker: - let mut p = Client::connect(Some(&url)).await.unwrap(); + let mut p = Client::connect_to(&url).await.unwrap(); let mut w = WorkerBuilder::default() .register_fn("AnExpiringJob", print_job) - .connect(Some(&url)) + .connect_to(&url) .await .unwrap(); @@ -92,10 +92,10 @@ async fn ent_unique_job() { let job_type = "order"; // prepare client and worker: - let mut p = Client::connect(Some(&url)).await.unwrap(); + let mut p = Client::connect_to(&url).await.unwrap(); let mut w = WorkerBuilder::default() .register_fn(job_type, print_job) - .connect(Some(&url)) + .connect_to(&url) .await .unwrap(); @@ -210,7 +210,7 @@ async fn ent_unique_job_until_success() { // send a job difficulty level as a job's args and the lattter // will sleep for a corresponding period of time, pretending // to work hard: - let mut client_a = Client::connect(Some(&url1)).await.unwrap(); + let mut client_a = Client::connect_to(&url1).await.unwrap(); let mut worker_a = WorkerBuilder::default() .register_fn(job_type, |job| async move { let args = job.args().to_owned(); @@ -225,7 +225,7 @@ async fn ent_unique_job_until_success() { eprintln!("{:?}", job); Ok::<(), io::Error>(()) }) - .connect(Some(&url1)) + .connect_to(&url1) .await .unwrap(); let job = JobBuilder::new(job_type) @@ -243,7 +243,7 @@ async fn ent_unique_job_until_success() { time::sleep(time::Duration::from_secs(1)).await; // continue - let mut client_b = Client::connect(Some(&url)).await.unwrap(); + let mut client_b = Client::connect_to(&url).await.unwrap(); // this one is a 'duplicate' because the job is still // being executed in the spawned thread: @@ -292,7 +292,7 @@ async fn ent_unique_job_until_start() { let url1 = url.clone(); let handle = tokio::spawn(async move { - let mut client_a = Client::connect(Some(&url1)).await.unwrap(); + let mut client_a = Client::connect_to(&url1).await.unwrap(); let mut worker_a = WorkerBuilder::default() .register_fn(job_type, |job| async move { let args = job.args().to_owned(); @@ -307,7 +307,7 @@ async fn ent_unique_job_until_start() { eprintln!("{:?}", job); Ok::<(), io::Error>(()) }) - .connect(Some(&url1)) + .connect_to(&url1) .await .unwrap(); client_a @@ -330,7 +330,7 @@ async fn ent_unique_job_until_start() { time::sleep(time::Duration::from_secs(1)).await; // the unique lock has been released by this time, so the job is enqueued successfully: - let mut client_b = Client::connect(Some(&url)).await.unwrap(); + let mut client_b = Client::connect_to(&url).await.unwrap(); client_b .enqueue( JobBuilder::new(job_type) @@ -352,7 +352,7 @@ async fn ent_unique_job_bypass_unique_lock() { skip_if_not_enterprise!(); let url = learn_faktory_url(); - let mut producer = Client::connect(Some(&url)).await.unwrap(); + let mut producer = Client::connect_to(&url).await.unwrap(); let queue_name = "ent_unique_job_bypass_unique_lock"; let job1 = Job::builder("order") .queue(queue_name) @@ -387,7 +387,7 @@ async fn ent_unique_job_bypass_unique_lock() { // have been enqueued for real, while the last one has not. let mut c = WorkerBuilder::default() .register_fn("order", print_job) - .connect(Some(&url)) + .connect_to(&url) .await .unwrap(); @@ -408,12 +408,12 @@ async fn test_tracker_can_send_and_retrieve_job_execution_progress() { let url = learn_faktory_url(); let t = Arc::new(Mutex::new( - Client::connect(Some(&url)) + Client::connect_to(&url) .await .expect("job progress tracker created successfully"), )); - let mut p = Client::connect(Some(&url)).await.unwrap(); + let mut p = Client::connect_to(&url).await.unwrap(); let job_tackable = JobBuilder::new("order") .args(vec![Value::from("ISBN-13:9781718501850")]) @@ -439,7 +439,7 @@ async fn test_tracker_can_send_and_retrieve_job_execution_progress() { let job_id = job_id_copy.clone(); let url = url_copy.clone(); Box::pin(async move { - let mut t = Client::connect(Some(&url)) + let mut t = Client::connect_to(&url) .await .expect("job progress tracker created successfully"); @@ -482,7 +482,7 @@ async fn test_tracker_can_send_and_retrieve_job_execution_progress() { Ok::<(), io::Error>(eprintln!("{:?}", job)) }) }) - .connect(Some(&url)) + .connect_to(&url) .await .expect("Successfully ran a handshake with 'Faktory'"); assert_had_one!(&mut c, "test_tracker_can_send_progress_update"); @@ -562,14 +562,14 @@ async fn test_batch_of_jobs_can_be_initiated() { skip_if_not_enterprise!(); let url = learn_faktory_url(); - let mut p = Client::connect(Some(&url)).await.unwrap(); + let mut p = Client::connect_to(&url).await.unwrap(); let mut w = WorkerBuilder::default() .register_fn("thumbnail", |_job| async { Ok::<(), io::Error>(()) }) .register_fn("clean_up", |_job| async { Ok(()) }) - .connect(Some(&url)) + .connect_to(&url) .await .unwrap(); - let mut t = Client::connect(Some(&url)) + let mut t = Client::connect_to(&url) .await .expect("job progress tracker created successfully"); @@ -701,13 +701,13 @@ async fn test_batches_can_be_nested() { let url = learn_faktory_url(); // Set up 'client', 'worker', and 'tracker': - let mut p = Client::connect(Some(&url)).await.unwrap(); + let mut p = Client::connect_to(&url).await.unwrap(); let _w = WorkerBuilder::default() .register_fn("jobtype", |_job| async { Ok::<(), io::Error>(()) }) - .connect(Some(&url)) + .connect_to(&url) .await .unwrap(); - let mut t = Client::connect(Some(&url)) + let mut t = Client::connect_to(&url) .await .expect("job progress tracker created successfully"); @@ -803,12 +803,12 @@ async fn test_callback_will_not_be_queued_unless_batch_gets_committed() { let url = learn_faktory_url(); // prepare a client, a worker of 'order' jobs, and a tracker: - let mut cl = Client::connect(Some(&url)).await.unwrap(); - let mut tr = Client::connect(Some(&url)).await.unwrap(); + let mut cl = Client::connect_to(&url).await.unwrap(); + let mut tr = Client::connect_to(&url).await.unwrap(); let mut w = WorkerBuilder::default() .register_fn("order", |_job| async { Ok(()) }) .register_fn("order_clean_up", |_job| async { Ok::<(), io::Error>(()) }) - .connect(Some(&url)) + .connect_to(&url) .await .unwrap(); @@ -896,8 +896,8 @@ async fn test_callback_will_be_queued_upon_commit_even_if_batch_is_empty() { skip_if_not_enterprise!(); let url = learn_faktory_url(); - let mut cl = Client::connect(Some(&url)).await.unwrap(); - let mut tracker = Client::connect(Some(&url)).await.unwrap(); + let mut cl = Client::connect_to(&url).await.unwrap(); + let mut tracker = Client::connect_to(&url).await.unwrap(); let q_name = "test_callback_will_be_queued_upon_commit_even_if_batch_is_empty"; let complete_cb_jobtype = "complete_callback_jobtype"; let success_cb_jobtype = "success_cb_jobtype"; @@ -947,7 +947,7 @@ async fn test_callback_will_be_queued_upon_commit_even_if_batch_is_empty() { "we want this one to fail to test the 'CallbackState' behavior", )) }) - .connect(Some(&url)) + .connect_to(&url) .await .unwrap(); @@ -986,8 +986,8 @@ async fn test_callback_will_be_queued_upon_commit_even_if_batch_is_empty() { async fn test_batch_can_be_reopened_add_extra_jobs_and_batches_added() { skip_if_not_enterprise!(); let url = learn_faktory_url(); - let mut p = Client::connect(Some(&url)).await.unwrap(); - let mut t = Client::connect(Some(&url)).await.unwrap(); + let mut p = Client::connect_to(&url).await.unwrap(); + let mut t = Client::connect_to(&url).await.unwrap(); let mut jobs = some_jobs("order", "test_batch_can_be_reopned_add_extra_jobs_added", 4); let mut callbacks = some_jobs( "order_clean_up", From b28bc8d6b5537b92b5fbba6033aabdd8583987af Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Thu, 19 Sep 2024 23:24:53 +0400 Subject: [PATCH 2/6] Use Durations for Job's reserve_for --- src/proto/single/mod.rs | 18 +++++++++++++----- src/proto/single/utils.rs | 22 ++++++++++++++++++++++ tests/real/community.rs | 2 +- 3 files changed, 36 insertions(+), 6 deletions(-) diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index 737b1c2b..d51cf4b7 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -2,6 +2,7 @@ use crate::Error; use chrono::{DateTime, Utc}; use derive_builder::Builder; use std::collections::HashMap; +use std::time::Duration; use tokio::io::{AsyncBufRead, AsyncWrite, AsyncWriteExt}; mod cmd; @@ -22,7 +23,7 @@ pub mod ent; pub use id::BatchId; const JOB_DEFAULT_QUEUE: &str = "default"; -const JOB_DEFAULT_RESERVED_FOR_SECS: usize = 600; +const JOB_DEFAULT_RESERVED_FOR_SECS: u64 = 600; const JOB_DEFAULT_RETRY_COUNT: isize = 25; const JOB_DEFAULT_PRIORITY: u8 = 5; const JOB_DEFAULT_BACKTRACE: usize = 0; @@ -108,9 +109,13 @@ pub struct Job { /// How long to allow this job to run for. /// /// Defaults to 600 seconds. - #[serde(skip_serializing_if = "Option::is_none")] - #[builder(default = "Some(JOB_DEFAULT_RESERVED_FOR_SECS)")] - pub reserve_for: Option, + #[serde( + skip_serializing_if = "Option::is_none", + serialize_with = "utils::ser_optional_duration", + deserialize_with = "utils::deser_as_optional_duration" + )] + #[builder(default = "Some(Duration::from_secs(JOB_DEFAULT_RESERVED_FOR_SECS))")] + pub reserve_for: Option, /// Number of times to retry this job. /// @@ -300,7 +305,10 @@ mod test { assert!(job.enqueued_at.is_none()); assert!(job.at.is_none()); - assert_eq!(job.reserve_for, Some(JOB_DEFAULT_RESERVED_FOR_SECS)); + assert_eq!( + job.reserve_for, + Some(Duration::from_secs(JOB_DEFAULT_RESERVED_FOR_SECS)) + ); assert_eq!(job.retry, Some(JOB_DEFAULT_RETRY_COUNT)); assert_eq!(job.priority, Some(JOB_DEFAULT_PRIORITY)); assert_eq!(job.backtrace, Some(JOB_DEFAULT_BACKTRACE)); diff --git a/src/proto/single/utils.rs b/src/proto/single/utils.rs index 0138f974..0fc96214 100644 --- a/src/proto/single/utils.rs +++ b/src/proto/single/utils.rs @@ -30,6 +30,19 @@ where serializer.serialize_u64(secs) } +pub(crate) fn ser_optional_duration( + value: &Option, + serializer: S, +) -> Result +where + S: Serializer, +{ + match value { + None => serializer.serialize_none(), + Some(dur) => serializer.serialize_u64(dur.as_secs()), + } +} + pub(crate) fn deser_duration<'de, D>(value: D) -> Result where D: Deserializer<'de>, @@ -38,6 +51,15 @@ where Ok(Duration::from_secs(secs)) } +pub(crate) fn deser_as_optional_duration<'de, D>(value: D) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + Ok(u64::deserialize(value) + .ok() + .map(|value| (Duration::from_secs(value)))) +} + pub(crate) fn ser_server_time(value: &NaiveTime, serializer: S) -> Result where S: Serializer, diff --git a/tests/real/community.rs b/tests/real/community.rs index b9332985..f212d33c 100644 --- a/tests/real/community.rs +++ b/tests/real/community.rs @@ -493,7 +493,7 @@ async fn test_jobs_pushed_in_bulk() { Job::builder("broken") .jid(JobId::new("3sZCbdp8e9WX__1")) .queue(local_3) - .reserve_for(864001) // reserve_for exceeded + .reserve_for(Duration::from_secs(864001)) // reserve_for exceeded .build(), // plus some valid ones: Job::builder("very_special").queue(local_4).build(), From 1795c4f2d550e072e11d2802c8a71a1a4a3e384b Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Thu, 19 Sep 2024 23:57:23 +0400 Subject: [PATCH 3/6] Add Worker::is_terminated --- src/worker/mod.rs | 12 ++++++++++++ tests/real/community.rs | 1 + 2 files changed, 13 insertions(+) diff --git a/src/worker/mod.rs b/src/worker/mod.rs index 38398409..bbba6178 100644 --- a/src/worker/mod.rs +++ b/src/worker/mod.rs @@ -210,6 +210,14 @@ impl Worker { ), } } + + /// Tell if this worker has been terminated. + /// + /// Running a terminate worker ([`Worker::run_one`], [`Worker::run`], [`Worker::run_to_completion`]) + /// will cause a panic. If the worker is terminated, you will need to build and run a new worker instead. + pub fn is_terminated(&self) -> bool { + self.terminated + } } enum Failed { @@ -323,6 +331,8 @@ impl Worker { /// discontinued due to a signal from the Faktory server or a graceful shutdown signal, /// calling this method will mean you are trying to run a _terminated_ worker which will /// cause a panic. You will need to build and run a new worker instead. + /// + /// You can check if the worker has been terminated with [`Worker::is_terminated`]. pub async fn run_one(&mut self, worker: usize, queues: &[Q]) -> Result where Q: AsRef + Sync, @@ -430,6 +440,8 @@ impl Worker { /// Note that if you provided a shutdown signal when building this worker (see [`WorkerBuilder::with_graceful_shutdown`]), /// and this signal resolved, the worker will be marked as terminated and calling this method will cause a panic. /// You will need to build and run a new worker instead. + /// + /// You can check if the worker has been terminated with [`Worker::is_terminated`]. pub async fn run(&mut self, queues: &[Q]) -> Result where Q: AsRef, diff --git a/tests/real/community.rs b/tests/real/community.rs index f212d33c..0dfd6363 100644 --- a/tests/real/community.rs +++ b/tests/real/community.rs @@ -756,5 +756,6 @@ async fn test_panic_in_handler() { .unwrap(); // same for async handler, note how the test run is not interrupted with a panic + assert!(!w.is_terminated()); assert!(w.run_one(0, &[local]).await.unwrap()); } From 79194c2a6da557c8bd35e366ee6e28643fe0a0ac Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Fri, 20 Sep 2024 00:06:50 +0400 Subject: [PATCH 4/6] Use tokio::time::interval in heartbeat thread' --- src/proto/single/utils.rs | 4 +--- src/worker/health.rs | 5 +++-- src/worker/mod.rs | 4 ++-- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/src/proto/single/utils.rs b/src/proto/single/utils.rs index 0fc96214..897c9036 100644 --- a/src/proto/single/utils.rs +++ b/src/proto/single/utils.rs @@ -55,9 +55,7 @@ pub(crate) fn deser_as_optional_duration<'de, D>(value: D) -> Result, { - Ok(u64::deserialize(value) - .ok() - .map(|value| (Duration::from_secs(value)))) + Ok(u64::deserialize(value).ok().map(Duration::from_secs)) } pub(crate) fn ser_server_time(value: &NaiveTime, serializer: S) -> Result diff --git a/src/worker/health.rs b/src/worker/health.rs index e7984258..6522ff20 100644 --- a/src/worker/health.rs +++ b/src/worker/health.rs @@ -5,7 +5,6 @@ use std::{ sync::{atomic, Arc}, time::{self, Duration}, }; -use tokio::time::sleep as tokio_sleep; const CHECK_STATE_INTERVAL: Duration = Duration::from_millis(100); const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); @@ -34,9 +33,11 @@ where let mut target = STATUS_RUNNING; let mut last = time::Instant::now(); + let mut check_state_interval = tokio::time::interval(CHECK_STATE_INTERVAL); + check_state_interval.tick().await; loop { - tokio_sleep(CHECK_STATE_INTERVAL).await; + check_state_interval.tick().await; // has a worker failed? let worker_failure = target == STATUS_RUNNING diff --git a/src/worker/mod.rs b/src/worker/mod.rs index bbba6178..2b6aab89 100644 --- a/src/worker/mod.rs +++ b/src/worker/mod.rs @@ -331,7 +331,7 @@ impl Worker { /// discontinued due to a signal from the Faktory server or a graceful shutdown signal, /// calling this method will mean you are trying to run a _terminated_ worker which will /// cause a panic. You will need to build and run a new worker instead. - /// + /// /// You can check if the worker has been terminated with [`Worker::is_terminated`]. pub async fn run_one(&mut self, worker: usize, queues: &[Q]) -> Result where @@ -440,7 +440,7 @@ impl Worker { /// Note that if you provided a shutdown signal when building this worker (see [`WorkerBuilder::with_graceful_shutdown`]), /// and this signal resolved, the worker will be marked as terminated and calling this method will cause a panic. /// You will need to build and run a new worker instead. - /// + /// /// You can check if the worker has been terminated with [`Worker::is_terminated`]. pub async fn run(&mut self, queues: &[Q]) -> Result where From e9e8ae15311476f00f661294f175616ac65f3cd9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pavie=C5=82=20Michalkievi=C4=8D?= <117771945+rustworthy@users.noreply.github.com> Date: Sat, 19 Oct 2024 17:28:48 +0400 Subject: [PATCH 5/6] Update src/proto/single/utils.rs Co-authored-by: Jon Gjengset --- src/proto/single/utils.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/proto/single/utils.rs b/src/proto/single/utils.rs index 897c9036..0a99411b 100644 --- a/src/proto/single/utils.rs +++ b/src/proto/single/utils.rs @@ -30,7 +30,7 @@ where serializer.serialize_u64(secs) } -pub(crate) fn ser_optional_duration( +pub(crate) fn ser_optional_duration_in_seconds( value: &Option, serializer: S, ) -> Result From cf8f7570a1c5129c920f48b2b41971c8286c142c Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Sat, 19 Oct 2024 18:06:56 +0400 Subject: [PATCH 6/6] Update duration ser-deser helper names --- src/proto/single/mod.rs | 4 ++-- src/proto/single/resp.rs | 4 ++-- src/proto/single/utils.rs | 12 +++++++----- 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index d51cf4b7..031b08cd 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -111,8 +111,8 @@ pub struct Job { /// Defaults to 600 seconds. #[serde( skip_serializing_if = "Option::is_none", - serialize_with = "utils::ser_optional_duration", - deserialize_with = "utils::deser_as_optional_duration" + serialize_with = "utils::ser_optional_duration_in_seconds", + deserialize_with = "utils::deser_as_optional_duration_in_seconds" )] #[builder(default = "Some(Duration::from_secs(JOB_DEFAULT_RESERVED_FOR_SECS))")] pub reserve_for: Option, diff --git a/src/proto/single/resp.rs b/src/proto/single/resp.rs index 047d3649..5096537c 100644 --- a/src/proto/single/resp.rs +++ b/src/proto/single/resp.rs @@ -172,8 +172,8 @@ pub struct ServerSnapshot { pub version: semver::Version, /// Faktory server process uptime in seconds. - #[serde(deserialize_with = "utils::deser_duration")] - #[serde(serialize_with = "utils::ser_duration")] + #[serde(deserialize_with = "utils::deser_duration_in_seconds")] + #[serde(serialize_with = "utils::ser_duration_in_seconds")] pub uptime: Duration, /// Number of clients connected to the server. diff --git a/src/proto/single/utils.rs b/src/proto/single/utils.rs index 0a99411b..3c3dfae1 100644 --- a/src/proto/single/utils.rs +++ b/src/proto/single/utils.rs @@ -22,7 +22,7 @@ pub(crate) fn gen_random_wid() -> String { gen_random_id(WORKER_ID_LENGTH) } -pub(crate) fn ser_duration(value: &Duration, serializer: S) -> Result +pub(crate) fn ser_duration_in_seconds(value: &Duration, serializer: S) -> Result where S: Serializer, { @@ -43,7 +43,7 @@ where } } -pub(crate) fn deser_duration<'de, D>(value: D) -> Result +pub(crate) fn deser_duration_in_seconds<'de, D>(value: D) -> Result where D: Deserializer<'de>, { @@ -51,7 +51,9 @@ where Ok(Duration::from_secs(secs)) } -pub(crate) fn deser_as_optional_duration<'de, D>(value: D) -> Result, D::Error> +pub(crate) fn deser_as_optional_duration_in_seconds<'de, D>( + value: D, +) -> Result, D::Error> where D: Deserializer<'de>, { @@ -115,8 +117,8 @@ mod test { fn test_ser_deser_duration() { #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] struct FaktoryServer { - #[serde(deserialize_with = "deser_duration")] - #[serde(serialize_with = "ser_duration")] + #[serde(deserialize_with = "deser_duration_in_seconds")] + #[serde(serialize_with = "ser_duration_in_seconds")] uptime: Duration, }