Skip to content

Commit

Permalink
feat!: Implement health check API using HealthCheck trait
Browse files Browse the repository at this point in the history
- Use the `HealthCheck` trait to implement the `_health` API endpoint
and the health CLI command in addition to pre-boot checks.
- Create a `HealthCheckRegistry` and the `App#health_checks method to
allow consumers to provide their own health checks that will be run
along with the defualt ones.
- Remove the health checks from the `ServiceRegistry`.

Closes #241
  • Loading branch information
spencewenski committed Jul 1, 2024
1 parent b05d377 commit 8f3fcaf
Show file tree
Hide file tree
Showing 17 changed files with 565 additions and 341 deletions.
32 changes: 17 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,7 @@ and [Poem](https://github.com/poem-web/poem).
the `sidekiq` feature)
- Structured logs/traces using tokio's [tracing](https://docs.rs/tracing/latest/tracing/) crate. Export traces/metrics
using OpenTelemetry (requires the `otel` feature).

## Goals

- Currently, Roadster is focused on back-end API development with Rust. We leave it to the consumer to decide how they
prefer to add a front-end, e.g., using an established JS/TS
framework ([React](https://react.dev/) / [Next](https://nextjs.org/) / [Vue](https://vuejs.org/) / [Svelte](https://svelte.dev/)/ [Solid](https://www.solidjs.com/)
/ etc) or
using a Rust front-end
framework ([Leptos](https://github.com/leptos-rs/leptos) / [Yew](https://github.com/yewstack/yew) / [Perseus](https://github.com/framesurge/perseus/) / [Sycamore](https://github.com/sycamore-rs/sycamore)
/ etc).

## Future plans

- In the future, we may provide a more opinionated approach to front-end development. At a minimum we plan to add
examples of how to integrate and deploy various front-end frameworks with or along-side Roadster.
- Health checks to ensure the app's external dependencies are healthy

# Getting started

Expand Down Expand Up @@ -101,6 +87,22 @@ echo ROADSTER__ENVIRONMENT=development >> .env
cargo run
```

# Add a UI

