Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
ermalkaleci committed Mar 7, 2024
1 parent 0b3a1ee commit 558d7a5
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 69 deletions.
9 changes: 1 addition & 8 deletions src/extensions/client/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,7 @@ impl Endpoint {
});

// This task will check the health of the endpoint and update the health score
let health_ = health.clone();
let on_client_ready_ = on_client_ready.clone();
let client_rx_ = client_rx.clone();
let health_checker = tokio::spawn(async move {
// Wait for the client to be ready before starting the health check
on_client_ready_.notified().await;
health_.monitor(client_rx_).await;
});
let health_checker = Health::monitor(health.clone(), client_rx.clone(), on_client_ready.clone());

Self {
url,
Expand Down
133 changes: 72 additions & 61 deletions src/extensions/client/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,83 +67,94 @@ impl Health {
_ => {}
};
}
}

pub async fn monitor(&self, client_rx_: tokio::sync::watch::Receiver<Option<Arc<Client>>>) {
if self.config.health_method.is_empty() {
return;
}

let method_name = self.config.health_method.as_str();
let interval = Duration::from_secs(self.config.interval_sec);
let response_threshold = Duration::from_micros(self.config.response_threshold_ms);

let client = match client_rx_.borrow().clone() {
Some(client) => client,
None => return,
};

// Check if the endpoint has the health method
match client
.request::<serde_json::Value, Vec<serde_json::Value>>("rpc_methods", vec![])
.await
{
Ok(response) => {
let has_health_method = response
.get("methods")
.unwrap_or(&serde_json::json!([]))
.as_array()
.map(|methods| methods.iter().any(|x| x.as_str() == Some(method_name)))
.unwrap_or_default();
if !has_health_method {
tracing::warn!(
"Endpoint {url} does not have the {method_name:?} method",
url = self.url
);
return;
}
impl Health {
pub fn monitor(
health: Arc<Health>,
client_rx_: tokio::sync::watch::Receiver<Option<Arc<Client>>>,
on_client_ready: Arc<tokio::sync::Notify>,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
if health.config.health_method.is_empty() {
return;
}
Err(_) => return,
};

loop {
tokio::time::sleep(interval).await;
// Wait for the client to be ready before starting the health check
on_client_ready.notified().await;

let method_name = health.config.health_method.as_str();
let interval = Duration::from_secs(health.config.interval_sec);
let response_threshold = Duration::from_micros(health.config.response_threshold_ms);

let client = match client_rx_.borrow().clone() {
Some(client) => client,
None => continue,
None => return,
};

let request_start = std::time::Instant::now();
// Check if the endpoint has the health method
match client
.request::<serde_json::Value, Vec<serde_json::Value>>(method_name, vec![])
.request::<serde_json::Value, Vec<serde_json::Value>>("rpc_methods", vec![])
.await
{
Ok(response) => {
let duration = request_start.elapsed();

// Check if the node is syncing
if response
.get("isSyncing")
.unwrap_or(&serde_json::json!(false))
.as_bool()
.unwrap_or_default()
{
self.update(Event::StaleChain);
continue;
let has_health_method = response
.get("methods")
.unwrap_or(&serde_json::json!([]))
.as_array()
.map(|methods| methods.iter().any(|x| x.as_str() == Some(method_name)))
.unwrap_or_default();
if !has_health_method {
tracing::warn!(
"Endpoint {url} does not have the {method_name:?} method",
url = health.url
);
return;
}
}
Err(_) => return,
};

// Check response time
if duration > response_threshold {
self.update(Event::SlowResponse);
continue;
loop {
tokio::time::sleep(interval).await;

let client = match client_rx_.borrow().clone() {
Some(client) => client,
None => continue,
};

let request_start = std::time::Instant::now();
match client
.request::<serde_json::Value, Vec<serde_json::Value>>(method_name, vec![])
.await
{
Ok(response) => {
let duration = request_start.elapsed();

// Check if the node is syncing
if response
.get("isSyncing")
.unwrap_or(&serde_json::json!(false))
.as_bool()
.unwrap_or_default()
{
health.update(Event::StaleChain);
continue;
}

// Check response time
if duration > response_threshold {
health.update(Event::SlowResponse);
continue;
}

health.update(Event::ResponseOk);
}
Err(err) => {
health.on_error(&err);
}

self.update(Event::ResponseOk);
}
Err(err) => {
self.on_error(&err);
}
}
}
})
}
}

0 comments on commit 558d7a5

Please sign in to comment.