Skip to content

Commit

Permalink
health_check config
Browse files Browse the repository at this point in the history
  • Loading branch information
ermalkaleci committed Feb 28, 2024
1 parent 6f62eaa commit c8865d8
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 11 deletions.
1 change: 1 addition & 0 deletions benches/bench/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ fn config() -> Config {
format!("ws://{}", SERVER_TWO_ENDPOINT),
],
shuffle_endpoints: false,
health_check: None,
}),
server: Some(ServerConfig {
listen_address: SUBWAY_SERVER_ADDR.to_string(),
Expand Down
3 changes: 3 additions & 0 deletions config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ extensions:
endpoints:
- wss://acala-rpc.dwellir.com
- wss://acala-rpc-0.aca-api.network
health_check:
interval_sec: 10 # check interval, default is 10s
response_threshold_ms: 250 # max response time to be considered healthy, default is 250ms
event_bus:
substrate_api:
stale_timeout_seconds: 180 # rotate endpoint if no new blocks for 3 minutes
Expand Down
23 changes: 16 additions & 7 deletions src/extensions/client/endpoint.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use crate::{extensions::client::get_backoff_time, utils::errors};
use crate::{
extensions::client::{get_backoff_time, HealthCheckConfig},
utils::errors,
};
use jsonrpsee::{
async_client::Client,
core::client::{ClientT, Subscription, SubscriptionClientT},
Expand Down Expand Up @@ -78,7 +81,12 @@ impl Drop for Endpoint {
}

impl Endpoint {
pub fn new(url: String, request_timeout: Option<Duration>, connection_timeout: Option<Duration>) -> Self {
pub fn new(
url: String,
request_timeout: Option<Duration>,
connection_timeout: Option<Duration>,
health_config: HealthCheckConfig,
) -> Self {
let (client_tx, client_rx) = tokio::sync::watch::channel(None);
let on_client_ready = Arc::new(tokio::sync::Notify::new());
let health = Arc::new(Health::default());
Expand Down Expand Up @@ -133,14 +141,16 @@ impl Endpoint {
// Wait for the client to be ready before starting the health check
on_client_ready_.notified().await;

let method_name = "system_health";
let interval = Duration::from_secs(10);
// Read the health-check settings from the supplied config.
let method_name = health_config.health_method.as_str();
let interval = Duration::from_secs(health_config.interval_sec);
// `response_threshold_ms` is a millisecond value, so it has to be converted with
// `from_millis`. Using `from_micros` would shrink the threshold by a factor of 1000
// (250ms -> 250µs) and flag practically every response as slow.
let response_threshold = Duration::from_millis(health_config.response_threshold_ms);

let client = match client_rx_.borrow().clone() {
Some(client) => client,
None => return,
};

// Check if the endpoint has the 'system_health' method
// Check if the endpoint has the health method
match client
.request::<serde_json::Value, Vec<serde_json::Value>>("rpc_methods", vec![])
.await
Expand Down Expand Up @@ -179,8 +189,7 @@ impl Endpoint {
health_.update(Event::StaleChain);
continue;
}
// TODO: make this configurable
if request_start.elapsed().as_millis() > 250 {
if request_start.elapsed() > response_threshold {
health_.update(Event::SlowResponse);
continue;
}
Expand Down
41 changes: 37 additions & 4 deletions src/extensions/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,35 @@ pub struct ClientConfig {
pub endpoints: Vec<String>,
#[serde(default = "bool_true")]
pub shuffle_endpoints: bool,
pub health_check: Option<HealthCheckConfig>,
}

/// Serde default helper that always yields `true`.
///
/// Referenced by `#[serde(default = "bool_true")]` so a boolean field is
/// enabled when it is left out of the configuration.
pub fn bool_true() -> bool {
    const ENABLED: bool = true;
    ENABLED
}

#[derive(Deserialize, Debug, Default, Clone)]
pub struct HealthCheckConfig {
#[serde(default = "interval_sec")]
pub interval_sec: u64,
#[serde(default = "response_threshold_ms")]
pub response_threshold_ms: u64,
#[serde(default = "system_health")]
pub health_method: String,
}

/// Default for `HealthCheckConfig::interval_sec`: 10 seconds between checks.
pub fn interval_sec() -> u64 {
    const DEFAULT_INTERVAL_SEC: u64 = 10;
    DEFAULT_INTERVAL_SEC
}

/// Default for `HealthCheckConfig::response_threshold_ms`: 250 milliseconds.
pub fn response_threshold_ms() -> u64 {
    const DEFAULT_RESPONSE_THRESHOLD_MS: u64 = 250;
    DEFAULT_RESPONSE_THRESHOLD_MS
}

/// Default for `HealthCheckConfig::health_method`: the `system_health` RPC method.
pub fn system_health() -> String {
    String::from("system_health")
}

#[derive(Debug)]
enum Message {
Request {
Expand All @@ -75,12 +98,13 @@ impl Extension for Client {
type Config = ClientConfig;

async fn from_config(config: &Self::Config, _registry: &ExtensionRegistry) -> Result<Self, anyhow::Error> {
let health_check = config.health_check.clone();
if config.shuffle_endpoints {
let mut endpoints = config.endpoints.clone();
endpoints.shuffle(&mut thread_rng());
Ok(Self::new(endpoints, None, None, None)?)
Ok(Self::new(endpoints, None, None, None, health_check)?)
} else {
Ok(Self::new(config.endpoints.clone(), None, None, None)?)
Ok(Self::new(config.endpoints.clone(), None, None, None, health_check)?)
}
}
}
Expand All @@ -91,7 +115,9 @@ impl Client {
request_timeout: Option<Duration>,
connection_timeout: Option<Duration>,
retries: Option<u32>,
health_config: Option<HealthCheckConfig>,
) -> Result<Self, anyhow::Error> {
let health_config = health_config.unwrap_or_default();
let endpoints: Vec<_> = endpoints.into_iter().map(|e| e.as_ref().to_string()).collect();

if endpoints.is_empty() {
Expand All @@ -106,7 +132,14 @@ impl Client {

let endpoints = endpoints
.into_iter()
.map(|e| Arc::new(Endpoint::new(e, request_timeout, connection_timeout)))
.map(|e| {
Arc::new(Endpoint::new(
e,
request_timeout,
connection_timeout,
health_config.clone(),
))
})
.collect::<Vec<_>>();

let (message_tx, mut message_rx) = tokio::sync::mpsc::channel::<Message>(100);
Expand Down Expand Up @@ -344,7 +377,7 @@ impl Client {
}

pub fn with_endpoints(endpoints: impl IntoIterator<Item = impl AsRef<str>>) -> Result<Self, anyhow::Error> {
Self::new(endpoints, None, None, None)
Self::new(endpoints, None, None, None, None)
}

pub async fn request(&self, method: &str, params: Vec<JsonValue>) -> CallResult {
Expand Down
2 changes: 2 additions & 0 deletions src/extensions/client/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ async fn retry_requests_successful() {
Some(Duration::from_millis(100)),
None,
Some(2),
None,
)
.unwrap();

Expand Down Expand Up @@ -189,6 +190,7 @@ async fn retry_requests_out_of_retries() {
Some(Duration::from_millis(100)),
None,
Some(2),
None,
)
.unwrap();

Expand Down
1 change: 1 addition & 0 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ mod tests {
client: Some(ClientConfig {
endpoints: vec![endpoint],
shuffle_endpoints: false,
health_check: None,
}),
server: Some(ServerConfig {
listen_address: "127.0.0.1".to_string(),
Expand Down
1 change: 1 addition & 0 deletions src/tests/merge_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ async fn merge_subscription_works() {
client: Some(ClientConfig {
endpoints: vec![format!("ws://{addr}")],
shuffle_endpoints: false,
health_check: None,
}),
server: Some(ServerConfig {
listen_address: "0.0.0.0".to_string(),
Expand Down
1 change: 1 addition & 0 deletions src/tests/upstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ async fn upstream_error_propagate() {
client: Some(ClientConfig {
endpoints: vec![format!("ws://{addr}")],
shuffle_endpoints: false,
health_check: None,
}),
server: Some(ServerConfig {
listen_address: "0.0.0.0".to_string(),
Expand Down

0 comments on commit c8865d8

Please sign in to comment.