Skip to content

Commit

Permalink
Add ServerState and Client::current_info (#63)
Browse files Browse the repository at this point in the history
Closes #60
  • Loading branch information
rustworthy authored May 25, 2024
1 parent eac9545 commit 58962df
Show file tree
Hide file tree
Showing 11 changed files with 342 additions and 16 deletions.
18 changes: 14 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ tokio = { version = "1.35.1", features = [
tokio-native-tls = { version = "0.3.1", optional = true }
tokio-rustls = { version = "0.25.0", optional = true }
url = "2"
semver = { version = "1.0.23", features = ["serde"] }

[dev-dependencies]
rustls-pki-types = "1.0.1"
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ faktory/tls:

.PHONY: faktory/tls/kill
faktory/tls/kill:
docker compose -f docker/compose.yml down
docker compose -f docker/compose.yml down -v

.PHONY: test
test:
Expand Down
4 changes: 3 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ mod proto;
mod worker;

pub use crate::error::Error;
pub use crate::proto::{Client, Job, JobBuilder, JobId, Reconnect, WorkerId};
pub use crate::proto::{
Client, DataSnapshot, FaktoryState, Job, JobBuilder, JobId, Reconnect, ServerSnapshot, WorkerId,
};
pub use crate::worker::{JobRunner, Worker, WorkerBuilder};

#[cfg(feature = "ent")]
Expand Down
4 changes: 2 additions & 2 deletions src/proto/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,10 +358,10 @@ where
Ok((jobs_count - errors.len(), Some(errors)))
}

/// Retrieve information about the running server.
/// Retrieve [information](crate::ServerSnapshot) about the running server.
///
/// The returned value is the result of running the `INFO` command on the server.
pub async fn info(&mut self) -> Result<serde_json::Value, Error> {
pub async fn current_info(&mut self) -> Result<single::FaktoryState, Error> {
self.issue(&Info)
.await?
.read_json()
Expand Down
2 changes: 1 addition & 1 deletion src/proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub(crate) use client::{ClientOptions, HeartbeatStatus, EXPECTED_PROTOCOL_VERSIO

mod single;

pub use single::{Job, JobBuilder, JobId, WorkerId};
pub use single::{DataSnapshot, FaktoryState, Job, JobBuilder, JobId, ServerSnapshot, WorkerId};

pub(crate) use single::{Ack, Fail, Info, Push, PushBulk, QueueAction, QueueControl};

Expand Down
4 changes: 2 additions & 2 deletions src/proto/single/ent/progress.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::proto::single::JobId;

use super::utils;
use crate::proto::single::JobId;
use chrono::{DateTime, Utc};
use derive_builder::Builder;

/// Info on job execution progress (sent).
///
/// In Enterprise Faktory, a client executing a job can report on the execution
Expand Down
112 changes: 108 additions & 4 deletions src/proto/single/resp.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use super::utils;
use crate::error::{self, Error};
use chrono::{DateTime, Utc};
use std::collections::BTreeMap;
use std::time::Duration;
use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncReadExt};

#[cfg(feature = "ent")]
use crate::ent::BatchId;

use crate::error::{self, Error};
use tokio::io::AsyncBufRead;

pub fn bad(expected: &'static str, got: &RawResponse) -> error::Protocol {
let stringy = match *got {
RawResponse::String(ref s) => Some(&**s),
Expand Down Expand Up @@ -118,6 +122,107 @@ pub async fn read_ok<R: AsyncBufRead + Unpin>(r: R) -> Result<(), Error> {
Err(bad("server ok", &rr).into())
}

// ----------------------------------------------

/// Faktory service information.
///
/// This holds information on the registered [queues](DataSnapshot::queues) as well as
/// some aggregated data, e.g. total number of jobs [processed](DataSnapshot::total_processed),
/// total number of jobs [enqueued](DataSnapshot::total_enqueued), etc.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[non_exhaustive]
pub struct DataSnapshot {
/// Total number of job failures.
pub total_failures: u64,

/// Total number of processed jobs.
pub total_processed: u64,

/// Total number of enqueued jobs.
pub total_enqueued: u64,

/// Total number of queues.
pub total_queues: u64,

/// Queues stats.
///
/// A mapping between a queue name and its size (number of jobs on the queue).
/// The keys of this map effectively make up a list of queues that are currently
/// registered in the Faktory service.
pub queues: BTreeMap<String, u64>,

/// ***Deprecated***. Faktory's task runner stats.
///
/// Note that this is exposed as a "generic" `serde_json::Value` since this info
/// belongs to deep implementation details of the Faktory service.
#[deprecated(
note = "marked as deprecated in the Faktory source code and is likely to be completely removed in the future, so please do not rely on this data"
)]
pub tasks: serde_json::Value,
}

/// Faktory's server process information.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ServerSnapshot {
/// Description of the server process (e.g. "Faktory").
pub description: String,

/// Faktory's version as semver.
#[serde(rename = "faktory_version")]
pub version: semver::Version,

/// Faktory server process uptime in seconds.
#[serde(deserialize_with = "utils::deser_duration")]
#[serde(serialize_with = "utils::ser_duration")]
pub uptime: Duration,

/// Number of clients connected to the server.
pub connections: u64,

/// Number of executed commands.
pub command_count: u64,

/// Faktory server process memory usage.
pub used_memory_mb: u64,
}

/// Current server state.
///
/// Contains such details as how many queues there are on the server, statistics on the jobs,
/// as well as some specific info on server process memory usage, uptime, etc.
///
/// Here is an example of the simplest way to fetch info on the server state.
/// ```no_run
/// # tokio_test::block_on(async {
/// use faktory::Client;
///
/// let mut client = Client::connect(None).await.unwrap();
/// let _server_state = client.current_info().await.unwrap();
/// # });
/// ```
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct FaktoryState {
/// Server time.
pub now: DateTime<Utc>,

/// Server time (naive representation).
///
/// Faktory sends it as a string formatted as "%H:%M:%S UTC" (e.g. "19:47:39 UTC")
/// and it is being parsed as `NaiveTime`.
///
/// Most of the time, though, you will want to use [`FaktoryState::now`] instead.
#[serde(deserialize_with = "utils::deser_server_time")]
#[serde(serialize_with = "utils::ser_server_time")]
pub server_utc_time: chrono::naive::NaiveTime,

/// Faktory service information.
#[serde(rename = "faktory")]
pub data: DataSnapshot,

/// Faktory's server process information.
pub server: ServerSnapshot,
}

// ----------------------------------------------
//
// below is the implementation of the Redis RESP protocol
Expand All @@ -132,7 +237,6 @@ pub enum RawResponse {
Null,
}

use tokio::io::{AsyncBufReadExt, AsyncReadExt};
async fn read<R>(mut r: R) -> Result<RawResponse, Error>
where
R: AsyncBufRead + Unpin,
Expand Down
80 changes: 80 additions & 0 deletions src/proto/single/utils.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use chrono::naive::NaiveTime;
use rand::{thread_rng, Rng};
use serde::{de::Deserializer, Deserialize, Serializer};
use std::time::Duration;

const JOB_ID_LENGTH: usize = 16;
const WORKER_ID_LENGTH: usize = 32;
Expand All @@ -19,6 +22,44 @@ pub(crate) fn gen_random_wid() -> String {
gen_random_id(WORKER_ID_LENGTH)
}

pub(crate) fn ser_duration<S>(value: &Duration, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let secs = value.as_secs();
serializer.serialize_u64(secs)
}

pub(crate) fn deser_duration<'de, D>(value: D) -> Result<Duration, D::Error>
where
D: Deserializer<'de>,
{
let secs = u64::deserialize(value)?;
Ok(Duration::from_secs(secs))
}

pub(crate) fn ser_server_time<S>(value: &NaiveTime, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&format!("{} UTC", value))
}

pub(crate) fn deser_server_time<'de, D>(value: D) -> Result<NaiveTime, D::Error>
where
D: Deserializer<'de>,
{
let naive_time_str = String::deserialize(value)?;
let naive_time_str = naive_time_str
.strip_suffix(" UTC")
.ok_or(serde::de::Error::custom(
"Expected a naive time string that ends with ' UTC'",
))?;
let naive_time =
NaiveTime::parse_from_str(naive_time_str, "%H:%M:%S").map_err(serde::de::Error::custom)?;
Ok(naive_time)
}

#[cfg(test)]
mod test {
use super::*;
Expand Down Expand Up @@ -49,4 +90,43 @@ mod test {
}
assert_eq!(ids.len(), 1_000_000);
}

#[test]
fn test_ser_deser_duration() {
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
struct FaktoryServer {
#[serde(deserialize_with = "deser_duration")]
#[serde(serialize_with = "ser_duration")]
uptime: Duration,
}

let server = FaktoryServer {
uptime: Duration::from_secs(2024),
};

let serialized = serde_json::to_string(&server).expect("serialized ok");
let deserialized = serde_json::from_str(&serialized).expect("deserialized ok");
assert_eq!(server, deserialized);
}

#[test]
fn test_ser_deser_server_time() {
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
struct FaktoryServer {
/// Server time as a string formatted as "%H:%M:%S UTC" (e.g. "19:47:39 UTC").
#[serde(deserialize_with = "deser_server_time")]
#[serde(serialize_with = "ser_server_time")]
pub server_utc_time: NaiveTime,
}

let server = FaktoryServer {
server_utc_time: NaiveTime::from_hms_opt(19, 47, 39).expect("valid"),
};

let serialized = serde_json::to_string(&server).expect("serialized ok");
assert_eq!(serialized, "{\"server_utc_time\":\"19:47:39 UTC\"}");

let deserialized = serde_json::from_str(&serialized).expect("deserialized ok");
assert_eq!(server, deserialized);
}
}
Loading

0 comments on commit 58962df

Please sign in to comment.