From c8865d80455d01e81d8243c99c42a8eec26fec29 Mon Sep 17 00:00:00 2001 From: Ermal Kaleci Date: Wed, 28 Feb 2024 21:25:19 +0100 Subject: [PATCH] health_check config --- benches/bench/main.rs | 1 + config.yml | 3 +++ src/extensions/client/endpoint.rs | 23 +++++++++++------ src/extensions/client/mod.rs | 41 ++++++++++++++++++++++++++++--- src/extensions/client/tests.rs | 2 ++ src/server.rs | 1 + src/tests/merge_subscription.rs | 1 + src/tests/upstream.rs | 1 + 8 files changed, 62 insertions(+), 11 deletions(-) diff --git a/benches/bench/main.rs b/benches/bench/main.rs index 887246d..cffebe1 100644 --- a/benches/bench/main.rs +++ b/benches/bench/main.rs @@ -217,6 +217,7 @@ fn config() -> Config { format!("ws://{}", SERVER_TWO_ENDPOINT), ], shuffle_endpoints: false, + health_check: None, }), server: Some(ServerConfig { listen_address: SUBWAY_SERVER_ADDR.to_string(), diff --git a/config.yml b/config.yml index c284bf2..fe7749e 100644 --- a/config.yml +++ b/config.yml @@ -3,6 +3,9 @@ extensions: endpoints: - wss://acala-rpc.dwellir.com - wss://acala-rpc-0.aca-api.network + health_check: + interval_sec: 10 # check interval, default is 10s + response_threshold_ms: 250 # max response time to be considered healthy, default is 250ms event_bus: substrate_api: stale_timeout_seconds: 180 # rotate endpoint if no new blocks for 3 minutes diff --git a/src/extensions/client/endpoint.rs b/src/extensions/client/endpoint.rs index cc3b1a6..2cc080b 100644 --- a/src/extensions/client/endpoint.rs +++ b/src/extensions/client/endpoint.rs @@ -1,4 +1,7 @@ -use crate::{extensions::client::get_backoff_time, utils::errors}; +use crate::{ + extensions::client::{get_backoff_time, HealthCheckConfig}, + utils::errors, +}; use jsonrpsee::{ async_client::Client, core::client::{ClientT, Subscription, SubscriptionClientT}, @@ -78,7 +81,12 @@ impl Drop for Endpoint { } impl Endpoint { - pub fn new(url: String, request_timeout: Option, connection_timeout: Option) -> Self { + pub fn new( + url: String, + request_timeout: Option, + connection_timeout: Option, + health_config: HealthCheckConfig, + ) -> Self { let (client_tx, client_rx) = tokio::sync::watch::channel(None); let on_client_ready = Arc::new(tokio::sync::Notify::new()); let health = Arc::new(Health::default()); @@ -133,14 +141,16 @@ impl Endpoint { // Wait for the client to be ready before starting the health check on_client_ready_.notified().await; - let method_name = "system_health"; - let interval = Duration::from_secs(10); + let method_name = health_config.health_method.as_str(); + let interval = Duration::from_secs(health_config.interval_sec); + let response_threshold = Duration::from_micros(health_config.response_threshold_ms); + let client = match client_rx_.borrow().clone() { Some(client) => client, None => return, }; - // Check if the endpoint has the 'system_health' method + // Check if the endpoint has the health method match client .request::>("rpc_methods", vec![]) .await @@ -179,8 +189,7 @@ impl Endpoint { health_.update(Event::StaleChain); continue; } - // TODO: make this configurable - if request_start.elapsed().as_millis() > 250 { + if request_start.elapsed() > response_threshold { health_.update(Event::SlowResponse); continue; } diff --git a/src/extensions/client/mod.rs b/src/extensions/client/mod.rs index ec5ae85..a18cff9 100644 --- a/src/extensions/client/mod.rs +++ b/src/extensions/client/mod.rs @@ -46,12 +46,35 @@ pub struct ClientConfig { pub endpoints: Vec, #[serde(default = "bool_true")] pub shuffle_endpoints: bool, + pub health_check: Option, } pub fn bool_true() -> bool { true } +#[derive(Deserialize, Debug, Default, Clone)] +pub struct HealthCheckConfig { + #[serde(default = "interval_sec")] + pub interval_sec: u64, + #[serde(default = "response_threshold_ms")] + pub response_threshold_ms: u64, + #[serde(default = "system_health")] + pub health_method: String, +} + +pub fn interval_sec() -> u64 { + 10 +} + +pub fn response_threshold_ms() -> u64 { + 250 +} + +pub fn system_health() -> String { + "system_health".to_string() +} + #[derive(Debug)] enum Message { Request { @@ -75,12 +98,13 @@ impl Extension for Client { type Config = ClientConfig; async fn from_config(config: &Self::Config, _registry: &ExtensionRegistry) -> Result { + let health_check = config.health_check.clone(); if config.shuffle_endpoints { let mut endpoints = config.endpoints.clone(); endpoints.shuffle(&mut thread_rng()); - Ok(Self::new(endpoints, None, None, None)?) + Ok(Self::new(endpoints, None, None, None, health_check)?) } else { - Ok(Self::new(config.endpoints.clone(), None, None, None)?) + Ok(Self::new(config.endpoints.clone(), None, None, None, health_check)?) } } } @@ -91,7 +115,9 @@ impl Client { request_timeout: Option, connection_timeout: Option, retries: Option, + health_config: Option, ) -> Result { + let health_config = health_config.unwrap_or_default(); let endpoints: Vec<_> = endpoints.into_iter().map(|e| e.as_ref().to_string()).collect(); if endpoints.is_empty() { @@ -106,7 +132,14 @@ impl Client { let endpoints = endpoints .into_iter() - .map(|e| Arc::new(Endpoint::new(e, request_timeout, connection_timeout))) + .map(|e| { + Arc::new(Endpoint::new( + e, + request_timeout, + connection_timeout, + health_config.clone(), + )) + }) .collect::>(); let (message_tx, mut message_rx) = tokio::sync::mpsc::channel::(100); @@ -344,7 +377,7 @@ impl Client { } pub fn with_endpoints(endpoints: impl IntoIterator>) -> Result { - Self::new(endpoints, None, None, None) + Self::new(endpoints, None, None, None, None) } pub async fn request(&self, method: &str, params: Vec) -> CallResult { diff --git a/src/extensions/client/tests.rs b/src/extensions/client/tests.rs index c8c9c7b..c48649c 100644 --- a/src/extensions/client/tests.rs +++ b/src/extensions/client/tests.rs @@ -152,6 +152,7 @@ async fn retry_requests_successful() { Some(Duration::from_millis(100)), None, Some(2), + None, ) .unwrap(); @@ -189,6 +190,7 @@ async fn retry_requests_out_of_retries() { Some(Duration::from_millis(100)), None, Some(2), + None, ) .unwrap(); diff --git a/src/server.rs b/src/server.rs index 4745d70..5e1c5fc 100644 --- a/src/server.rs +++ b/src/server.rs @@ -235,6 +235,7 @@ mod tests { client: Some(ClientConfig { endpoints: vec![endpoint], shuffle_endpoints: false, + health_check: None, }), server: Some(ServerConfig { listen_address: "127.0.0.1".to_string(), diff --git a/src/tests/merge_subscription.rs b/src/tests/merge_subscription.rs index a41bb4b..37a7c53 100644 --- a/src/tests/merge_subscription.rs +++ b/src/tests/merge_subscription.rs @@ -49,6 +49,7 @@ async fn merge_subscription_works() { client: Some(ClientConfig { endpoints: vec![format!("ws://{addr}")], shuffle_endpoints: false, + health_check: None, }), server: Some(ServerConfig { listen_address: "0.0.0.0".to_string(), diff --git a/src/tests/upstream.rs b/src/tests/upstream.rs index 84c38f5..9c730da 100644 --- a/src/tests/upstream.rs +++ b/src/tests/upstream.rs @@ -31,6 +31,7 @@ async fn upstream_error_propagate() { client: Some(ClientConfig { endpoints: vec![format!("ws://{addr}")], shuffle_endpoints: false, + health_check: None, }), server: Some(ServerConfig { listen_address: "0.0.0.0".to_string(),