Skip to content

Commit

Permalink
health.rs
Browse files Browse the repository at this point in the history
  • Loading branch information
ermalkaleci committed Mar 6, 2024
1 parent c8865d8 commit 0b3a1ee
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 114 deletions.
119 changes: 5 additions & 114 deletions src/extensions/client/endpoint.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::health::{Event, Health};
use crate::{
extensions::client::{get_backoff_time, HealthCheckConfig},
utils::errors,
Expand All @@ -15,57 +16,6 @@ use std::{
time::Duration,
};

/// Observations about an endpoint that are fed to `Health::update` to adjust its score.
#[derive(Debug)]
pub enum Event {
/// A health-check request succeeded within the threshold; the score increases by 1.
ResponseOk,
/// A response took longer than the configured threshold; the score decreases by 5.
SlowResponse,
/// A request timed out; the score decreases by 10.
RequestTimeout,
/// Sets the score to 100.
ConnectionSuccessful,
/// Reported by `Health::on_error` for transport-level failures; the score drops to 0.
ConnectionFailed,
/// The node reported `isSyncing: true`; the score drops to 0.
StaleChain,
}

/// Health score of an endpoint, stored in an atomic so it can be updated through `&self`.
///
/// The derived `Default` starts the score at 0; `Health::update` caps it at 100.
#[derive(Debug, Default)]
pub struct Health {
/// Current score; adjusted by `Health::update`.
pub score: AtomicU32,
}

impl Health {
    /// Returns the current health score.
    pub fn score(&self) -> u32 {
        self.score.load(Ordering::Relaxed)
    }

    /// Applies `event` to the health score.
    ///
    /// `ResponseOk` adds 1, `SlowResponse` subtracts 5 and `RequestTimeout` subtracts 10
    /// (all saturating), `ConnectionFailed` / `StaleChain` reset the score to 0 and
    /// `ConnectionSuccessful` sets it to the maximum. The result never exceeds 100.
    pub fn update(&self, event: Event) {
        const MAX_SCORE: u32 = 100;
        // The score is shared between tasks through `Arc<Health>`. A separate load followed
        // by a store could overwrite an update made in between, so the read-modify-write is
        // done atomically; `fetch_update` re-runs the closure if the value changed meanwhile.
        let _ = self
            .score
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current_score| {
                let new_score = match &event {
                    Event::ResponseOk => current_score.saturating_add(1),
                    Event::SlowResponse => current_score.saturating_sub(5),
                    Event::RequestTimeout => current_score.saturating_sub(10),
                    Event::ConnectionFailed | Event::StaleChain => 0,
                    Event::ConnectionSuccessful => MAX_SCORE,
                };
                // Always `Some`, so the update cannot fail.
                Some(u32::min(new_score, MAX_SCORE))
            });
    }

    /// Translates a client error into a health event.
    ///
    /// A request timeout is recorded as `Event::RequestTimeout`; transport errors, a required
    /// restart and exhausted request slots are recorded as `Event::ConnectionFailed`. All
    /// other errors leave the score untouched.
    pub fn on_error(&self, err: &jsonrpsee::core::Error) {
        match err {
            jsonrpsee::core::Error::RequestTimeout => {
                self.update(Event::RequestTimeout);
            }
            jsonrpsee::core::Error::Transport(_)
            | jsonrpsee::core::Error::RestartNeeded(_)
            | jsonrpsee::core::Error::MaxSlotsExceeded => {
                self.update(Event::ConnectionFailed);
            }
            // Any other error is not treated as a health signal.
            _ => {}
        };
    }
}

