Skip to content

Commit

Permalink
Implement the health check API as a protocol agnostic core module
Browse files Browse the repository at this point in the history
Move the underlying implementation of the `_health` HTTP API to the
`core` module and update the HTTP method to call the core impl. Also,
add support for invoking the health check API from the CLI using the
same underlying core impl.

Closes #170
  • Loading branch information
spencewenski committed Jun 11, 2024
1 parent 4196c0a commit 93237b1
Show file tree
Hide file tree
Showing 6 changed files with 223 additions and 157 deletions.
31 changes: 31 additions & 0 deletions src/api/cli/roadster/health.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use crate::api::cli::roadster::{RoadsterCli, RunRoadsterCommand};
use crate::api::core::health::health_check;
use crate::app::context::AppContext;
use crate::app::App;
use crate::error::RoadsterResult;
use async_trait::async_trait;
use clap::Parser;
use serde_derive::Serialize;
use tracing::info;

#[derive(Debug, Parser, Serialize)]
#[non_exhaustive]
pub struct HealthArgs {}

#[async_trait]
impl<A> RunRoadsterCommand<A> for HealthArgs
where
A: App,
{
async fn run(
&self,
_app: &A,
_cli: &RoadsterCli,
context: &AppContext<A::State>,
) -> RoadsterResult<bool> {
let health = health_check(context).await?;
let health = serde_json::to_string_pretty(&health)?;
info!("\n{health}");
Ok(true)
}
}
7 changes: 7 additions & 0 deletions src/api/cli/roadster/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::api::cli::roadster::health::HealthArgs;
#[cfg(feature = "open-api")]
use crate::api::cli::roadster::list_routes::ListRoutesArgs;
#[cfg(feature = "db-sql")]
Expand All @@ -13,6 +14,7 @@ use async_trait::async_trait;
use clap::{Parser, Subcommand};
use serde_derive::Serialize;

pub mod health;
#[cfg(feature = "open-api")]
pub mod list_routes;
#[cfg(feature = "db-sql")]
Expand Down Expand Up @@ -164,6 +166,7 @@ where
#[cfg(feature = "db-sql")]
RoadsterSubCommand::Migrate(args) => args.run(app, cli, context).await,
RoadsterSubCommand::PrintConfig(args) => args.run(app, cli, context).await,
RoadsterSubCommand::Health(args) => args.run(app, cli, context).await,
}
}
}
Expand All @@ -189,4 +192,8 @@ pub enum RoadsterSubCommand {

/// Print the AppConfig
PrintConfig(PrintConfigArgs),

/// Check the health of the app's resources. Note: This runs without starting the app's service(s)
/// and only requires creating the AppContext<S> that would normally be used by the app.
Health(HealthArgs),
}
172 changes: 172 additions & 0 deletions src/api/core/health.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
use crate::app::context::AppContext;
use crate::error::RoadsterResult;
#[cfg(feature = "sidekiq")]
use anyhow::anyhow;
#[cfg(feature = "open-api")]
use schemars::JsonSchema;
#[cfg(feature = "db-sql")]
use sea_orm::DatabaseConnection;
use serde_derive::{Deserialize, Serialize};
use serde_with::{serde_as, skip_serializing_none};
#[cfg(feature = "sidekiq")]
use sidekiq::redis_rs::cmd;
#[cfg(feature = "sidekiq")]
use std::time::Duration;
use std::time::Instant;
#[cfg(feature = "sidekiq")]
use tokio::time::timeout;
use tracing::instrument;

#[serde_as]
#[skip_serializing_none]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "open-api", derive(JsonSchema))]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub struct HeathCheckResponse {
/// Total latency of checking the health of the app.
pub latency: u128,
#[cfg(feature = "db-sql")]
pub db: ResourceHealth,
/// Health of the Redis connection used to enqueue Sidekiq jobs.
#[cfg(feature = "sidekiq")]
pub redis_enqueue: ResourceHealth,
/// Health of the Redis connection used to fetch Sidekiq jobs.
#[cfg(feature = "sidekiq")]
pub redis_fetch: Option<ResourceHealth>,
}

