Skip to content

Commit

Permalink
[GraphQL] Add health endpoint (#18277)
Browse files Browse the repository at this point in the history
## Description 

Adds a `checkpoint_timestamp_ms` to the watermark task and uses it in a
new health check endpoint function. The health endpoint checks for two
things
- if there is a DB connection, otherwise it returns code 500
- if the last known checkpoint timestamp is within an acceptable buffer.
It subtracts the current timestamp from the checkpoint timestamp, and
checks if the value is larger than the provided query param
`max_checkpoint_lag_ms` or a default value, and it returns code 504,
GATEWAY TIMEOUT in that case.

How to query this endpoint:
`curl -X GET "http://127.0.0.1:8000/health" -i `
Set the check for max checkpoint time lag to 10s. If it returns 503,
then the checkpoint is behind.
`curl -X GET "http://127.0.0.1:8000/health?max_checkpoint_lag_ms=10000"
-i`

## Test plan 

Added a new test.

`cargo nextest run --features pg_integration -- test_health_check`

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
  • Loading branch information
stefan-mysten authored Jun 25, 2024
1 parent 98a26b8 commit f645673
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 11 deletions.
79 changes: 73 additions & 6 deletions crates/sui-graphql-rpc/src/server/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,16 @@ use async_graphql::EmptySubscription;
use async_graphql::{extensions::ExtensionFactory, Schema, SchemaBuilder};
use async_graphql_axum::{GraphQLRequest, GraphQLResponse};
use axum::extract::FromRef;
use axum::extract::{connect_info::IntoMakeServiceWithConnectInfo, ConnectInfo, State};
use axum::extract::{
connect_info::IntoMakeServiceWithConnectInfo, ConnectInfo, Query as AxumQuery, State,
};
use axum::http::{HeaderMap, StatusCode};
use axum::middleware::{self};
use axum::response::IntoResponse;
use axum::routing::{post, MethodRouter, Route};
use axum::routing::{get, post, MethodRouter, Route};
use axum::Extension;
use axum::{headers::Header, Router};
use chrono::Utc;
use http::{HeaderValue, Method, Request};
use hyper::server::conn::AddrIncoming as HyperAddrIncoming;
use hyper::Body;
Expand All @@ -51,6 +55,7 @@ use mysten_network::callback::{CallbackLayer, MakeCallbackHandler, ResponseHandl
use std::convert::Infallible;
use std::net::TcpStream;
use std::sync::Arc;
use std::time::Duration;
use std::{any::Any, net::SocketAddr, time::Instant};
use sui_graphql_rpc_headers::{LIMITS_HEADER, VERSION_HEADER};
use sui_package_resolver::{PackageStoreWithLruCache, Resolver};
Expand All @@ -63,6 +68,9 @@ use tower_http::cors::{AllowOrigin, CorsLayer};
use tracing::{info, warn};
use uuid::Uuid;

/// The default allowed maximum lag between the current timestamp and the checkpoint timestamp.
const DEFAULT_MAX_CHECKPOINT_LAG: Duration = Duration::from_secs(300);

pub(crate) struct Server {
pub server: HyperServer<HyperAddrIncoming, IntoMakeServiceWithConnectInfo<Router, SocketAddr>>,
watermark_task: WatermarkTask,
Expand Down Expand Up @@ -248,7 +256,7 @@ impl ServerBuilder {
.route("/:version", post(graphql_handler))
.route("/graphql", post(graphql_handler))
.route("/graphql/:version", post(graphql_handler))
.route("/health", axum::routing::get(health_checks))
.route("/health", get(health_check))
.with_state(self.state.clone())
.route_layer(CallbackLayer::new(MetricsMakeCallbackHandler {
metrics: self.state.metrics.clone(),
Expand Down Expand Up @@ -502,8 +510,8 @@ pub fn export_schema() -> String {
/// if set in the request headers, and the watermark as set by the background task.
async fn graphql_handler(
ConnectInfo(addr): ConnectInfo<SocketAddr>,
schema: axum::Extension<SuiGraphQLSchema>,
axum::Extension(watermark_lock): axum::Extension<WatermarkLock>,
schema: Extension<SuiGraphQLSchema>,
Extension(watermark_lock): Extension<WatermarkLock>,
headers: HeaderMap,
req: GraphQLRequest,
) -> (axum::http::Extensions, GraphQLResponse) {
Expand Down Expand Up @@ -579,7 +587,7 @@ impl Drop for MetricsCallbackHandler {
struct GraphqlErrors(std::sync::Arc<Vec<async_graphql::ServerError>>);

/// Connect via a TCPStream to the DB to check if it is alive
async fn health_checks(State(connection): State<ConnectionConfig>) -> StatusCode {
async fn db_health_check(State(connection): State<ConnectionConfig>) -> StatusCode {
let Ok(url) = reqwest::Url::parse(connection.db_url.as_str()) else {
return StatusCode::INTERNAL_SERVER_ERROR;
};
Expand All @@ -601,6 +609,48 @@ async fn health_checks(State(connection): State<ConnectionConfig>) -> StatusCode
}
}

#[derive(serde::Deserialize)]
struct HealthParam {
max_checkpoint_lag_ms: Option<u64>,
}

/// Endpoint for querying the health of the service.
/// It returns 500 for any internal error, including not connecting to the DB,
/// and 504 if the checkpoint timestamp is too far behind the current timestamp as per the
/// max checkpoint timestamp lag query parameter, or the default value if not provided.
async fn health_check(
State(connection): State<ConnectionConfig>,
Extension(watermark_lock): Extension<WatermarkLock>,
AxumQuery(query_params): AxumQuery<HealthParam>,
) -> StatusCode {
let db_health_check = db_health_check(axum::extract::State(connection)).await;
if db_health_check != StatusCode::OK {
return db_health_check;
}

let max_checkpoint_lag_ms = query_params
.max_checkpoint_lag_ms
.map(Duration::from_millis)
.unwrap_or_else(|| DEFAULT_MAX_CHECKPOINT_LAG);

let checkpoint_timestamp =
Duration::from_millis(watermark_lock.read().await.checkpoint_timestamp_ms);

let now_millis = Utc::now().timestamp_millis();

// Check for negative timestamp or conversion failure
let now: Duration = match u64::try_from(now_millis) {
Ok(val) => Duration::from_millis(val),
Err(_) => return StatusCode::INTERNAL_SERVER_ERROR,
};

if (now - checkpoint_timestamp) > max_checkpoint_lag_ms {
return StatusCode::GATEWAY_TIMEOUT;
}

db_health_check
}

// One server per proc, so this is okay
async fn get_or_init_server_start_time() -> &'static Instant {
static ONCE: OnceCell<Instant> = OnceCell::const_new();
Expand Down Expand Up @@ -651,6 +701,7 @@ pub mod tests {
let cancellation_token = CancellationToken::new();
let watermark = Watermark {
checkpoint: 1,
checkpoint_timestamp_ms: 1,
epoch: 0,
};
let state = AppState::new(
Expand Down Expand Up @@ -1019,4 +1070,20 @@ pub mod tests {
assert_eq!(req_metrics.output_nodes.get_sample_sum(), 2. + 4.);
assert_eq!(req_metrics.query_depth.get_sample_sum(), 1. + 3.);
}

pub async fn test_health_check_impl() {
let server_builder = prep_schema(None, None);
let url = format!(
"http://{}:{}/health",
server_builder.state.connection.host, server_builder.state.connection.port
);
server_builder.build_schema();

let resp = reqwest::get(&url).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);

let url_with_param = format!("{}?max_checkpoint_lag_ms=1", url);
let resp = reqwest::get(&url_with_param).await.unwrap();
assert_eq!(resp.status(), StatusCode::GATEWAY_TIMEOUT);
}
}
15 changes: 10 additions & 5 deletions crates/sui-graphql-rpc/src/server/watermark_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ use tokio::sync::{watch, RwLock};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};

/// Watermark task that periodically updates the current checkpoint and epoch values.
/// Watermark task that periodically updates the current checkpoint, checkpoint timestamp, and
/// epoch values.
pub(crate) struct WatermarkTask {
/// Thread-safe watermark that avoids writer starvation
watermark: WatermarkLock,
Expand All @@ -34,6 +35,8 @@ pub(crate) type WatermarkLock = Arc<RwLock<Watermark>>;
pub(crate) struct Watermark {
/// The checkpoint upper-bound for the query.
pub checkpoint: u64,
/// The checkpoint upper-bound timestamp for the query.
pub checkpoint_timestamp_ms: u64,
/// The current epoch.
pub epoch: u64,
}
Expand Down Expand Up @@ -67,7 +70,7 @@ impl WatermarkTask {
return;
},
_ = tokio::time::sleep(self.sleep) => {
let Watermark { checkpoint, epoch } = match Watermark::query(&self.db).await {
let Watermark {checkpoint, epoch, checkpoint_timestamp_ms } = match Watermark::query(&self.db).await {
Ok(Some(watermark)) => watermark,
Ok(None) => continue,
Err(e) => {
Expand All @@ -81,6 +84,7 @@ impl WatermarkTask {
let prev_epoch = {
let mut w = self.watermark.write().await;
w.checkpoint = checkpoint;
w.checkpoint_timestamp_ms = checkpoint_timestamp_ms;
mem::replace(&mut w.epoch, epoch)
};

Expand All @@ -107,17 +111,18 @@ impl Watermark {
let w = lock.read().await;
Self {
checkpoint: w.checkpoint,
checkpoint_timestamp_ms: w.checkpoint_timestamp_ms,
epoch: w.epoch,
}
}

pub(crate) async fn query(db: &Db) -> Result<Option<Watermark>, Error> {
use checkpoints::dsl;
let Some((checkpoint, epoch)): Option<(i64, i64)> = db
let Some((checkpoint, checkpoint_timestamp_ms, epoch)): Option<(i64, i64, i64)> = db
.execute(move |conn| {
conn.first(move || {
dsl::checkpoints
.select((dsl::sequence_number, dsl::epoch))
.select((dsl::sequence_number, dsl::timestamp_ms, dsl::epoch))
.order_by(dsl::sequence_number.desc())
})
.optional()
Expand All @@ -127,9 +132,9 @@ impl Watermark {
else {
return Ok(None);
};

Ok(Some(Watermark {
checkpoint: checkpoint as u64,
checkpoint_timestamp_ms: checkpoint_timestamp_ms as u64,
epoch: epoch as u64,
}))
}
Expand Down
17 changes: 17 additions & 0 deletions crates/sui-graphql-rpc/tests/e2e_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -880,4 +880,21 @@ mod tests {
async fn test_query_complexity_metrics() {
test_query_complexity_metrics_impl().await;
}

#[tokio::test]
#[serial]
async fn test_health_check() {
let _guard = telemetry_subscribers::TelemetryConfig::new()
.with_env()
.init();
let connection_config = ConnectionConfig::ci_integration_test_cfg();
let cluster =
sui_graphql_rpc::test_infra::cluster::start_cluster(connection_config, None).await;

println!("Cluster started");
cluster
.wait_for_checkpoint_catchup(0, Duration::from_secs(10))
.await;
test_health_check_impl().await;
}
}

0 comments on commit f645673

Please sign in to comment.