From 822564731bab5eac43d93b33796d99978121022f Mon Sep 17 00:00:00 2001 From: Spencer Ferris <3319370+spencewenski@users.noreply.github.com> Date: Mon, 1 Jul 2024 03:31:06 -0700 Subject: [PATCH] feat!: Implement health check API using `HealthCheck` trait (#255) - Use the `HealthCheck` trait to implement the `_health` API endpoint and the health CLI command in addition to pre-boot checks. - Create a `HealthCheckRegistry` and the `App#health_checks method to allow consumers to provide their own health checks that will be run along with the defualt ones. - Remove the health checks from the `ServiceRegistry`. Closes https://github.com/roadster-rs/roadster/issues/241 --- README.md | 32 +-- src/api/cli/roadster/health.rs | 10 +- src/api/core/health.rs | 196 ++++++++---------- src/api/http/health.rs | 73 ++++--- src/app/context.rs | 11 + src/app/mod.rs | 22 +- src/health_check/database.rs | 32 +-- src/health_check/default.rs | 38 ++-- src/health_check/mod.rs | 74 +++++-- src/health_check/registry.rs | 112 ++++++++++ src/health_check/sidekiq.rs | 86 -------- src/health_check/sidekiq_enqueue.rs | 62 ++++++ src/health_check/sidekiq_fetch.rs | 83 ++++++++ ...ult__tests__default_middleware@case_1.snap | 6 +- ...ult__tests__default_middleware@case_2.snap | 3 +- src/service/registry.rs | 30 --- src/service/runner.rs | 37 ++-- 17 files changed, 566 insertions(+), 341 deletions(-) create mode 100644 src/health_check/registry.rs delete mode 100644 src/health_check/sidekiq.rs create mode 100644 src/health_check/sidekiq_enqueue.rs create mode 100644 src/health_check/sidekiq_fetch.rs diff --git a/README.md b/README.md index eb28b779..fdcb0ba6 100644 --- a/README.md +++ b/README.md @@ -40,21 +40,7 @@ and [Poem](https://github.com/poem-web/poem). the `sidekiq` feature) - Structured logs/traces using tokio's [tracing](https://docs.rs/tracing/latest/tracing/) crate. Export traces/metrics using OpenTelemetry (requires the `otel` feature). - -## Goals - -- Currently, Roadster is focused on back-end API development with Rust. We leave it to the consumer to decide how they - prefer to add a front-end, e.g., using an established JS/TS - framework ([React](https://react.dev/) / [Next](https://nextjs.org/) / [Vue](https://vuejs.org/) / [Svelte](https://svelte.dev/)/ [Solid](https://www.solidjs.com/) - / etc) or - using a Rust front-end - framework ([Leptos](https://github.com/leptos-rs/leptos) / [Yew](https://github.com/yewstack/yew) / [Perseus](https://github.com/framesurge/perseus/) / [Sycamore](https://github.com/sycamore-rs/sycamore) - / etc). - -## Future plans - -- In the future, we may provide a more opinionated approach to front-end development. At a minimum we plan to add - examples of how to integrate and deploy various front-end frameworks with or along-side Roadster. +- Health checks to ensure the app's external dependencies are healthy # Getting started @@ -101,6 +87,22 @@ echo ROADSTER__ENVIRONMENT=development >> .env cargo run ``` +# Add a UI + +Currently, Roadster is focused on back-end API development with Rust. We leave it to the consumer to decide how they +prefer to add a front-end, e.g., using an established JS/TS +framework ([React](https://react.dev/) / [Next](https://nextjs.org/) / [Vue](https://vuejs.org/) / [Svelte](https://svelte.dev/) / [Solid](https://www.solidjs.com/) +/ etc) or +using a Rust front-end +framework ([Leptos](https://github.com/leptos-rs/leptos) / [Yew](https://github.com/yewstack/yew) / [Perseus](https://github.com/framesurge/perseus/) / [Sycamore](https://github.com/sycamore-rs/sycamore) +/ etc). That said, we do have some examples of how to Roadster along with some these frameworks. + +## Examples + +| Framework | Example | +|-----------------------------------------------|-------------------------------------------------------------------------------------| +| [Leptos](https://github.com/leptos-rs/leptos) | [leptos-ssr](https://github.com/roadster-rs/roadster/tree/main/examples/leptos-ssr) | + # Tracing + OpenTelemetry Roadster allows reporting traces and metrics using the `tracing` and `opentelemetry_rust` integrations. Provide the URL diff --git a/src/api/cli/roadster/health.rs b/src/api/cli/roadster/health.rs index db1e9f91..7c15cc4e 100644 --- a/src/api/cli/roadster/health.rs +++ b/src/api/cli/roadster/health.rs @@ -7,11 +7,16 @@ use async_trait::async_trait; use axum::extract::FromRef; use clap::Parser; use serde_derive::Serialize; +use std::time::Duration; use tracing::info; #[derive(Debug, Parser, Serialize)] #[non_exhaustive] -pub struct HealthArgs {} +pub struct HealthArgs { + /// Maximum time to spend checking the health of the resources in milliseconds + #[clap(short = 'd', long)] + max_duration: Option, +} #[async_trait] impl RunRoadsterCommand for HealthArgs @@ -26,7 +31,8 @@ where _cli: &RoadsterCli, #[allow(unused_variables)] state: &S, ) -> RoadsterResult { - let health = health_check(state).await?; + let duration = Duration::from_millis(self.max_duration.unwrap_or(10000)); + let health = health_check(state, Some(duration)).await?; let health = serde_json::to_string_pretty(&health)?; info!("\n{health}"); Ok(true) diff --git a/src/api/core/health.rs b/src/api/core/health.rs index 40e63b51..6f8158cb 100644 --- a/src/api/core/health.rs +++ b/src/api/core/health.rs @@ -1,8 +1,12 @@ use crate::app::context::AppContext; use crate::error::RoadsterResult; +use crate::health_check::{CheckResponse, ErrorData, HealthCheck, Status}; +#[cfg(feature = "open-api")] +use aide::OperationIo; #[cfg(feature = "sidekiq")] use anyhow::anyhow; use axum::extract::FromRef; +use futures::future::join_all; #[cfg(feature = "open-api")] use schemars::JsonSchema; #[cfg(feature = "db-sql")] @@ -11,120 +15,110 @@ use serde_derive::{Deserialize, Serialize}; use serde_with::{serde_as, skip_serializing_none}; #[cfg(feature = "sidekiq")] use sidekiq::redis_rs::cmd; -#[cfg(any(feature = "db-sql", feature = "sidekiq"))] +use std::collections::BTreeMap; +use std::sync::Arc; use std::time::Duration; use std::time::Instant; -#[cfg(any(feature = "db-sql", feature = "sidekiq"))] use tokio::time::timeout; -use tracing::instrument; +use tracing::{info, instrument}; -#[serde_as] -#[skip_serializing_none] #[derive(Debug, Clone, Serialize, Deserialize)] -#[cfg_attr(feature = "open-api", derive(JsonSchema))] +#[cfg_attr(feature = "open-api", derive(JsonSchema, OperationIo))] #[serde(rename_all = "camelCase")] #[non_exhaustive] pub struct HeathCheckResponse { /// Total latency of checking the health of the app. pub latency: u128, - #[cfg(feature = "db-sql")] - pub db: ResourceHealth, - /// Health of the Redis connection used to enqueue Sidekiq jobs. - #[cfg(feature = "sidekiq")] - pub redis_enqueue: ResourceHealth, - /// Health of the Redis connection used to fetch Sidekiq jobs. - #[cfg(feature = "sidekiq")] - pub redis_fetch: Option, -} - -#[serde_as] -#[skip_serializing_none] -#[derive(Debug, Clone, Serialize, Deserialize)] -#[cfg_attr(feature = "open-api", derive(JsonSchema))] -#[serde(rename_all = "camelCase")] -#[non_exhaustive] -pub struct ResourceHealth { - pub status: Status, - /// How long it takes to acquire a connection from the pool. - pub acquire_conn_latency: Option, - /// How long it takes to ping the resource after the connection is acquired. - pub ping_latency: Option, - /// Total latency of checking the health of the resource. - pub latency: u128, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[cfg_attr(feature = "open-api", derive(JsonSchema))] -#[serde(rename_all = "camelCase")] -#[non_exhaustive] -pub enum Status { - Ok, - Err(ErrorData), -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[cfg_attr(feature = "open-api", derive(JsonSchema))] -#[serde(rename_all = "camelCase")] -#[non_exhaustive] -pub struct ErrorData { - #[serde(skip_serializing_if = "Option::is_none")] - pub msg: Option, + #[serde(flatten)] + pub resources: BTreeMap, } #[instrument(skip_all)] pub async fn health_check( - #[allow(unused_variables)] state: &S, + state: &S, + duration: Option, ) -> RoadsterResult where S: Clone + Send + Sync + 'static, AppContext: FromRef, { - #[allow(unused_variables)] - let state = AppContext::from_ref(state); + if let Some(duration) = duration.as_ref() { + info!( + "Running checks for a maximum duration of {} ms", + duration.as_millis() + ); + } else { + info!("Running checks"); + } + let context = AppContext::from_ref(state); let timer = Instant::now(); - #[cfg(any(feature = "db-sql", feature = "sidekiq"))] - let timeout_duration = Some(Duration::from_secs(1)); - - #[cfg(all(feature = "db-sql", feature = "sidekiq"))] - let (db, (redis_enqueue, redis_fetch)) = tokio::join!( - db_health(&state, timeout_duration), - all_sidekiq_redis_health(&state, timeout_duration) - ); - - #[cfg(all(feature = "db-sql", not(feature = "sidekiq")))] - let db = db_health(&state, timeout_duration).await; - - #[cfg(all(not(feature = "db-sql"), feature = "sidekiq"))] - let (redis_enqueue, redis_fetch) = all_sidekiq_redis_health(&state, timeout_duration).await; + let check_futures = context.health_checks().checks()?.into_iter().map(|check| { + Box::pin(async move { + let name = check.name(); + info!(name=%name, "Running check"); + let check_timer = Instant::now(); + let result = match run_check(check, duration).await { + Ok(response) => response, + Err(err) => CheckResponse::builder() + .status(Status::Err( + ErrorData::builder() + .msg(format!( + "An error occurred while running health check `{name}`: {err}" + )) + .build(), + )) + .latency(check_timer.elapsed()) + .build(), + }; + (name, result) + }) + }); + + let resources = join_all(check_futures).await.into_iter().collect(); Ok(HeathCheckResponse { latency: timer.elapsed().as_millis(), - #[cfg(feature = "db-sql")] - db, - #[cfg(feature = "sidekiq")] - redis_enqueue, - #[cfg(feature = "sidekiq")] - redis_fetch, + resources, }) } +async fn run_check( + check: Arc, + duration: Option, +) -> RoadsterResult { + if let Some(duration) = duration { + timeout(duration, check.check()).await? + } else { + check.check().await + } +} + +#[serde_as] +#[skip_serializing_none] +#[derive(Debug, Clone, Serialize, Deserialize)] +#[cfg_attr(feature = "open-api", derive(JsonSchema))] +#[serde(rename_all = "camelCase")] +#[non_exhaustive] +pub struct Latency { + /// How long it takes to acquire a connection from the pool. + pub acquire_conn_latency: Option, + /// How long it takes to ping the resource after the connection is acquired. + pub ping_latency: Option, +} + #[cfg(feature = "db-sql")] -pub(crate) async fn db_health(context: &AppContext, duration: Option) -> ResourceHealth { +pub(crate) async fn db_health(context: &AppContext, duration: Option) -> CheckResponse { let db_timer = Instant::now(); let db_status = match ping_db(context.db(), duration).await { Ok(_) => Status::Ok, - Err(err) => Status::Err(ErrorData { - msg: Some(err.to_string()), - }), + Err(err) => Status::Err(ErrorData::builder().msg(err.to_string()).build()), }; let db_timer = db_timer.elapsed(); - ResourceHealth { - status: db_status, - acquire_conn_latency: None, - ping_latency: None, - latency: db_timer.as_millis(), - } + CheckResponse::builder() + .status(db_status) + .latency(db_timer) + .build() } #[cfg(feature = "db-sql")] @@ -138,45 +132,31 @@ async fn ping_db(db: &DatabaseConnection, duration: Option) -> Roadste Ok(()) } -#[cfg(feature = "sidekiq")] -pub(crate) async fn all_sidekiq_redis_health( - context: &AppContext, - duration: Option, -) -> (ResourceHealth, Option) { - { - let redis_enqueue = redis_health(context.redis_enqueue(), duration); - if let Some(redis_fetch) = context.redis_fetch() { - let (redis_enqueue, redis_fetch) = - tokio::join!(redis_enqueue, redis_health(redis_fetch, duration)); - (redis_enqueue, Some(redis_fetch)) - } else { - (redis_enqueue.await, None) - } - } -} - #[cfg(feature = "sidekiq")] #[instrument(skip_all)] -async fn redis_health(redis: &sidekiq::RedisPool, duration: Option) -> ResourceHealth { +pub(crate) async fn redis_health( + redis: &sidekiq::RedisPool, + duration: Option, +) -> CheckResponse { let redis_timer = Instant::now(); let (redis_status, acquire_conn_latency, ping_latency) = match ping_redis(redis, duration).await { Ok((a, b)) => (Status::Ok, Some(a.as_millis()), Some(b.as_millis())), Err(err) => ( - Status::Err(ErrorData { - msg: Some(err.to_string()), - }), + Status::Err(ErrorData::builder().msg(err.to_string()).build()), None, None, ), }; let redis_timer = redis_timer.elapsed(); - ResourceHealth { - status: redis_status, - acquire_conn_latency, - ping_latency, - latency: redis_timer.as_millis(), - } + CheckResponse::builder() + .status(redis_status) + .latency(redis_timer) + .custom(Latency { + acquire_conn_latency, + ping_latency, + }) + .build() } #[cfg(feature = "sidekiq")] diff --git a/src/api/http/health.rs b/src/api/http/health.rs index 67e3bcbe..2f8d871f 100644 --- a/src/api/http/health.rs +++ b/src/api/http/health.rs @@ -1,19 +1,23 @@ use crate::api::core::health::{health_check, HeathCheckResponse}; -#[cfg(all(feature = "open-api", any(feature = "db-sql", feature = "sidekiq")))] -use crate::api::core::health::{ResourceHealth, Status}; use crate::api::http::build_path; use crate::app::context::AppContext; use crate::error::RoadsterResult; #[cfg(feature = "open-api")] +use crate::health_check::{CheckResponse, ErrorData, Status}; +#[cfg(feature = "open-api")] use aide::axum::routing::get_with; #[cfg(feature = "open-api")] use aide::axum::ApiRouter; #[cfg(feature = "open-api")] use aide::transform::TransformOperation; -use axum::extract::FromRef; use axum::extract::State; +use axum::extract::{FromRef, Query}; use axum::routing::get; use axum::{Json, Router}; +#[cfg(feature = "open-api")] +use schemars::JsonSchema; +use serde_derive::{Deserialize, Serialize}; +use std::time::Duration; use tracing::instrument; #[cfg(feature = "open-api")] @@ -70,14 +74,30 @@ fn route(context: &AppContext) -> &str { .route } +#[derive(Debug, Clone, Serialize, Deserialize)] +#[cfg_attr(feature = "open-api", derive(JsonSchema))] +#[serde(rename_all = "camelCase")] +#[non_exhaustive] +pub struct HeathCheckRequest { + /// Maximum time to spend checking the health of the resources in milliseconds + /// + /// Note: If this is greater than the timeout configured in middleware, the request may + /// time out before the `max_duration` elapses. + #[serde(skip_serializing_if = "Option::is_none")] + pub max_duration: Option, +} + #[instrument(skip_all)] -async fn health_get(State(state): State) -> RoadsterResult> +async fn health_get( + State(state): State, + Query(query): Query, +) -> RoadsterResult> where S: Clone + Send + Sync + 'static, AppContext: FromRef, { - let health = health_check::(&state).await?; - Ok(Json(health)) + let duration = Duration::from_millis(query.max_duration.unwrap_or(1000)); + Ok(Json(health_check(&state, Some(duration)).await?)) } #[cfg(feature = "open-api")] @@ -87,27 +107,26 @@ fn health_get_docs(op: TransformOperation) -> TransformOperation { .response_with::<200, Json, _>(|res| { res.example(HeathCheckResponse { latency: 20, - #[cfg(feature = "db-sql")] - db: ResourceHealth { - status: Status::Ok, - acquire_conn_latency: None, - ping_latency: None, - latency: 10, - }, - #[cfg(feature = "sidekiq")] - redis_enqueue: ResourceHealth { - status: Status::Ok, - acquire_conn_latency: Some(5), - ping_latency: Some(10), - latency: 15, - }, - #[cfg(feature = "sidekiq")] - redis_fetch: Some(ResourceHealth { - status: Status::Ok, - acquire_conn_latency: Some(15), - ping_latency: Some(20), - latency: 35, - }), + resources: std::collections::BTreeMap::from([ + ( + "db".to_string(), + CheckResponse::builder() + .status(Status::Ok) + .latency(Duration::from_secs(1)) + .build(), + ), + ( + "redis".to_string(), + CheckResponse::builder() + .status(Status::Err( + ErrorData::builder() + .msg("An error occurred".to_string()) + .build(), + )) + .latency(Duration::from_secs(2)) + .build(), + ), + ]), }) }) } diff --git a/src/app/context.rs b/src/app/context.rs index e330cdba..560d3f27 100644 --- a/src/app/context.rs +++ b/src/app/context.rs @@ -2,6 +2,7 @@ use crate::app::metadata::AppMetadata; use crate::app::App; use crate::config::app_config::AppConfig; use crate::error::RoadsterResult; +use crate::health_check::registry::HealthCheckRegistry; use axum::extract::FromRef; #[cfg(feature = "db-sql")] use sea_orm::DatabaseConnection; @@ -74,6 +75,7 @@ impl AppContext { let inner = AppContextInner { config, metadata, + health_checks: HealthCheckRegistry::new(), #[cfg(feature = "db-sql")] db, #[cfg(feature = "sidekiq")] @@ -125,6 +127,10 @@ impl AppContext { self.inner.metadata() } + pub fn health_checks(&self) -> &HealthCheckRegistry { + self.inner.health_checks() + } + #[cfg(feature = "db-sql")] pub fn db(&self) -> &DatabaseConnection { self.inner.db() @@ -144,6 +150,7 @@ impl AppContext { struct AppContextInner { config: AppConfig, metadata: AppMetadata, + health_checks: HealthCheckRegistry, #[cfg(feature = "db-sql")] db: DatabaseConnection, #[cfg(feature = "sidekiq")] @@ -166,6 +173,10 @@ impl AppContextInner { &self.metadata } + fn health_checks(&self) -> &HealthCheckRegistry { + &self.health_checks + } + #[cfg(feature = "db-sql")] fn db(&self) -> &DatabaseConnection { &self.db diff --git a/src/app/mod.rs b/src/app/mod.rs index b8000021..935826d6 100644 --- a/src/app/mod.rs +++ b/src/app/mod.rs @@ -12,6 +12,8 @@ use crate::config::app_config::AppConfig; #[cfg(not(feature = "cli"))] use crate::config::environment::Environment; use crate::error::RoadsterResult; +use crate::health_check::default::default_health_checks; +use crate::health_check::registry::HealthCheckRegistry; use crate::service::registry::ServiceRegistry; use crate::tracing::init_tracing; use async_trait::async_trait; @@ -66,6 +68,11 @@ where let state = A::provide_state(context.clone()).await?; + default_health_checks(&context) + .into_iter() + .try_for_each(|check| context.health_checks().register_arc(check))?; + A::health_checks(context.health_checks(), &state).await?; + #[cfg(feature = "cli")] if crate::api::cli::handle_cli(&app, &roadster_cli, &app_cli, &state).await? { return Ok(()); @@ -91,7 +98,7 @@ where A::M::up(context.db(), None).await?; } - crate::service::runner::health_checks(&service_registry, &state).await?; + crate::service::runner::health_checks(&context).await?; crate::service::runner::before_run(&service_registry, &state).await?; @@ -134,7 +141,18 @@ where /// See the following for more details regarding [FromRef]: async fn provide_state(context: AppContext) -> RoadsterResult; - /// Provide the services to run in the app. + /// Provide the [crate::health_check::HealthCheck]s to use throughout the app. + /// + /// Note that a non-mutable reference to the [HealthCheckRegistry] is provided. This is because + /// [HealthCheckRegistry] implements the + /// [interior mutability](https://doc.rust-lang.org/reference/interior-mutability.html) pattern. + /// As such, ___it is not recommended to register additional health checks outside of + /// this method___ -- doing so may result in a panic. + async fn health_checks(_registry: &HealthCheckRegistry, _state: &S) -> RoadsterResult<()> { + Ok(()) + } + + /// Provide the [crate::service::AppService]s to run in the app. async fn services(_registry: &mut ServiceRegistry, _state: &S) -> RoadsterResult<()> { Ok(()) } diff --git a/src/health_check/database.rs b/src/health_check/database.rs index c5c6f6a2..45938786 100644 --- a/src/health_check/database.rs +++ b/src/health_check/database.rs @@ -1,39 +1,27 @@ -use crate::api::core::health::{db_health, Status}; +use crate::api::core::health::db_health; use crate::app::context::AppContext; use crate::error::RoadsterResult; -use crate::health_check::HealthCheck; -use anyhow::anyhow; +use crate::health_check::{CheckResponse, HealthCheck}; use async_trait::async_trait; -use axum::extract::FromRef; use tracing::instrument; -pub struct DatabaseHealthCheck; +pub struct DatabaseHealthCheck { + pub(crate) context: AppContext, +} #[async_trait] -impl HealthCheck for DatabaseHealthCheck -where - S: Clone + Send + Sync + 'static, - AppContext: FromRef, -{ +impl HealthCheck for DatabaseHealthCheck { fn name(&self) -> String { "db".to_string() } - fn enabled(&self, state: &S) -> bool { - let context = AppContext::from_ref(state); - enabled(&context) + fn enabled(&self) -> bool { + enabled(&self.context) } #[instrument(skip_all)] - async fn check(&self, state: &S) -> RoadsterResult<()> { - let context = AppContext::from_ref(state); - let health = db_health(&context, None).await; - - if let Status::Err(err) = health.status { - return Err(anyhow!("Database connection pool is not healthy: {:?}", err).into()); - } - - Ok(()) + async fn check(&self) -> RoadsterResult { + Ok(db_health(&self.context, None).await) } } diff --git a/src/health_check/default.rs b/src/health_check/default.rs index 8e0b6dba..eedccfec 100644 --- a/src/health_check/default.rs +++ b/src/health_check/default.rs @@ -2,27 +2,29 @@ use crate::app::context::AppContext; #[cfg(feature = "db-sql")] use crate::health_check::database::DatabaseHealthCheck; #[cfg(feature = "sidekiq")] -use crate::health_check::sidekiq::SidekiqHealthCheck; +use crate::health_check::sidekiq_enqueue::SidekiqEnqueueHealthCheck; +#[cfg(feature = "sidekiq")] +use crate::health_check::sidekiq_fetch::SidekiqFetchHealthCheck; use crate::health_check::HealthCheck; -use axum::extract::FromRef; -use std::collections::BTreeMap; +use std::sync::Arc; -pub fn default_health_checks(state: &S) -> BTreeMap>> -where - S: Clone + Send + Sync + 'static, - AppContext: FromRef, -{ - let health_check: Vec>> = vec![ +pub fn default_health_checks( + #[allow(unused_variables)] context: &AppContext, +) -> Vec> { + vec![ #[cfg(feature = "db-sql")] - Box::new(DatabaseHealthCheck), + Arc::new(DatabaseHealthCheck { + context: context.clone(), + }), + #[cfg(feature = "sidekiq")] + Arc::new(SidekiqEnqueueHealthCheck { + context: context.clone(), + }), #[cfg(feature = "sidekiq")] - Box::new(SidekiqHealthCheck), - ]; - health_check - .into_iter() - .filter(|health_check| health_check.enabled(state)) - .map(|health_check| (health_check.name(), health_check)) - .collect() + Arc::new(SidekiqFetchHealthCheck { + context: context.clone(), + }), + ] } #[cfg(all(test, feature = "sidekiq", feature = "db-sql",))] @@ -53,7 +55,7 @@ mod tests { // Act let health_checks = super::default_health_checks(&context); - let health_checks = health_checks.keys().collect_vec(); + let health_checks = health_checks.iter().map(|check| check.name()).collect_vec(); // Assert assert_toml_snapshot!(health_checks); diff --git a/src/health_check/mod.rs b/src/health_check/mod.rs index 98cf3510..1e085d08 100644 --- a/src/health_check/mod.rs +++ b/src/health_check/mod.rs @@ -1,14 +1,64 @@ #[cfg(feature = "db-sql")] pub mod database; pub mod default; +pub mod registry; #[cfg(feature = "sidekiq")] -pub mod sidekiq; - -use crate::app::context::AppContext; +pub mod sidekiq_enqueue; +#[cfg(feature = "sidekiq")] +pub mod sidekiq_fetch; use crate::error::RoadsterResult; use async_trait::async_trait; -use axum::extract::FromRef; +#[cfg(feature = "open-api")] +use schemars::JsonSchema; +use serde_derive::{Deserialize, Serialize}; +use serde_json::Value; +use serde_with::{serde_as, skip_serializing_none}; +use typed_builder::TypedBuilder; + +#[serde_as] +#[skip_serializing_none] +#[derive(Debug, Clone, Serialize, Deserialize, TypedBuilder)] +#[cfg_attr(feature = "open-api", derive(JsonSchema))] +#[serde(rename_all = "camelCase")] +#[non_exhaustive] +pub struct CheckResponse { + pub status: Status, + /// Total latency of checking the health of the resource in milliseconds. + #[builder(setter(transform = |duration: std::time::Duration| duration.as_millis() ))] + pub latency: u128, + /// Custom health data, for example, separate latency measurements for acquiring a connection + /// from a resource pool vs making a request with the connection. + #[serde(flatten)] + #[builder(default, setter(transform = |custom: impl serde::Serialize| serialize_custom(custom) ))] + pub custom: Option, +} + +fn serialize_custom(custom: impl serde::Serialize) -> Option { + Some( + serde_json::to_value(custom) + .unwrap_or_else(|err| Value::String(format!("Unable to serialize custom data: {err}"))), + ) +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[cfg_attr(feature = "open-api", derive(JsonSchema))] +#[serde(rename_all = "camelCase")] +#[non_exhaustive] +pub enum Status { + Ok, + Err(ErrorData), +} + +#[derive(Debug, Clone, Serialize, Deserialize, TypedBuilder)] +#[cfg_attr(feature = "open-api", derive(JsonSchema))] +#[serde(rename_all = "camelCase")] +#[non_exhaustive] +pub struct ErrorData { + #[serde(skip_serializing_if = "Option::is_none")] + #[builder(default, setter(strip_option))] + pub msg: Option, +} /// Trait used to check the health of the app before its services start up. /// @@ -19,25 +69,19 @@ use axum::extract::FromRef; /// needs the resources. /// /// Another benefit of using a separate trait is, because the health checks are decoupled from -/// services, they can potentially be used in other parts of the app. For example, they could -/// be used to implement a "health check" API endpoint. -// Todo: Use the `HealthCheck` trait to implement the "health check" api - https://github.com/roadster-rs/roadster/issues/241 -// Todo: does order of the async_trait/automock attributes matter? +/// services, they can potentially be used in other parts of the app. For example, they can +/// be used to implement the "health check" API endpoint. #[cfg_attr(test, mockall::automock)] #[async_trait] -pub trait HealthCheck: Send + Sync -where - S: Clone + Send + Sync + 'static, - AppContext: FromRef, -{ +pub trait HealthCheck: Send + Sync { /// The name of the health check. fn name(&self) -> String; /// Whether the health check is enabled. If the health check is not enabled, Roadster will not /// run it. However, if a consumer wants, they can certainly create a [HealthCheck] instance /// and directly call `HealthCheck#check` even if `HealthCheck#enabled` returns `false`. - fn enabled(&self, state: &S) -> bool; + fn enabled(&self) -> bool; /// Run the health check. - async fn check(&self, state: &S) -> RoadsterResult<()>; + async fn check(&self) -> RoadsterResult; } diff --git a/src/health_check/registry.rs b/src/health_check/registry.rs new file mode 100644 index 00000000..a7214374 --- /dev/null +++ b/src/health_check/registry.rs @@ -0,0 +1,112 @@ +use crate::error::RoadsterResult; +use crate::health_check::HealthCheck; +use anyhow::anyhow; +use std::collections::BTreeMap; +use std::sync::{Arc, RwLock}; +use tracing::info; + +/// Registry for [HealthCheck]s that will be run in the app. +/// +/// Health checks are used in multiple parts of the app, for example: +/// 1. As pre-boot checks to ensure the app's resource dependencies are healthy. +/// 2. As a "core" API that can be used from multiple components, e.g. the `_health` HTTP endpoint +/// and the health CLI command. +/// +/// # Internal mutability +/// In order to make this registry available to multiple parts of the app, this is included +/// as part of the [AppContext][crate::app::context::AppContext]. This is not strictly necessary +/// for the Axum handlers (the registry could be provided via an [Extension][axum::Extension]), +/// but it is (currently) required for other things, such as the CLI handlers. +/// +/// In order to include the registry as part of the context, but also allow checks to be added +/// to the registry after the context is created, the registry implements the +/// [interior mutability](https://doc.rust-lang.org/reference/interior-mutability.html) pattern +/// using a [RwLock]. As such, ___it is not recommended to register additional health checks +/// outside of the app initialization process___ -- doing so may result in a panic. +/// +/// Because of the internal mutability, methods that modify the internal state can accept `&self` +/// instead of `&mut self`. +pub struct HealthCheckRegistry { + health_checks: Arc>>>, +} + +impl Default for HealthCheckRegistry { + fn default() -> Self { + HealthCheckRegistry::new() + } +} + +impl HealthCheckRegistry { + pub fn new() -> Self { + Self { + health_checks: Arc::new(RwLock::new(Default::default())), + } + } + + pub fn register(&self, health_check: H) -> RoadsterResult<()> + where + H: HealthCheck + 'static, + { + self.register_arc(Arc::new(health_check)) + } + + pub(crate) fn register_arc(&self, health_check: Arc) -> RoadsterResult<()> { + let name = health_check.name(); + + if !health_check.enabled() { + info!(name=%name, "Health check is not enabled, skipping registration"); + return Ok(()); + } + + info!(name=%name, "Registering health check"); + + let mut health_checks = self.health_checks.write().map_err(|err| { + anyhow!("Unable to acquire write lock on health check registry: {err}") + })?; + if health_checks.insert(name.clone(), health_check).is_some() { + return Err(anyhow!("Health check `{}` was already registered", name).into()); + } + Ok(()) + } + + pub fn checks(&self) -> RoadsterResult>> { + let health_checks = self + .health_checks + .read() + .map_err(|err| anyhow!("Unable to acquire read lock on heath check registry: {err}"))?; + Ok(health_checks.values().cloned().collect()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::health_check::MockHealthCheck; + use rstest::rstest; + + #[rstest] + #[case(true, 1)] + #[case(false, 0)] + #[cfg_attr(coverage_nightly, coverage(off))] + fn register_check(#[case] service_enabled: bool, #[case] expected_count: usize) { + // Arrange + let mut check: MockHealthCheck = MockHealthCheck::default(); + check.expect_enabled().return_const(service_enabled); + check.expect_name().return_const("test".to_string()); + + // Act + let subject: HealthCheckRegistry = HealthCheckRegistry::new(); + subject.register(check).unwrap(); + + // Assert + assert_eq!(subject.checks().unwrap().len(), expected_count); + assert_eq!( + subject + .checks() + .unwrap() + .iter() + .any(|check| check.name() == "test"), + service_enabled + ); + } +} diff --git a/src/health_check/sidekiq.rs b/src/health_check/sidekiq.rs deleted file mode 100644 index 71cf7db3..00000000 --- a/src/health_check/sidekiq.rs +++ /dev/null @@ -1,86 +0,0 @@ -use crate::api::core::health::{all_sidekiq_redis_health, Status}; -use crate::app::context::AppContext; -use crate::error::RoadsterResult; -use crate::health_check::HealthCheck; -use anyhow::anyhow; -use async_trait::async_trait; -use axum::extract::FromRef; -use tracing::instrument; - -pub struct SidekiqHealthCheck; - -#[async_trait] -impl HealthCheck for SidekiqHealthCheck -where - S: Clone + Send + Sync + 'static, - AppContext: FromRef, -{ - fn name(&self) -> String { - "sidekiq".to_string() - } - - fn enabled(&self, state: &S) -> bool { - enabled(&AppContext::from_ref(state)) - } - - #[instrument(skip_all)] - async fn check(&self, state: &S) -> RoadsterResult<()> { - let (redis_enqueue, redis_fetch) = - all_sidekiq_redis_health(&AppContext::from_ref(state), None).await; - - if let Status::Err(err) = redis_enqueue.status { - return Err(anyhow!( - "Sidekiq redis enqueue connection pool is not healthy: {:?}", - err - ) - .into()); - } - if let Some(redis_fetch) = redis_fetch { - if let Status::Err(err) = redis_fetch.status { - return Err(anyhow!( - "Sidekiq redis fetch connection pool is not healthy: {:?}", - err - ) - .into()); - } - } - - Ok(()) - } -} - -fn enabled(context: &AppContext) -> bool { - context - .config() - .health_check - .sidekiq - .common - .enabled(context) -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::config::app_config::AppConfig; - use rstest::rstest; - - #[rstest] - #[case(false, Some(true), true)] - #[case(false, Some(false), false)] - #[cfg_attr(coverage_nightly, coverage(off))] - fn enabled( - #[case] default_enable: bool, - #[case] enable: Option, - #[case] expected_enabled: bool, - ) { - // Arrange - let mut config = AppConfig::test(None).unwrap(); - config.health_check.default_enable = default_enable; - config.health_check.sidekiq.common.enable = enable; - - let context = AppContext::test(Some(config), None, None).unwrap(); - - // Act/Assert - assert_eq!(super::enabled(&context), expected_enabled); - } -} diff --git a/src/health_check/sidekiq_enqueue.rs b/src/health_check/sidekiq_enqueue.rs new file mode 100644 index 00000000..f0883e3c --- /dev/null +++ b/src/health_check/sidekiq_enqueue.rs @@ -0,0 +1,62 @@ +use crate::api::core::health::redis_health; +use crate::app::context::AppContext; +use crate::error::RoadsterResult; +use crate::health_check::{CheckResponse, HealthCheck}; +use async_trait::async_trait; +use tracing::instrument; + +pub struct SidekiqEnqueueHealthCheck { + pub(crate) context: AppContext, +} + +#[async_trait] +impl HealthCheck for SidekiqEnqueueHealthCheck { + fn name(&self) -> String { + "sidekiq-enqueue".to_string() + } + + fn enabled(&self) -> bool { + enabled(&self.context) + } + + #[instrument(skip_all)] + async fn check(&self) -> RoadsterResult { + Ok(redis_health(self.context.redis_enqueue(), None).await) + } +} + +fn enabled(context: &AppContext) -> bool { + context + .config() + .health_check + .sidekiq + .common + .enabled(context) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::config::app_config::AppConfig; + use rstest::rstest; + + #[rstest] + #[case(false, Some(true), true)] + #[case(false, Some(false), false)] + #[cfg_attr(coverage_nightly, coverage(off))] + fn enabled( + #[case] default_enable: bool, + #[case] enable: Option, + #[case] expected_enabled: bool, + ) { + // Arrange + let mut config = AppConfig::test(None).unwrap(); + config.health_check.default_enable = default_enable; + config.health_check.sidekiq.common.enable = enable; + + let context = AppContext::test(Some(config), None, None).unwrap(); + + // Act/Assert + assert_eq!(super::enabled(&context), expected_enabled); + } +} diff --git a/src/health_check/sidekiq_fetch.rs b/src/health_check/sidekiq_fetch.rs new file mode 100644 index 00000000..6984b9ee --- /dev/null +++ b/src/health_check/sidekiq_fetch.rs @@ -0,0 +1,83 @@ +use crate::api::core::health::redis_health; +use crate::app::context::AppContext; +use crate::error::RoadsterResult; +use crate::health_check::{CheckResponse, HealthCheck}; +use anyhow::anyhow; +use async_trait::async_trait; +use tracing::instrument; + +pub struct SidekiqFetchHealthCheck { + pub(crate) context: AppContext, +} + +#[async_trait] +impl HealthCheck for SidekiqFetchHealthCheck { + fn name(&self) -> String { + "sidekiq-fetch".to_string() + } + + fn enabled(&self) -> bool { + enabled(&self.context) + } + + #[instrument(skip_all)] + async fn check(&self) -> RoadsterResult { + Ok(redis_health( + self.context + .redis_fetch() + .as_ref() + .ok_or_else(|| anyhow!("Redis fetch connection pool is not present"))?, + None, + ) + .await) + } +} + +fn enabled(context: &AppContext) -> bool { + context.redis_fetch().is_some() + && context + .config() + .health_check + .sidekiq + .common + .enabled(context) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::config::app_config::AppConfig; + use bb8::Pool; + use rstest::rstest; + use sidekiq::RedisConnectionManager; + + #[rstest] + #[case(false, Some(true), true, true)] + #[case(false, Some(true), false, false)] + #[case(false, Some(false), false, false)] + #[cfg_attr(coverage_nightly, coverage(off))] + #[tokio::test] + async fn enabled( + #[case] default_enable: bool, + #[case] enable: Option, + #[case] pool: bool, + #[case] expected_enabled: bool, + ) { + // Arrange + let mut config = AppConfig::test(None).unwrap(); + config.health_check.default_enable = default_enable; + config.health_check.sidekiq.common.enable = enable; + + let redis_fetch_pool = if pool { + let redis_fetch = RedisConnectionManager::new("redis://invalid_host:1234").unwrap(); + let pool = Pool::builder().build_unchecked(redis_fetch); + Some(pool) + } else { + None + }; + let context = AppContext::test(Some(config), None, redis_fetch_pool).unwrap(); + + // Act/Assert + assert_eq!(super::enabled(&context), expected_enabled); + } +} diff --git a/src/health_check/snapshots/roadster__health_check__default__tests__default_middleware@case_1.snap b/src/health_check/snapshots/roadster__health_check__default__tests__default_middleware@case_1.snap index c3b7bad8..1bd9b8b9 100644 --- a/src/health_check/snapshots/roadster__health_check__default__tests__default_middleware@case_1.snap +++ b/src/health_check/snapshots/roadster__health_check__default__tests__default_middleware@case_1.snap @@ -2,4 +2,8 @@ source: src/health_check/default.rs expression: health_checks --- -[] +[ + 'db', + 'sidekiq-enqueue', + 'sidekiq-fetch', +] diff --git a/src/health_check/snapshots/roadster__health_check__default__tests__default_middleware@case_2.snap b/src/health_check/snapshots/roadster__health_check__default__tests__default_middleware@case_2.snap index 415b4e7f..1bd9b8b9 100644 --- a/src/health_check/snapshots/roadster__health_check__default__tests__default_middleware@case_2.snap +++ b/src/health_check/snapshots/roadster__health_check__default__tests__default_middleware@case_2.snap @@ -4,5 +4,6 @@ expression: health_checks --- [ 'db', - 'sidekiq', + 'sidekiq-enqueue', + 'sidekiq-fetch', ] diff --git a/src/service/registry.rs b/src/service/registry.rs index 5b369c0e..2bc3b724 100644 --- a/src/service/registry.rs +++ b/src/service/registry.rs @@ -1,8 +1,6 @@ use crate::app::context::AppContext; use crate::app::App; use crate::error::RoadsterResult; -use crate::health_check::default::default_health_checks; -use crate::health_check::HealthCheck; use crate::service::{AppService, AppServiceBuilder}; use anyhow::anyhow; use axum::extract::FromRef; @@ -17,8 +15,6 @@ where A: App + ?Sized + 'static, { pub(crate) state: S, - /// Health checks that need to succeed before any of the services can run. - pub(crate) health_checks: BTreeMap>>, pub(crate) services: BTreeMap>>, } @@ -31,36 +27,10 @@ where pub(crate) fn new(state: &S) -> Self { Self { state: state.clone(), - health_checks: default_health_checks(state), services: Default::default(), } } - /// Register a health check that needs to succeed before any service can run. - // Todo: Would it make more sense to add a separate method to the `App` trait? - pub fn register_health_check(&mut self, health_check: H) -> RoadsterResult<()> - where - H: HealthCheck + 'static, - { - let name = health_check.name(); - - if !health_check.enabled(&self.state) { - info!(name=%name, "Health check is not enabled, skipping registration"); - return Ok(()); - } - - info!(name=%name, "Registering health check"); - - if self - .health_checks - .insert(name.clone(), Box::new(health_check)) - .is_some() - { - return Err(anyhow!("Health check `{}` was already registered", name).into()); - } - Ok(()) - } - /// Register a new service. If the service is not enabled (e.g., [AppService::enabled] is `false`), /// the service will not be registered. pub fn register_service(&mut self, service: Service) -> RoadsterResult<()> diff --git a/src/service/runner.rs b/src/service/runner.rs index b0de27a9..03f4b7aa 100644 --- a/src/service/runner.rs +++ b/src/service/runner.rs @@ -1,11 +1,15 @@ #[cfg(feature = "cli")] use crate::api::cli::roadster::RoadsterCli; +use crate::api::core::health::health_check; use crate::app::context::AppContext; use crate::app::App; use crate::error::RoadsterResult; +use crate::health_check::Status; use crate::service::registry::ServiceRegistry; +use anyhow::anyhow; use axum::extract::FromRef; use std::future::Future; +use std::time::Duration; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; use tracing::{error, info, instrument}; @@ -30,21 +34,26 @@ where Ok(false) } -pub(crate) async fn health_checks( - service_registry: &ServiceRegistry, - state: &S, -) -> RoadsterResult<()> -where - S: Clone + Send + Sync + 'static, - AppContext: FromRef, - A: App, -{ - for (name, health_check) in service_registry.health_checks.iter() { - info!(name=%name, "Running health check"); - health_check.check(state).await?; +pub(crate) async fn health_checks(context: &AppContext) -> RoadsterResult<()> { + let duration = Duration::from_secs(60); + info!( + "Running checks for a maximum duration of {} seconds", + duration.as_secs() + ); + let response = health_check(context, Some(duration)).await?; + + let error_response = response + .resources + .iter() + .find(|(_name, response)| !matches!(response.status, Status::Ok)); + + if let Some((name, response)) = error_response { + let msg = format!("Resource is not healthy: {response:?}"); + error!(name=%name, msg); + Err(anyhow!("{msg}"))? + } else { + Ok(()) } - - Ok(()) } pub(crate) async fn before_run(