diff --git a/Cargo.lock b/Cargo.lock index 8505c38b..a4de9e5c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -493,6 +493,7 @@ dependencies = [ "pin-project", "rand 0.8.0", "rustls-pki-types", + "semver", "serde", "serde_derive", "serde_json", @@ -1099,20 +1100,29 @@ dependencies = [ "libc", ] +[[package]] +name = "semver" +version = "1.0.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" +dependencies = [ + "serde", +] + [[package]] name = "serde" -version = "1.0.186" +version = "1.0.202" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f5db24220c009de9bd45e69fb2938f4b6d2df856aa9304ce377b3180f83b7c1" +checksum = "226b61a0d411b2ba5ff6d7f73a476ac4f8bb900373459cd00fab8512828ba395" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.186" +version = "1.0.202" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ad697f7e0b65af4983a4ce8f56ed5b357e8d3c36651bf6a7e13639c17b8e670" +checksum = "6048858004bcff69094cd972ed40a32500f153bd3be9f716b2eed2e8217c4838" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 5e5a257e..ee94fa0d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,6 +47,7 @@ tokio = { version = "1.35.1", features = [ tokio-native-tls = { version = "0.3.1", optional = true } tokio-rustls = { version = "0.25.0", optional = true } url = "2" +semver = { version = "1.0.23", features = ["serde"] } [dev-dependencies] rustls-pki-types = "1.0.1" diff --git a/Makefile b/Makefile index 16a2ff95..e5221541 100644 --- a/Makefile +++ b/Makefile @@ -34,7 +34,7 @@ faktory/tls: .PHONY: faktory/tls/kill faktory/tls/kill: - docker compose -f docker/compose.yml down + docker compose -f docker/compose.yml down -v .PHONY: test test: diff --git a/src/lib.rs b/src/lib.rs index 6f702741..f8f97828 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -75,7 +75,9 @@ mod proto; mod worker; pub use crate::error::Error; -pub use crate::proto::{Client, Job, JobBuilder, JobId, Reconnect, WorkerId}; +pub use crate::proto::{ + Client, DataSnapshot, FaktoryState, Job, JobBuilder, JobId, Reconnect, ServerSnapshot, WorkerId, +}; pub use crate::worker::{JobRunner, Worker, WorkerBuilder}; #[cfg(feature = "ent")] diff --git a/src/proto/client/mod.rs b/src/proto/client/mod.rs index 7d256571..997ecd9e 100644 --- a/src/proto/client/mod.rs +++ b/src/proto/client/mod.rs @@ -358,10 +358,10 @@ where Ok((jobs_count - errors.len(), Some(errors))) } - /// Retrieve information about the running server. + /// Retrieve [information](crate::ServerSnapshot) about the running server. /// /// The returned value is the result of running the `INFO` command on the server. - pub async fn info(&mut self) -> Result { + pub async fn current_info(&mut self) -> Result { self.issue(&Info) .await? .read_json() diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 5768b085..63f479cd 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -9,7 +9,7 @@ pub(crate) use client::{ClientOptions, HeartbeatStatus, EXPECTED_PROTOCOL_VERSIO mod single; -pub use single::{Job, JobBuilder, JobId, WorkerId}; +pub use single::{DataSnapshot, FaktoryState, Job, JobBuilder, JobId, ServerSnapshot, WorkerId}; pub(crate) use single::{Ack, Fail, Info, Push, PushBulk, QueueAction, QueueControl}; diff --git a/src/proto/single/ent/progress.rs b/src/proto/single/ent/progress.rs index 49304a5a..be227d9b 100644 --- a/src/proto/single/ent/progress.rs +++ b/src/proto/single/ent/progress.rs @@ -1,8 +1,8 @@ -use crate::proto::single::JobId; - use super::utils; +use crate::proto::single::JobId; use chrono::{DateTime, Utc}; use derive_builder::Builder; + /// Info on job execution progress (sent). /// /// In Enterprise Faktory, a client executing a job can report on the execution diff --git a/src/proto/single/resp.rs b/src/proto/single/resp.rs index 5d4ede59..05c470ea 100644 --- a/src/proto/single/resp.rs +++ b/src/proto/single/resp.rs @@ -1,9 +1,13 @@ +use super::utils; +use crate::error::{self, Error}; +use chrono::{DateTime, Utc}; +use std::collections::BTreeMap; +use std::time::Duration; +use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncReadExt}; + #[cfg(feature = "ent")] use crate::ent::BatchId; -use crate::error::{self, Error}; -use tokio::io::AsyncBufRead; - pub fn bad(expected: &'static str, got: &RawResponse) -> error::Protocol { let stringy = match *got { RawResponse::String(ref s) => Some(&**s), @@ -118,6 +122,107 @@ pub async fn read_ok(r: R) -> Result<(), Error> { Err(bad("server ok", &rr).into()) } +// ---------------------------------------------- + +/// Faktory service information. +/// +/// This holds information on the registered [queues](DataSnapshot::queues) as well as +/// some aggregated data, e.g. total number of jobs [processed](DataSnapshot::total_processed), +/// total number of jobs [enqueued](DataSnapshot::total_enqueued), etc. +#[derive(Serialize, Deserialize, Debug, Clone)] +#[non_exhaustive] +pub struct DataSnapshot { + /// Total number of job failures. + pub total_failures: u64, + + /// Total number of processed jobs. + pub total_processed: u64, + + /// Total number of enqueued jobs. + pub total_enqueued: u64, + + /// Total number of queues. + pub total_queues: u64, + + /// Queues stats. + /// + /// A mapping between a queue name and its size (number of jobs on the queue). + /// The keys of this map effectively make up a list of queues that are currently + /// registered in the Faktory service. + pub queues: BTreeMap, + + /// ***Deprecated***. Faktory's task runner stats. + /// + /// Note that this is exposed as a "generic" `serde_json::Value` since this info + /// belongs to deep implementation details of the Faktory service. + #[deprecated( + note = "marked as deprecated in the Faktory source code and is likely to be completely removed in the future, so please do not rely on this data" + )] + pub tasks: serde_json::Value, +} + +/// Faktory's server process information. +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct ServerSnapshot { + /// Description of the server process (e.g. "Faktory"). + pub description: String, + + /// Faktory's version as semver. + #[serde(rename = "faktory_version")] + pub version: semver::Version, + + /// Faktory server process uptime in seconds. + #[serde(deserialize_with = "utils::deser_duration")] + #[serde(serialize_with = "utils::ser_duration")] + pub uptime: Duration, + + /// Number of clients connected to the server. + pub connections: u64, + + /// Number of executed commands. + pub command_count: u64, + + /// Faktory server process memory usage. + pub used_memory_mb: u64, +} + +/// Current server state. +/// +/// Contains such details as how many queues there are on the server, statistics on the jobs, +/// as well as some specific info on server process memory usage, uptime, etc. +/// +/// Here is an example of the simplest way to fetch info on the server state. +/// ```no_run +/// # tokio_test::block_on(async { +/// use faktory::Client; +/// +/// let mut client = Client::connect(None).await.unwrap(); +/// let _server_state = client.current_info().await.unwrap(); +/// # }); +/// ``` +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct FaktoryState { + /// Server time. + pub now: DateTime, + + /// Server time (naive representation). + /// + /// Faktory sends it as a string formatted as "%H:%M:%S UTC" (e.g. "19:47:39 UTC") + /// and it is being parsed as `NaiveTime`. + /// + /// Most of the time, though, you will want to use [`FaktoryState::now`] instead. + #[serde(deserialize_with = "utils::deser_server_time")] + #[serde(serialize_with = "utils::ser_server_time")] + pub server_utc_time: chrono::naive::NaiveTime, + + /// Faktory service information. + #[serde(rename = "faktory")] + pub data: DataSnapshot, + + /// Faktory's server process information. + pub server: ServerSnapshot, +} + // ---------------------------------------------- // // below is the implementation of the Redis RESP protocol @@ -132,7 +237,6 @@ pub enum RawResponse { Null, } -use tokio::io::{AsyncBufReadExt, AsyncReadExt}; async fn read(mut r: R) -> Result where R: AsyncBufRead + Unpin, diff --git a/src/proto/single/utils.rs b/src/proto/single/utils.rs index ed4f55f6..0138f974 100644 --- a/src/proto/single/utils.rs +++ b/src/proto/single/utils.rs @@ -1,4 +1,7 @@ +use chrono::naive::NaiveTime; use rand::{thread_rng, Rng}; +use serde::{de::Deserializer, Deserialize, Serializer}; +use std::time::Duration; const JOB_ID_LENGTH: usize = 16; const WORKER_ID_LENGTH: usize = 32; @@ -19,6 +22,44 @@ pub(crate) fn gen_random_wid() -> String { gen_random_id(WORKER_ID_LENGTH) } +pub(crate) fn ser_duration(value: &Duration, serializer: S) -> Result +where + S: Serializer, +{ + let secs = value.as_secs(); + serializer.serialize_u64(secs) +} + +pub(crate) fn deser_duration<'de, D>(value: D) -> Result +where + D: Deserializer<'de>, +{ + let secs = u64::deserialize(value)?; + Ok(Duration::from_secs(secs)) +} + +pub(crate) fn ser_server_time(value: &NaiveTime, serializer: S) -> Result +where + S: Serializer, +{ + serializer.serialize_str(&format!("{} UTC", value)) +} + +pub(crate) fn deser_server_time<'de, D>(value: D) -> Result +where + D: Deserializer<'de>, +{ + let naive_time_str = String::deserialize(value)?; + let naive_time_str = naive_time_str + .strip_suffix(" UTC") + .ok_or(serde::de::Error::custom( + "Expected a naive time string that ends with ' UTC'", + ))?; + let naive_time = + NaiveTime::parse_from_str(naive_time_str, "%H:%M:%S").map_err(serde::de::Error::custom)?; + Ok(naive_time) +} + #[cfg(test)] mod test { use super::*; @@ -49,4 +90,43 @@ mod test { } assert_eq!(ids.len(), 1_000_000); } + + #[test] + fn test_ser_deser_duration() { + #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] + struct FaktoryServer { + #[serde(deserialize_with = "deser_duration")] + #[serde(serialize_with = "ser_duration")] + uptime: Duration, + } + + let server = FaktoryServer { + uptime: Duration::from_secs(2024), + }; + + let serialized = serde_json::to_string(&server).expect("serialized ok"); + let deserialized = serde_json::from_str(&serialized).expect("deserialized ok"); + assert_eq!(server, deserialized); + } + + #[test] + fn test_ser_deser_server_time() { + #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] + struct FaktoryServer { + /// Server time as a string formatted as "%H:%M:%S UTC" (e.g. "19:47:39 UTC"). + #[serde(deserialize_with = "deser_server_time")] + #[serde(serialize_with = "ser_server_time")] + pub server_utc_time: NaiveTime, + } + + let server = FaktoryServer { + server_utc_time: NaiveTime::from_hms_opt(19, 47, 39).expect("valid"), + }; + + let serialized = serde_json::to_string(&server).expect("serialized ok"); + assert_eq!(serialized, "{\"server_utc_time\":\"19:47:39 UTC\"}"); + + let deserialized = serde_json::from_str(&serialized).expect("deserialized ok"); + assert_eq!(server, deserialized); + } } diff --git a/tests/real/community.rs b/tests/real/community.rs index 6ed32d9f..eabf693f 100644 --- a/tests/real/community.rs +++ b/tests/real/community.rs @@ -1,4 +1,4 @@ -use crate::skip_check; +use crate::{assert_gte, skip_check}; use faktory::{Client, Job, JobBuilder, JobId, Worker, WorkerBuilder, WorkerId}; use serde_json::Value; use std::{io, sync}; @@ -70,6 +70,107 @@ async fn roundtrip() { assert!(drained); } +#[tokio::test(flavor = "multi_thread")] +async fn server_state() { + skip_check!(); + + let local = "server_state"; + + // prepare a worker + let mut w = WorkerBuilder::default() + .register_fn(local, move |_| async move { Ok::<(), io::Error>(()) }) + .connect(None) + .await + .unwrap(); + + // prepare a producing client + let mut client = Client::connect(None).await.unwrap(); + + // examine server state before pushing anything + let server_state = client.current_info().await.unwrap(); + // the Faktory release we are writing bindings and testing + // against is at least "1.8.0" + assert_eq!(server_state.server.version.major, 1); + assert_gte!(server_state.server.version.minor, 8); + assert!(server_state.data.queues.get(local).is_none()); + // the following two assertions are not super-helpful but + // there is not much info we can make meaningful assetions on anyhow + // (like memusage, server description string, version, etc.) + assert_gte!( + server_state.server.connections, + 2, + "{}", + server_state.server.connections + ); // at least two clients from the current test + assert_ne!(server_state.server.uptime.as_secs(), 0); // if IPC is happenning, this should hold :) + + let nenqueued = server_state.data.total_enqueued; + let nqueues = server_state.data.total_queues; + + // push 1 job + client + .enqueue( + JobBuilder::new(local) + .args(vec!["abc"]) + .queue(local) + .build(), + ) + .await + .unwrap(); + + // we only pushed 1 job on this queue + let server_state = client.current_info().await.unwrap(); + assert_eq!(*server_state.data.queues.get(local).unwrap(), 1); + // `total_enqueued` should be at least +1 job from from last read + assert_gte!( + server_state.data.total_enqueued, + nenqueued + 1, + "`total_enqueued` equals {} which is not greater than or equal to {}", + server_state.data.total_enqueued, + nenqueued + 1 + ); + // `total_queues` should be at least +1 queue from last read + assert_gte!( + server_state.data.total_queues, + nqueues + 1, + "`total_queues` equals {} which is not greater than or equal to {}", + server_state.data.total_queues, + nqueues + 1 + ); + + // let's know consume that job ... + assert!(w.run_one(0, &[local]).await.unwrap()); + + // ... and verify the queue has got 0 pending jobs + // + // NB! If this is not passing locally, make sure to launch a fresh Faktory container, + // because if you have not pruned its volume the Faktory will still keep the queue name + // as registered. + // But generally, we are performing a clean-up by consuming the jobs from the local queue/ + // and then deleting the queue programmatically, so there is normally no need to prune docker + // volumes to perform the next test run. Also note that on CI we are always starting a-fresh. + let server_state = client.current_info().await.unwrap(); + assert_eq!(*server_state.data.queues.get(local).unwrap(), 0); + // `total_processed` should be at least +1 queue from last read + assert_gte!( + server_state.data.total_processed, + 1, + "{}", + server_state.data.total_processed + ); + + // Uncomment when `Client::queue_remove` is delivered: + // client.queue_remove(&[local]).await.unwrap(); + // assert!(client + // .current_info() + // .await + // .unwrap() + // .data + // .queues + // .get(local) + // .is_none()); +} + #[tokio::test(flavor = "multi_thread")] async fn multi() { skip_check!(); diff --git a/tests/real/utils.rs b/tests/real/utils.rs index 9fefb92f..1ae08534 100644 --- a/tests/real/utils.rs +++ b/tests/real/utils.rs @@ -16,6 +16,34 @@ macro_rules! skip_if_not_enterprise { }; } +#[macro_export] +macro_rules! assert_gt { + ($a:expr, $b:expr $(, $rest:expr) *) => { + assert!($a > $b $(, $rest) *) + }; +} + +#[macro_export] +macro_rules! assert_gte { + ($a:expr, $b:expr $(, $rest:expr) *) => { + assert!($a >= $b $(, $rest) *) + }; +} + +#[macro_export] +macro_rules! assert_lt { + ($a:expr, $b:expr $(, $rest:expr) *) => { + assert!($a < $b $(, $rest) *) + }; +} + +#[macro_export] +macro_rules! assert_lte { + ($a:expr, $b:expr $(, $rest:expr) *) => { + assert!($a <= $b $(, $rest) *) + }; +} + #[cfg(feature = "ent")] pub fn learn_faktory_url() -> String { let url = std::env::var_os("FAKTORY_URL").expect(