Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tech debt #79

Merged
merged 6 commits into from
Oct 19, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ If you want to **submit** jobs to Faktory, use `Client`.

```rust
use faktory::{Client, Job};
let mut c = Client::connect(None).await.unwrap();
let mut c = Client::connect().await.unwrap();
c.enqueue(Job::new("foobar", vec!["z"])).await.unwrap();
```

Expand Down Expand Up @@ -84,7 +84,7 @@ let mut w = Worker::builder()
Ok::<(), io::Error>(())
})
.with_rustls() // available on `rustls` feature only
.connect(None)
.connect()
.await
.unwrap();

Expand Down
4 changes: 2 additions & 2 deletions examples/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ async fn main() {
let tx = Arc::new(tx);

// create a producing client
let mut c = Client::connect(None)
let mut c = Client::connect()
.await
.expect("client successfully connected");

Expand All @@ -61,7 +61,7 @@ async fn main() {
// create a worker
let mut w = Worker::builder()
.register("job_type", JobHandler::new(Arc::clone(&tx)))
.connect(None)
.connect()
.await
.expect("Connected to server");

Expand Down
2 changes: 1 addition & 1 deletion examples/run_to_completion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async fn main() {
println!("{:?}", j);
Ok::<(), IOError>(())
})
.connect(None)
.connect()
.await
.expect("Connected to server")
.run_to_completion(&["default"])
Expand Down
6 changes: 3 additions & 3 deletions src/bin/loadtest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
// ensure that we can actually connect to the server;
// will create a client, run a handshake with Faktory,
// and drop the cliet immediately afterwards;
if let Err(e) = Client::connect(None).await {
if let Err(e) = Client::connect().await {

Check warning on line 44 in src/bin/loadtest.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/loadtest.rs#L44

Added line #L44 was not covered by tests
println!("{}", e);
process::exit(1);
}
Expand All @@ -57,7 +57,7 @@
let popped = sync::Arc::clone(&popped);
set.spawn(async move {
// make producer and consumer
let mut p = Client::connect(None).await.unwrap();
let mut p = Client::connect().await.unwrap();

Check warning on line 60 in src/bin/loadtest.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/loadtest.rs#L60

Added line #L60 was not covered by tests
let mut worker = WorkerBuilder::default()
.register_fn("SomeJob", |_| {
Box::pin(async move {
Expand All @@ -69,7 +69,7 @@
}
})
})
.connect(None)
.connect()

Check warning on line 72 in src/bin/loadtest.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/loadtest.rs#L72

Added line #L72 was not covered by tests
.await
.unwrap();
let mut rng = rand::rngs::OsRng;
Expand Down
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
//! ```no_run
//! # tokio_test::block_on(async {
//! use faktory::{Client, Job};
//! let mut client = Client::connect(None).await.unwrap();
//! let mut client = Client::connect().await.unwrap();
//! client.enqueue(Job::new("foobar", vec!["z"])).await.unwrap();
//!
//! let (enqueued_count, errors) = client.enqueue_many([Job::new("foobar", vec!["z"]), Job::new("foobar", vec!["z"])]).await.unwrap();
Expand Down Expand Up @@ -82,7 +82,7 @@
//! Ok::<(), io::Error>(())
//! })
//! .with_rustls() // available on `rustls` feature only
//! .connect(None)
//! .connect()
//! .await
//! .unwrap();
//!
Expand Down
8 changes: 4 additions & 4 deletions src/proto/batch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub(crate) use cmd::{CommitBatch, GetBatchStatus, OpenBatch};
/// # use faktory::Error;
/// use faktory::{Client, Job, ent::Batch};
///
/// let mut cl = Client::connect(None).await?;
/// let mut cl = Client::connect().await?;
/// let job1 = Job::builder("job_type").build();
/// let job2 = Job::builder("job_type").build();
/// let job_cb = Job::builder("callback_job_type").build();
Expand All @@ -57,7 +57,7 @@ pub(crate) use cmd::{CommitBatch, GetBatchStatus, OpenBatch};
/// # tokio_test::block_on(async {
/// # use faktory::{Client, Job, Error};
/// # use faktory::ent::Batch;
/// # let mut cl = Client::connect(None).await?;
/// # let mut cl = Client::connect().await?;
/// let parent_job1 = Job::builder("job_type").build();
/// let parent_job2 = Job::builder("another_job_type").build();
/// let parent_cb = Job::builder("callback_job_type").build();
Expand Down Expand Up @@ -97,7 +97,7 @@ pub(crate) use cmd::{CommitBatch, GetBatchStatus, OpenBatch};
/// # use faktory::{Job, Client};
/// # use faktory::ent::{Batch, CallbackState};
/// # tokio_test::block_on(async {
/// let mut cl = Client::connect(None).await?;
/// let mut cl = Client::connect().await?;
/// let job = Job::builder("job_type").build();
/// let cb_job = Job::builder("callback_job_type").build();
/// let b = Batch::builder()
Expand All @@ -109,7 +109,7 @@ pub(crate) use cmd::{CommitBatch, GetBatchStatus, OpenBatch};
/// b.add(job).await?;
/// b.commit().await?;
///
/// let mut t = Client::connect(None).await?;
/// let mut t = Client::connect().await?;
/// let s = t.get_batch_status(bid).await?.unwrap();
/// assert_eq!(s.total, 1);
/// assert_eq!(s.pending, 1);
Expand Down
28 changes: 19 additions & 9 deletions src/proto/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ mod ent;
#[cfg(doc)]
use crate::proto::{BatchStatus, Progress, ProgressUpdate};

use super::utils::{get_env_url, url_parse};
use super::{single, Info, Push, QueueAction, QueueControl};
use super::{utils, PushBulk};
use crate::error::{self, Error};
Expand Down Expand Up @@ -73,7 +74,7 @@ fn check_protocols_match(ver: usize) -> Result<(), Error> {
/// ```no_run
/// # tokio_test::block_on(async {
/// use faktory::Client;
/// let p = Client::connect(None).await.unwrap();
/// let p = Client::connect().await.unwrap();
/// # });
/// ```
///
Expand All @@ -82,7 +83,7 @@ fn check_protocols_match(ver: usize) -> Result<(), Error> {
/// ```no_run
/// # tokio_test::block_on(async {
/// use faktory::Client;
/// let p = Client::connect(Some("tcp://:hunter2@localhost:7439")).await.unwrap();
/// let p = Client::connect_to("tcp://:hunter2@localhost:7439").await.unwrap();
/// # })
/// ```
///
Expand All @@ -91,7 +92,7 @@ fn check_protocols_match(ver: usize) -> Result<(), Error> {
/// ```no_run
/// # tokio_test::block_on(async {
/// # use faktory::Client;
/// # let mut client = Client::connect(None).await.unwrap();
/// # let mut client = Client::connect().await.unwrap();
/// use faktory::Job;
/// client.enqueue(Job::new("foobar", vec!["z"])).await.unwrap();
/// # });
Expand All @@ -108,7 +109,7 @@ fn check_protocols_match(ver: usize) -> Result<(), Error> {
/// # tokio_test::block_on(async {
/// use faktory::{Client, JobId, ent::JobState};
/// let job_id = JobId::new("W8qyVle9vXzUWQOf");
/// let mut cl = Client::connect(None).await?;
/// let mut cl = Client::connect().await?;
/// if let Some(progress) = cl.get_progress(job_id).await? {
/// if let JobState::Success = progress.state {
/// # /*
Expand All @@ -126,7 +127,7 @@ fn check_protocols_match(ver: usize) -> Result<(), Error> {
/// # tokio_test::block_on(async {
/// use faktory::{Client, JobId, ent::ProgressUpdate};
/// let jid = JobId::new("W8qyVle9vXzUWQOf");
/// let mut cl = Client::connect(None).await?;
/// let mut cl = Client::connect().await?;
/// let progress = ProgressUpdate::builder(jid)
/// .desc("Almost done...".to_owned())
/// .percent(99)
Expand All @@ -142,7 +143,7 @@ fn check_protocols_match(ver: usize) -> Result<(), Error> {
/// # tokio_test::block_on(async {
/// use faktory::{Client, ent::BatchId};
/// let bid = BatchId::new("W8qyVle9vXzUWQOg");
/// let mut cl = Client::connect(None).await?;
/// let mut cl = Client::connect().await?;
/// if let Some(status) = cl.get_batch_status(bid).await? {
/// println!("This batch created at {}", status.created_at);
/// }
Expand Down Expand Up @@ -203,7 +204,7 @@ impl Client {
impl Client {
/// Create new [`Client`] and connect to a Faktory server.
///
/// If `url` is not given, will use the standard Faktory environment variables. Specifically,
/// Will use the standard Faktory environment variables. Specifically,
/// `FAKTORY_PROVIDER` is read to get the name of the environment variable to get the address
/// from (defaults to `FAKTORY_URL`), and then that environment variable is read to get the
/// server address. If the latter environment variable is not defined, the connection will be
Expand All @@ -212,8 +213,17 @@ impl Client {
/// ```text
/// tcp://localhost:7419
/// ```
pub async fn connect(url: Option<&str>) -> Result<Client, Error> {
let url = utils::parse_provided_or_from_env(url)?;
pub async fn connect() -> Result<Client, Error> {
let url = get_env_url();
Self::connect_to(&url).await
}

/// Create new [`Client`] and connect to a Faktory server using specified address.
///
/// If the address of the Faktory server is present in the environment,
/// you may want to simply use [`Client::connect`].
pub async fn connect_to(addr: &str) -> Result<Client, Error> {
let url = url_parse(addr)?;
let stream = TokioStream::connect(utils::host_from_url(&url)).await?;
let buffered_stream = BufStream::new(stream);
Self::connect_with(buffered_stream, url.password().map(|p| p.to_string())).await
Expand Down
18 changes: 13 additions & 5 deletions src/proto/single/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::Error;
use chrono::{DateTime, Utc};
use derive_builder::Builder;
use std::collections::HashMap;
use std::time::Duration;
use tokio::io::{AsyncBufRead, AsyncWrite, AsyncWriteExt};

mod cmd;
Expand All @@ -22,7 +23,7 @@ pub mod ent;
pub use id::BatchId;

const JOB_DEFAULT_QUEUE: &str = "default";
const JOB_DEFAULT_RESERVED_FOR_SECS: usize = 600;
const JOB_DEFAULT_RESERVED_FOR_SECS: u64 = 600;
const JOB_DEFAULT_RETRY_COUNT: isize = 25;
const JOB_DEFAULT_PRIORITY: u8 = 5;
const JOB_DEFAULT_BACKTRACE: usize = 0;
Expand Down Expand Up @@ -108,9 +109,13 @@ pub struct Job {
/// How long to allow this job to run for.
///
/// Defaults to 600 seconds.
#[serde(skip_serializing_if = "Option::is_none")]
#[builder(default = "Some(JOB_DEFAULT_RESERVED_FOR_SECS)")]
pub reserve_for: Option<usize>,
#[serde(
skip_serializing_if = "Option::is_none",
serialize_with = "utils::ser_optional_duration",
deserialize_with = "utils::deser_as_optional_duration"
)]
#[builder(default = "Some(Duration::from_secs(JOB_DEFAULT_RESERVED_FOR_SECS))")]
pub reserve_for: Option<Duration>,

/// Number of times to retry this job.
///
Expand Down Expand Up @@ -300,7 +305,10 @@ mod test {

assert!(job.enqueued_at.is_none());
assert!(job.at.is_none());
assert_eq!(job.reserve_for, Some(JOB_DEFAULT_RESERVED_FOR_SECS));
assert_eq!(
job.reserve_for,
Some(Duration::from_secs(JOB_DEFAULT_RESERVED_FOR_SECS))
);
assert_eq!(job.retry, Some(JOB_DEFAULT_RETRY_COUNT));
assert_eq!(job.priority, Some(JOB_DEFAULT_PRIORITY));
assert_eq!(job.backtrace, Some(JOB_DEFAULT_BACKTRACE));
Expand Down
2 changes: 1 addition & 1 deletion src/proto/single/resp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ pub struct ServerSnapshot {
/// # tokio_test::block_on(async {
/// use faktory::Client;
///
/// let mut client = Client::connect(None).await.unwrap();
/// let mut client = Client::connect().await.unwrap();
/// let _server_state = client.current_info().await.unwrap();
/// # });
/// ```
Expand Down
20 changes: 20 additions & 0 deletions src/proto/single/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,19 @@
serializer.serialize_u64(secs)
}

pub(crate) fn ser_optional_duration<S>(
rustworthy marked this conversation as resolved.
Show resolved Hide resolved
value: &Option<Duration>,
serializer: S,
) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match value {
None => serializer.serialize_none(),

Check warning on line 41 in src/proto/single/utils.rs

View check run for this annotation

Codecov / codecov/patch

src/proto/single/utils.rs#L41

Added line #L41 was not covered by tests
Some(dur) => serializer.serialize_u64(dur.as_secs()),
}
}

pub(crate) fn deser_duration<'de, D>(value: D) -> Result<Duration, D::Error>
where
D: Deserializer<'de>,
Expand All @@ -38,6 +51,13 @@
Ok(Duration::from_secs(secs))
}

pub(crate) fn deser_as_optional_duration<'de, D>(value: D) -> Result<Option<Duration>, D::Error>
where
D: Deserializer<'de>,
{
Ok(u64::deserialize(value).ok().map(Duration::from_secs))
}

pub(crate) fn ser_server_time<S>(value: &NaiveTime, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
Expand Down
4 changes: 0 additions & 4 deletions src/proto/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@ pub(crate) fn url_parse(url: &str) -> Result<Url, Error> {
Ok(url)
}

pub(crate) fn parse_provided_or_from_env(url: Option<&str>) -> Result<Url, Error> {
url_parse(url.unwrap_or(&get_env_url()))
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
19 changes: 15 additions & 4 deletions src/tls/native_tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
/// use faktory::native_tls::TlsStream;
/// use tokio::io::BufStream;
///
/// let stream = TlsStream::connect(None).await.unwrap();
/// let stream = TlsStream::connect().await.unwrap();
/// let buffered = BufStream::new(stream);
/// let cl = Client::connect_with(buffered, None).await.unwrap();
/// # drop(cl);
Expand All @@ -40,7 +40,7 @@
}

impl TlsStream<TokioTcpStream> {
/// Create a new TLS connection over TCP.
/// Create a new TLS connection to Faktory over TCP.
///
/// If `url` is not given, will use the standard Faktory environment variables. Specifically,
/// `FAKTORY_PROVIDER` is read to get the name of the environment variable to get the address
Expand All @@ -53,12 +53,23 @@
/// ```
///
/// If `url` is given, but does not specify a port, it defaults to 7419.
pub async fn connect(url: Option<&str>) -> Result<Self, Error> {
pub async fn connect() -> Result<Self, Error> {

Check warning on line 56 in src/tls/native_tls.rs

View check run for this annotation

Codecov / codecov/patch

src/tls/native_tls.rs#L56

Added line #L56 was not covered by tests
TlsStream::with_connector(
TlsConnector::builder()
.build()
.map_err(error::Stream::NativeTls)?,
url,
None,

Check warning on line 61 in src/tls/native_tls.rs

View check run for this annotation

Codecov / codecov/patch

src/tls/native_tls.rs#L61

Added line #L61 was not covered by tests
)
.await
}

Check warning on line 64 in src/tls/native_tls.rs

View check run for this annotation

Codecov / codecov/patch

src/tls/native_tls.rs#L63-L64

Added lines #L63 - L64 were not covered by tests

/// Create a new TLS connection to Faktory over TCP using specified address.
pub async fn connect_to(addr: &str) -> Result<Self, Error> {
TlsStream::with_connector(
TlsConnector::builder()
.build()
.map_err(error::Stream::NativeTls)?,
Some(addr),

Check warning on line 72 in src/tls/native_tls.rs

View check run for this annotation

Codecov / codecov/patch

src/tls/native_tls.rs#L67-L72

Added lines #L67 - L72 were not covered by tests
)
.await
}
Expand Down
Loading
Loading