pub struct Endpoint {
pub url: String,
pub health: Arc<Health>,
Expand All @@ -89,7 +39,8 @@ impl Endpoint {
) -> Self {
let (client_tx, client_rx) = tokio::sync::watch::channel(None);
let on_client_ready = Arc::new(tokio::sync::Notify::new());
let health = Arc::new(Health::default());
let url_ = url.clone();
let health = Arc::new(Health::new(url_, health_config));

let url_ = url.clone();
let health_ = health.clone();
Expand Down Expand Up @@ -123,7 +74,7 @@ impl Endpoint {
Err(err) => {
health_.on_error(&err);
_ = client_tx.send(None);
tracing::warn!("Unable to connect to endpoint: '{url_}' error: {err}");
tracing::warn!("Unable to connect to endpoint: {url_} error: {err}");
tokio::time::sleep(get_backoff_time(&connect_backoff_counter)).await;
}
}
Expand All @@ -133,73 +84,13 @@ impl Endpoint {
});

// This task will check the health of the endpoint and update the health score
let url_ = url.clone();
let health_ = health.clone();
let on_client_ready_ = on_client_ready.clone();
let client_rx_ = client_rx.clone();
let health_checker = tokio::spawn(async move {
// Wait for the client to be ready before starting the health check
on_client_ready_.notified().await;

let method_name = health_config.health_method.as_str();
let interval = Duration::from_secs(health_config.interval_sec);
let response_threshold = Duration::from_micros(health_config.response_threshold_ms);

let client = match client_rx_.borrow().clone() {
Some(client) => client,
None => return,
};

// Check if the endpoint has the health method
match client
.request::<serde_json::Value, Vec<serde_json::Value>>("rpc_methods", vec![])
.await
{
Ok(response) => {
let has_health_method = response
.get("methods")
.unwrap_or(&serde_json::json!([]))
.as_array()
.map(|methods| methods.iter().any(|x| x.as_str() == Some(method_name)))
.unwrap_or_default();
if !has_health_method {
tracing::warn!("Endpoint '{url_}' does not have the '{method_name}' method");
return;
}
}
Err(_) => return,
};

loop {
tokio::time::sleep(interval).await;

let client = match client_rx_.borrow().clone() {
Some(client) => client,
None => continue,
};

let request_start = std::time::Instant::now();
match client
.request::<serde_json::Value, Vec<serde_json::Value>>(method_name, vec![])
.await
{
Ok(response) => {
let is_syncing = response.get("isSyncing").unwrap_or(&serde_json::json!(false));
if is_syncing.as_bool() == Some(true) {
health_.update(Event::StaleChain);
continue;
}
if request_start.elapsed() > response_threshold {
health_.update(Event::SlowResponse);
continue;
}
health_.update(Event::ResponseOk);
}
Err(err) => {
health_.on_error(&err);
}
}
}
health_.monitor(client_rx_).await;
});

Self {
Expand Down
149 changes: 149 additions & 0 deletions src/extensions/client/health.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
use crate::extensions::client::HealthCheckConfig;
use jsonrpsee::{async_client::Client, core::client::ClientT};
use std::{
sync::{
atomic::{AtomicU32, Ordering},
Arc,
},
time::Duration,
};

/// Observations about an endpoint that are fed to `Health::update` to adjust its score.
#[derive(Debug)]
pub enum Event {
/// A health-check request succeeded within the threshold; the score increases by 1.
ResponseOk,
/// A response took longer than the configured threshold; the score decreases by 5.
SlowResponse,
/// A request timed out; the score decreases by 10.
RequestTimeout,
/// Sets the score to 100.
ConnectionSuccessful,
/// Reported by `Health::on_error` for transport-level failures; the score drops to 0.
ConnectionFailed,
/// The node reported `isSyncing: true`; the score drops to 0.
StaleChain,
}

/// Health state of a single endpoint: its URL, the health-check settings and the score.
///
/// NOTE(review): the derived `Default` starts the score at 0, whereas `Health::new`
/// starts it at 100 — confirm the difference is intended.
#[derive(Debug, Default)]
pub struct Health {
/// Endpoint URL; used in log messages emitted by `Health::monitor`.
pub url: String,
/// Health-check settings (method name, polling interval, response threshold).
pub config: HealthCheckConfig,
/// Current score; adjusted by `Health::update` and capped at 100.
pub score: AtomicU32,
}

impl Health {
    /// Creates the health state for the endpoint at `url`, starting with a score of 100.
    pub fn new(url: String, config: HealthCheckConfig) -> Self {
        Self {
            url,
            config,
            score: AtomicU32::new(100),
        }
    }

    /// Returns the current health score.
    pub fn score(&self) -> u32 {
        self.score.load(Ordering::Relaxed)
    }

    /// Applies `event` to the health score.
    ///
    /// `ResponseOk` adds 1, `SlowResponse` subtracts 5 and `RequestTimeout` subtracts 10
    /// (all saturating), `ConnectionFailed` / `StaleChain` reset the score to 0 and
    /// `ConnectionSuccessful` sets it to the maximum. The result never exceeds 100.
    pub fn update(&self, event: Event) {
        const MAX_SCORE: u32 = 100;
        // The score is shared between tasks through `Arc<Health>`. A separate load followed
        // by a store could overwrite an update made in between, so the read-modify-write is
        // done atomically; `fetch_update` re-runs the closure if the value changed meanwhile.
        let _ = self
            .score
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current_score| {
                let new_score = match &event {
                    Event::ResponseOk => current_score.saturating_add(1),
                    Event::SlowResponse => current_score.saturating_sub(5),
                    Event::RequestTimeout => current_score.saturating_sub(10),
                    Event::ConnectionFailed | Event::StaleChain => 0,
                    Event::ConnectionSuccessful => MAX_SCORE,
                };
                // Always `Some`, so the update cannot fail.
                Some(u32::min(new_score, MAX_SCORE))
            });
    }

    /// Translates a client error into a health event.
    ///
    /// A request timeout is recorded as `Event::RequestTimeout`; transport errors, a required
    /// restart and exhausted request slots are recorded as `Event::ConnectionFailed`. All
    /// other errors leave the score untouched.
    pub fn on_error(&self, err: &jsonrpsee::core::Error) {
        match err {
            jsonrpsee::core::Error::RequestTimeout => {
                self.update(Event::RequestTimeout);
            }
            jsonrpsee::core::Error::Transport(_)
            | jsonrpsee::core::Error::RestartNeeded(_)
            | jsonrpsee::core::Error::MaxSlotsExceeded => {
                self.update(Event::ConnectionFailed);
            }
            // Any other error is not treated as a health signal.
            _ => {}
        };
    }

    /// Periodically calls the configured health method and updates the score from the result.
    ///
    /// Returns immediately when no health method is configured, when `client_rx_` currently
    /// holds no client, when the `rpc_methods` probe fails, or when the endpoint does not
    /// list the health method. Otherwise it loops forever: every `interval_sec` seconds it
    /// takes the latest client from `client_rx_`, issues the health request and records
    /// `StaleChain` (node reports `isSyncing`), `SlowResponse` (slower than
    /// `response_threshold_ms`), `ResponseOk`, or the event derived from the error.
    pub async fn monitor(&self, client_rx_: tokio::sync::watch::Receiver<Option<Arc<Client>>>) {
        if self.config.health_method.is_empty() {
            return;
        }

        let method_name = self.config.health_method.as_str();
        let interval = Duration::from_secs(self.config.interval_sec);
        // The setting is expressed in milliseconds; interpreting it as microseconds would
        // shrink the threshold by a factor of 1000 and flag nearly every response as slow.
        let response_threshold = Duration::from_millis(self.config.response_threshold_ms);

        let client = match client_rx_.borrow().clone() {
            Some(client) => client,
            None => return,
        };

        // Check if the endpoint has the health method
        match client
            .request::<serde_json::Value, Vec<serde_json::Value>>("rpc_methods", vec![])
            .await
        {
            Ok(response) => {
                let has_health_method = response
                    .get("methods")
                    .and_then(|methods| methods.as_array())
                    .map(|methods| methods.iter().any(|x| x.as_str() == Some(method_name)))
                    .unwrap_or_default();
                if !has_health_method {
                    tracing::warn!(
                        "Endpoint {url} does not have the {method_name:?} method",
                        url = self.url
                    );
                    return;
                }
            }
            Err(err) => {
                // Monitoring is abandoned for this endpoint; leave a trace of the reason.
                tracing::warn!(
                    "Unable to query rpc_methods of endpoint {url}: {err}",
                    url = self.url
                );
                return;
            }
        };

        loop {
            tokio::time::sleep(interval).await;

            // Pick up the most recent client; skip this round while disconnected.
            let client = match client_rx_.borrow().clone() {
                Some(client) => client,
                None => continue,
            };

            let request_start = std::time::Instant::now();
            match client
                .request::<serde_json::Value, Vec<serde_json::Value>>(method_name, vec![])
                .await
            {
                Ok(response) => {
                    let duration = request_start.elapsed();

                    // Check if the node is syncing
                    let is_syncing = response
                        .get("isSyncing")
                        .and_then(|value| value.as_bool())
                        .unwrap_or_default();
                    if is_syncing {
                        self.update(Event::StaleChain);
                        continue;
                    }

                    // Check response time
                    if duration > response_threshold {
                        self.update(Event::SlowResponse);
                        continue;
                    }

                    self.update(Event::ResponseOk);
                }
                Err(err) => {
                    self.on_error(&err);
                }
            }
        }
    }
}
1 change: 1 addition & 0 deletions src/extensions/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::{
};

mod endpoint;
mod health;
use endpoint::Endpoint;

#[cfg(test)]
Expand Down

0 comments on commit 0b3a1ee

Please sign in to comment.