Currently, Roadster is focused on back-end API development with Rust. We leave it to the consumer to decide how they
prefer to add a front-end, e.g., using an established JS/TS
framework ([React](https://react.dev/) / [Next](https://nextjs.org/) / [Vue](https://vuejs.org/) / [Svelte](https://svelte.dev/) / [Solid](https://www.solidjs.com/)
/ etc) or
using a Rust front-end
framework ([Leptos](https://github.com/leptos-rs/leptos) / [Yew](https://github.com/yewstack/yew) / [Perseus](https://github.com/framesurge/perseus/) / [Sycamore](https://github.com/sycamore-rs/sycamore)
/ etc). That said, we do have some examples of how to Roadster along with some these frameworks.

## Examples

| Framework | Example |
|-----------------------------------------------|-------------------------------------------------------------------------------------|
| [Leptos](https://github.com/leptos-rs/leptos) | [leptos-ssr](https://github.com/roadster-rs/roadster/tree/main/examples/leptos-ssr) |

# Tracing + OpenTelemetry

Roadster allows reporting traces and metrics using the `tracing` and `opentelemetry_rust` integrations. Provide the URL
Expand Down
10 changes: 8 additions & 2 deletions src/api/cli/roadster/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,16 @@ use async_trait::async_trait;
use axum::extract::FromRef;
use clap::Parser;
use serde_derive::Serialize;
use std::time::Duration;
use tracing::info;

#[derive(Debug, Parser, Serialize)]
#[non_exhaustive]
pub struct HealthArgs {}
pub struct HealthArgs {
/// Maximum time to spend checking the health of the resources in milliseconds
#[clap(short = 'd', long)]
max_duration: Option<u64>,
}

#[async_trait]
impl<A, S> RunRoadsterCommand<A, S> for HealthArgs
Expand All @@ -26,7 +31,8 @@ where
_cli: &RoadsterCli,
#[allow(unused_variables)] state: &S,
) -> RoadsterResult<bool> {
let health = health_check(state).await?;
let duration = Duration::from_millis(self.max_duration.unwrap_or(10000));
let health = health_check(state, Some(duration)).await?;
let health = serde_json::to_string_pretty(&health)?;
info!("\n{health}");
Ok(true)
Expand Down
196 changes: 88 additions & 108 deletions src/api/core/health.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
use crate::app::context::AppContext;
use crate::error::RoadsterResult;
use crate::health_check::{CheckResponse, ErrorData, HealthCheck, Status};
#[cfg(feature = "open-api")]
use aide::OperationIo;
#[cfg(feature = "sidekiq")]
use anyhow::anyhow;
use axum::extract::FromRef;
use futures::future::join_all;
#[cfg(feature = "open-api")]
use schemars::JsonSchema;
#[cfg(feature = "db-sql")]
Expand All @@ -11,120 +15,110 @@ use serde_derive::{Deserialize, Serialize};
use serde_with::{serde_as, skip_serializing_none};
#[cfg(feature = "sidekiq")]
use sidekiq::redis_rs::cmd;
#[cfg(any(feature = "db-sql", feature = "sidekiq"))]
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
#[cfg(any(feature = "db-sql", feature = "sidekiq"))]
use tokio::time::timeout;
use tracing::instrument;
use tracing::{info, instrument};

#[serde_as]
#[skip_serializing_none]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "open-api", derive(JsonSchema))]
#[cfg_attr(feature = "open-api", derive(JsonSchema, OperationIo))]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub struct HeathCheckResponse {
/// Total latency of checking the health of the app.
pub latency: u128,
#[cfg(feature = "db-sql")]
pub db: ResourceHealth,
/// Health of the Redis connection used to enqueue Sidekiq jobs.
#[cfg(feature = "sidekiq")]
pub redis_enqueue: ResourceHealth,
/// Health of the Redis connection used to fetch Sidekiq jobs.
#[cfg(feature = "sidekiq")]
pub redis_fetch: Option<ResourceHealth>,
}

#[serde_as]
#[skip_serializing_none]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "open-api", derive(JsonSchema))]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub struct ResourceHealth {
pub status: Status,
/// How long it takes to acquire a connection from the pool.
pub acquire_conn_latency: Option<u128>,
/// How long it takes to ping the resource after the connection is acquired.
pub ping_latency: Option<u128>,
/// Total latency of checking the health of the resource.
pub latency: u128,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "open-api", derive(JsonSchema))]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub enum Status {
Ok,
Err(ErrorData),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "open-api", derive(JsonSchema))]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub struct ErrorData {
#[serde(skip_serializing_if = "Option::is_none")]
pub msg: Option<String>,
#[serde(flatten)]
pub resources: BTreeMap<String, CheckResponse>,
}

#[instrument(skip_all)]
pub async fn health_check<S>(
#[allow(unused_variables)] state: &S,
state: &S,
duration: Option<Duration>,
) -> RoadsterResult<HeathCheckResponse>
where
S: Clone + Send + Sync + 'static,
AppContext: FromRef<S>,
{
#[allow(unused_variables)]
let state = AppContext::from_ref(state);
if let Some(duration) = duration.as_ref() {
info!(
"Running checks for a maximum duration of {} ms",
duration.as_millis()
);
} else {
info!("Running checks");
}
let context = AppContext::from_ref(state);
let timer = Instant::now();

#[cfg(any(feature = "db-sql", feature = "sidekiq"))]
let timeout_duration = Some(Duration::from_secs(1));

#[cfg(all(feature = "db-sql", feature = "sidekiq"))]
let (db, (redis_enqueue, redis_fetch)) = tokio::join!(
db_health(&state, timeout_duration),
all_sidekiq_redis_health(&state, timeout_duration)
);

#[cfg(all(feature = "db-sql", not(feature = "sidekiq")))]
let db = db_health(&state, timeout_duration).await;

#[cfg(all(not(feature = "db-sql"), feature = "sidekiq"))]
let (redis_enqueue, redis_fetch) = all_sidekiq_redis_health(&state, timeout_duration).await;
let check_futures = context.health_checks().checks()?.into_iter().map(|check| {
Box::pin(async move {
let name = check.name();
info!(name=%name, "Running check");
let check_timer = Instant::now();
let result = match run_check(check, duration).await {
Ok(response) => response,
Err(err) => CheckResponse::builder()
.status(Status::Err(
ErrorData::builder()
.msg(format!(
"An error occurred while running health check `{name}`: {err}"
))
.build(),
))
.latency(check_timer.elapsed())
.build(),
};
(name, result)
})
});

let resources = join_all(check_futures).await.into_iter().collect();

Ok(HeathCheckResponse {
latency: timer.elapsed().as_millis(),
#[cfg(feature = "db-sql")]
db,
#[cfg(feature = "sidekiq")]
redis_enqueue,
#[cfg(feature = "sidekiq")]
redis_fetch,
resources,
})
}