#[serde_as]
#[skip_serializing_none]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "open-api", derive(JsonSchema))]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub struct ResourceHealth {
pub status: Status,
/// How long it takes to acquire a connection from the pool.
pub acquire_conn_latency: Option<u128>,
/// How long it takes to ping the resource after the connection is acquired.
pub ping_latency: Option<u128>,
/// Total latency of checking the health of the resource.
pub latency: u128,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "open-api", derive(JsonSchema))]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub enum Status {
Ok,
Err(ErrorData),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "open-api", derive(JsonSchema))]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub struct ErrorData {
#[serde(skip_serializing_if = "Option::is_none")]
pub msg: Option<String>,
}

#[instrument(skip_all)]
pub async fn health_check<S>(
#[allow(unused_variables)] state: &AppContext<S>,
) -> RoadsterResult<HeathCheckResponse>
where
S: Clone + Send + Sync + 'static,
{
let timer = Instant::now();
#[cfg(any(feature = "sidekiq", feature = "db-sql"))]
#[cfg(feature = "db-sql")]
let db = {
let db_timer = Instant::now();
let db_status = if ping_db(state.db()).await.is_ok() {
Status::Ok
} else {
Status::Err(ErrorData { msg: None })
};
let db_timer = db_timer.elapsed();
ResourceHealth {
status: db_status,
acquire_conn_latency: None,
ping_latency: None,
latency: db_timer.as_millis(),
}
};

#[cfg(feature = "sidekiq")]
let (redis_enqueue, redis_fetch) = {
let redis_enqueue = redis_health(state.redis_enqueue());
if let Some(redis_fetch) = state.redis_fetch() {
let (redis_enqueue, redis_fetch) =
tokio::join!(redis_enqueue, redis_health(redis_fetch));
(redis_enqueue, Some(redis_fetch))
} else {
(redis_enqueue.await, None)
}
};

Ok(HeathCheckResponse {
latency: timer.elapsed().as_millis(),
#[cfg(feature = "db-sql")]
db,
#[cfg(feature = "sidekiq")]
redis_enqueue,
#[cfg(feature = "sidekiq")]
redis_fetch,
})
}

#[cfg(feature = "db-sql")]
#[instrument(skip_all)]
async fn ping_db(db: &DatabaseConnection) -> RoadsterResult<()> {
db.ping().await?;
Ok(())
}

#[cfg(feature = "sidekiq")]
#[instrument(skip_all)]
async fn redis_health(redis: &sidekiq::RedisPool) -> ResourceHealth {
let redis_timer = Instant::now();
let (redis_status, acquire_conn_latency, ping_latency) = match ping_redis(redis).await {
Ok((a, b)) => (Status::Ok, Some(a.as_millis()), Some(b.as_millis())),
Err(err) => (
Status::Err(ErrorData {
msg: Some(err.to_string()),
}),
None,
None,
),
};
let redis_timer = redis_timer.elapsed();
ResourceHealth {
status: redis_status,
acquire_conn_latency,
ping_latency,
latency: redis_timer.as_millis(),
}
}

#[cfg(feature = "sidekiq")]
#[instrument(skip_all)]
async fn ping_redis(redis: &sidekiq::RedisPool) -> RoadsterResult<(Duration, Duration)> {
let timer = Instant::now();
let mut conn = timeout(Duration::from_secs(1), redis.get()).await??;
let acquire_conn_latency = timer.elapsed();

let timer = Instant::now();
let msg = uuid::Uuid::new_v4().to_string();
let pong: String = cmd("PING")
.arg(&msg)
.query_async(conn.unnamespaced_borrow_mut())
.await?;
let ping_latency = timer.elapsed();

if pong == msg {
Ok((acquire_conn_latency, ping_latency))
} else {
Err(anyhow!("Ping response does not match input.").into())
}
}
3 changes: 3 additions & 0 deletions src/api/core/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
//! Internal, protocol agnostic API implementations. The APIs/methods provided here provide the
//! underlying implementation for the HTTP, gPRC, CLI, etc APIs.
pub mod health;
Loading

0 comments on commit 93237b1

Please sign in to comment.