From 558d7a5f0b55a4c9069e8ce16273b0b81aaed65d Mon Sep 17 00:00:00 2001 From: Ermal Kaleci Date: Thu, 7 Mar 2024 13:11:53 +0100 Subject: [PATCH] refactor --- src/extensions/client/endpoint.rs | 9 +- src/extensions/client/health.rs | 133 ++++++++++++++++-------------- 2 files changed, 73 insertions(+), 69 deletions(-) diff --git a/src/extensions/client/endpoint.rs b/src/extensions/client/endpoint.rs index e999b3f..45fb552 100644 --- a/src/extensions/client/endpoint.rs +++ b/src/extensions/client/endpoint.rs @@ -84,14 +84,7 @@ impl Endpoint { }); // This task will check the health of the endpoint and update the health score - let health_ = health.clone(); - let on_client_ready_ = on_client_ready.clone(); - let client_rx_ = client_rx.clone(); - let health_checker = tokio::spawn(async move { - // Wait for the client to be ready before starting the health check - on_client_ready_.notified().await; - health_.monitor(client_rx_).await; - }); + let health_checker = Health::monitor(health.clone(), client_rx.clone(), on_client_ready.clone()); Self { url, diff --git a/src/extensions/client/health.rs b/src/extensions/client/health.rs index 82770af..0655ddc 100644 --- a/src/extensions/client/health.rs +++ b/src/extensions/client/health.rs @@ -67,83 +67,94 @@ impl Health { _ => {} }; } +} - pub async fn monitor(&self, client_rx_: tokio::sync::watch::Receiver>>) { - if self.config.health_method.is_empty() { - return; - } - - let method_name = self.config.health_method.as_str(); - let interval = Duration::from_secs(self.config.interval_sec); - let response_threshold = Duration::from_micros(self.config.response_threshold_ms); - - let client = match client_rx_.borrow().clone() { - Some(client) => client, - None => return, - }; - - // Check if the endpoint has the health method - match client - .request::>("rpc_methods", vec![]) - .await - { - Ok(response) => { - let has_health_method = response - .get("methods") - .unwrap_or(&serde_json::json!([])) - .as_array() - .map(|methods| methods.iter().any(|x| x.as_str() == Some(method_name))) - .unwrap_or_default(); - if !has_health_method { - tracing::warn!( - "Endpoint {url} does not have the {method_name:?} method", - url = self.url - ); - return; - } +impl Health { + pub fn monitor( + health: Arc, + client_rx_: tokio::sync::watch::Receiver>>, + on_client_ready: Arc, + ) -> tokio::task::JoinHandle<()> { + tokio::spawn(async move { + if health.config.health_method.is_empty() { + return; } - Err(_) => return, - }; - loop { - tokio::time::sleep(interval).await; + // Wait for the client to be ready before starting the health check + on_client_ready.notified().await; + + let method_name = health.config.health_method.as_str(); + let interval = Duration::from_secs(health.config.interval_sec); + let response_threshold = Duration::from_micros(health.config.response_threshold_ms); let client = match client_rx_.borrow().clone() { Some(client) => client, - None => continue, + None => return, }; - let request_start = std::time::Instant::now(); + // Check if the endpoint has the health method match client - .request::>(method_name, vec![]) + .request::>("rpc_methods", vec![]) .await { Ok(response) => { - let duration = request_start.elapsed(); - - // Check if the node is syncing - if response - .get("isSyncing") - .unwrap_or(&serde_json::json!(false)) - .as_bool() - .unwrap_or_default() - { - self.update(Event::StaleChain); - continue; + let has_health_method = response + .get("methods") + .unwrap_or(&serde_json::json!([])) + .as_array() + .map(|methods| methods.iter().any(|x| x.as_str() == Some(method_name))) + .unwrap_or_default(); + if !has_health_method { + tracing::warn!( + "Endpoint {url} does not have the {method_name:?} method", + url = health.url + ); + return; } + } + Err(_) => return, + }; - // Check response time - if duration > response_threshold { - self.update(Event::SlowResponse); - continue; + loop { + tokio::time::sleep(interval).await; + + let client = match client_rx_.borrow().clone() { + Some(client) => client, + None => continue, + }; + + let request_start = std::time::Instant::now(); + match client + .request::>(method_name, vec![]) + .await + { + Ok(response) => { + let duration = request_start.elapsed(); + + // Check if the node is syncing + if response + .get("isSyncing") + .unwrap_or(&serde_json::json!(false)) + .as_bool() + .unwrap_or_default() + { + health.update(Event::StaleChain); + continue; + } + + // Check response time + if duration > response_threshold { + health.update(Event::SlowResponse); + continue; + } + + health.update(Event::ResponseOk); + } + Err(err) => { + health.on_error(&err); } - - self.update(Event::ResponseOk); - } - Err(err) => { - self.on_error(&err); } } - } + }) } }