async fn run_check(
check: Arc<dyn HealthCheck>,
duration: Option<Duration>,
) -> RoadsterResult<CheckResponse> {
if let Some(duration) = duration {
timeout(duration, check.check()).await?
} else {
check.check().await
}
}

#[serde_as]
#[skip_serializing_none]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "open-api", derive(JsonSchema))]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub struct Latency {
/// How long it takes to acquire a connection from the pool.
pub acquire_conn_latency: Option<u128>,
/// How long it takes to ping the resource after the connection is acquired.
pub ping_latency: Option<u128>,
}

#[cfg(feature = "db-sql")]
pub(crate) async fn db_health(context: &AppContext, duration: Option<Duration>) -> ResourceHealth {
pub(crate) async fn db_health(context: &AppContext, duration: Option<Duration>) -> CheckResponse {
let db_timer = Instant::now();
let db_status = match ping_db(context.db(), duration).await {
Ok(_) => Status::Ok,
Err(err) => Status::Err(ErrorData {
msg: Some(err.to_string()),
}),
Err(err) => Status::Err(ErrorData::builder().msg(err.to_string()).build()),
};
let db_timer = db_timer.elapsed();
ResourceHealth {
status: db_status,
acquire_conn_latency: None,
ping_latency: None,
latency: db_timer.as_millis(),
}
CheckResponse::builder()
.status(db_status)
.latency(db_timer)
.build()
}

#[cfg(feature = "db-sql")]
Expand All @@ -138,45 +132,31 @@ async fn ping_db(db: &DatabaseConnection, duration: Option<Duration>) -> Roadste
Ok(())
}

#[cfg(feature = "sidekiq")]
pub(crate) async fn all_sidekiq_redis_health(
context: &AppContext,
duration: Option<Duration>,
) -> (ResourceHealth, Option<ResourceHealth>) {
{
let redis_enqueue = redis_health(context.redis_enqueue(), duration);
if let Some(redis_fetch) = context.redis_fetch() {
let (redis_enqueue, redis_fetch) =
tokio::join!(redis_enqueue, redis_health(redis_fetch, duration));
(redis_enqueue, Some(redis_fetch))
} else {
(redis_enqueue.await, None)
}
}
}

#[cfg(feature = "sidekiq")]
#[instrument(skip_all)]
async fn redis_health(redis: &sidekiq::RedisPool, duration: Option<Duration>) -> ResourceHealth {
pub(crate) async fn redis_health(
redis: &sidekiq::RedisPool,
duration: Option<Duration>,
) -> CheckResponse {
let redis_timer = Instant::now();
let (redis_status, acquire_conn_latency, ping_latency) = match ping_redis(redis, duration).await
{
Ok((a, b)) => (Status::Ok, Some(a.as_millis()), Some(b.as_millis())),
Err(err) => (
Status::Err(ErrorData {
msg: Some(err.to_string()),
}),
Status::Err(ErrorData::builder().msg(err.to_string()).build()),
None,
None,
),
};
let redis_timer = redis_timer.elapsed();
ResourceHealth {
status: redis_status,
acquire_conn_latency,
ping_latency,
latency: redis_timer.as_millis(),
}
CheckResponse::builder()
.status(redis_status)
.latency(redis_timer)
.custom(Latency {
acquire_conn_latency,
ping_latency,
})
.build()
}

#[cfg(feature = "sidekiq")]
Expand Down
Loading

0 comments on commit 8f3fcaf

Please sign in to comment.