From 30a7e15791da51e2b53f25940dc654748a5277f9 Mon Sep 17 00:00:00 2001
From: Roland Sherwin
Date: Tue, 11 Jun 2024 22:34:30 +0530
Subject: [PATCH 1/4] fix(manager): refresh should also try to refresh non-running nodes

---
 node-launchpad/src/components/home.rs | 14 ++++
 sn_node_manager/src/lib.rs            | 93 +++++++++++++++------------
 sn_service_management/src/rpc.rs      | 23 +++++--
 3 files changed, 84 insertions(+), 46 deletions(-)

diff --git a/node-launchpad/src/components/home.rs b/node-launchpad/src/components/home.rs
index 3f8b4be8ae..e703704175 100644
--- a/node-launchpad/src/components/home.rs
+++ b/node-launchpad/src/components/home.rs
@@ -573,6 +573,20 @@ impl Component for Home {
     }
 }
 
+fn refresh_node_registry() -> Result<()> {
+    let node_registry = NodeRegistry::load(&get_node_registry_path()?)?;
+    let node_services = node_registry
+        .nodes
+        .into_iter()
+        .filter(|node| node.status != ServiceStatus::Removed)
+        .collect();
+    info!(
+        "Loaded node registry. Running nodes: {:?}",
+        node_services.len()
+    );
+    Ok(())
+}
+
 fn stop_nodes(services: Vec<String>, action_sender: UnboundedSender<Action>) {
     tokio::task::spawn_local(async move {
         if let Err(err) =
diff --git a/sn_node_manager/src/lib.rs b/sn_node_manager/src/lib.rs
index e8fe06ed66..e5b02521ef 100644
--- a/sn_node_manager/src/lib.rs
+++ b/sn_node_manager/src/lib.rs
@@ -40,10 +40,8 @@ use crate::error::{Error, Result};
 use colored::Colorize;
 use semver::Version;
 use sn_service_management::{
-    control::ServiceControl,
-    error::Error as ServiceError,
-    rpc::{RpcActions, RpcClient},
-    NodeRegistry, NodeServiceData, ServiceStateActions, ServiceStatus, UpgradeOptions,
+    control::ServiceControl, error::Error as ServiceError, rpc::RpcClient, NodeRegistry,
+    NodeService, NodeServiceData, ServiceStateActions, ServiceStatus, UpgradeOptions,
     UpgradeResult,
 };
 use sn_transfers::HotWallet;
@@ -533,24 +531,29 @@ pub async fn refresh_node_registry(
             Err(_) => node.reward_balance = None,
         }
 
-        let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
-        if let ServiceStatus::Running = node.status {
-            if let Some(pid) = node.pid {
+        let mut rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
+        rpc_client.set_max_attempts(1);
+        let service = NodeService::new(node, Box::new(rpc_client));
+        refresh_single_node(service, service_control).await?;
+    }
+    Ok(())
+}
+
+async fn refresh_single_node<'a>(
+    mut service: NodeService<'a>,
+    service_control: &dyn ServiceControl,
+) -> Result<()> {
+    match &service.service_data.status {
+        ServiceStatus::Running => {
+            if let Some(pid) = service.service_data.pid {
                 // First we can try the PID we have now. If there is still a process running with
                 // that PID, we know the node is still running.
                 if service_control.is_service_process_running(pid) {
                     debug!(
-                        "The process for node {} is still running at PID {pid}",
-                        node.service_name
+                        "The process for node {} is still running at PID {pid}. Calling on_start() for the service.",
+                        service.service_data.service_name
                     );
-                    match rpc_client.network_info().await {
-                        Ok(info) => {
-                            node.connected_peers = Some(info.connected_peers);
-                        }
-                        Err(_) => {
-                            node.connected_peers = None;
-                        }
-                    }
+                    service.on_start().await?;
                 } else {
                     // The process with the PID we had has died at some point. However, if the
                     // service has been configured to restart on failures, it's possible that a new
                     // process has been launched and hence it would have a new PID. We can use the
                     // RPC service to try and retrieve it.
                     debug!(
                         "The process for node {} has died. Attempting to retrieve new PID",
-                        node.service_name
+                        service.service_data.service_name
                     );
-                    match rpc_client.node_info().await {
-                        Ok(info) => {
-                            node.pid = Some(info.pid);
-                            debug!("New PID for node {} is= {:?}", node.service_name, node.pid);
-                        }
-                        Err(_) => {
-                            // Finally, if there was an error communicating with the RPC client, we
-                            // can assume that this node is actually stopped.
-                            debug!(
-                                "Failed to retrieve new PID for node {}. Assuming it's stopped",
-                                node.service_name
-                            );
-                            node.status = ServiceStatus::Stopped;
-                            node.pid = None;
-                        }
-                    }
-                    match rpc_client.network_info().await {
-                        Ok(info) => {
-                            node.connected_peers = Some(info.connected_peers);
-                        }
-                        Err(_) => {
-                            node.connected_peers = None;
-                        }
-                    }
+                    if service.rpc_actions.node_info().await.is_ok() {
+                        debug!(
+                            "New PID for node {} is {:?}. Calling on_start() for the service",
+                            service.service_data.service_name, service.service_data.pid
+                        );
+                        service.on_start().await?;
+                    } else {
+                        // Finally, if there was an error communicating with the RPC client, we
+                        // can assume that this node is actually stopped.
+                        debug!(
+                            "Failed to retrieve new PID for node {}. Calling on_stop() for the service.",
+                            service.service_data.service_name
+                        );
+                        service.on_stop().await?;
+                    }
                 }
             }
         }
+        // If the service was manually started by a user, we'd want to make sure that we sync our state.
+        status => {
+            if service.rpc_actions.node_info().await.is_ok() {
+                debug!(
+                    "New PID for {} node (status: {status:?}) is {:?}. Calling on_start() for the service",
+                    service.service_data.service_name, service.service_data.pid
+                );
+                service.on_start().await?;
+            } else {
+                // If there was an error, we can assume that the node is at the same state.
+                debug!(
+                    "Failed to retrieve new PID for node {}. Not changing state",
+                    service.service_data.service_name
+                );
+            }
+        }
     }
+    Ok(())
 }
diff --git a/sn_service_management/src/rpc.rs b/sn_service_management/src/rpc.rs
index 5c6b2eda0f..be3c7d6b30 100644
--- a/sn_service_management/src/rpc.rs
+++ b/sn_service_management/src/rpc.rs
@@ -53,6 +53,8 @@ pub trait RpcActions: Sync {
 
 pub struct RpcClient {
     endpoint: String,
+    max_attempts: u8,
+    retry_delay: Duration,
 }
 
 impl RpcClient {
@@ -62,12 +64,24 @@ impl RpcClient {
     pub fn new(endpoint: &str) -> Self {
         Self {
             endpoint: endpoint.to_string(),
+            max_attempts: Self::MAX_CONNECTION_RETRY_ATTEMPTS,
+            retry_delay: Self::CONNECTION_RETRY_DELAY_SEC,
         }
     }
 
     pub fn from_socket_addr(socket: SocketAddr) -> Self {
         let endpoint = format!("https://{socket}");
-        Self { endpoint }
+        Self::new(&endpoint)
+    }
+
+    /// Set the maximum number of retry attempts when connecting to the RPC endpoint. Default is 5.
+    pub fn set_max_attempts(&mut self, max_retry_attempts: u8) {
+        self.max_attempts = max_retry_attempts;
+    }
+
+    /// Set the delay between retry attempts when connecting to the RPC endpoint. Default is 1 second.
+    pub fn set_retry_delay(&mut self, retry_delay: Duration) {
+        self.retry_delay = retry_delay;
     }
 
     // Connect to the RPC endpoint with retry
@@ -78,14 +92,13 @@ impl RpcClient {
             Ok(rpc_client) => break Ok(rpc_client),
             Err(_) => {
                 attempts += 1;
-                tokio::time::sleep(Self::CONNECTION_RETRY_DELAY_SEC).await;
-                if attempts >= Self::MAX_CONNECTION_RETRY_ATTEMPTS {
+                tokio::time::sleep(self.retry_delay).await;
+                if attempts >= self.max_attempts {
                     return Err(Error::RpcConnectionError(self.endpoint.clone()));
                 }
                 error!(
                     "Could not connect to RPC endpoint {:?}. Retrying {attempts}/{}",
Retrying {attempts}/{}", - self.endpoint, - Self::MAX_CONNECTION_RETRY_ATTEMPTS + self.endpoint, self.max_attempts ); } } From 65c9037d782cf94f3923843eb8ee5a87b19bbc08 Mon Sep 17 00:00:00 2001 From: Roland Sherwin Date: Tue, 11 Jun 2024 22:40:38 +0530 Subject: [PATCH 2/4] chore(manager): do not refresh system info twice --- sn_service_management/src/control.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/sn_service_management/src/control.rs b/sn_service_management/src/control.rs index 29e1f07689..b7ea1868e4 100644 --- a/sn_service_management/src/control.rs +++ b/sn_service_management/src/control.rs @@ -168,8 +168,7 @@ impl ServiceControl for ServiceController { } fn is_service_process_running(&self, pid: u32) -> bool { - let mut system = System::new_all(); - system.refresh_all(); + let system = System::new_all(); system.process(Pid::from(pid as usize)).is_some() } @@ -185,8 +184,7 @@ impl ServiceControl for ServiceController { } fn get_process_pid(&self, bin_path: &Path) -> Result { - let mut system = System::new_all(); - system.refresh_all(); + let system = System::new_all(); for (pid, process) in system.processes() { if let Some(path) = process.exe() { if bin_path == path { From dc78cff3667aaf1a01ee403a4fdd22d07b0872a5 Mon Sep 17 00:00:00 2001 From: Roland Sherwin Date: Tue, 11 Jun 2024 23:01:01 +0530 Subject: [PATCH 3/4] feat(launchpad): refresh node states on startup --- node-launchpad/src/app.rs | 5 +++-- node-launchpad/src/bin/tui/main.rs | 3 ++- node-launchpad/src/components/home.rs | 28 ++++++++++++--------------- 3 files changed, 17 insertions(+), 19 deletions(-) diff --git a/node-launchpad/src/app.rs b/node-launchpad/src/app.rs index 924c7e8e06..6a3e7e9cd7 100644 --- a/node-launchpad/src/app.rs +++ b/node-launchpad/src/app.rs @@ -39,7 +39,7 @@ pub struct App { } impl App { - pub fn new( + pub async fn new( tick_rate: f64, frame_rate: f64, peers_args: PeersArgs, @@ -52,7 +52,8 @@ impl App { &app_data.discord_username, peers_args, safenode_path, - )?; + ) + .await?; let config = Config::new()?; let discord_username_input = BetaProgramme::new(app_data.discord_username.clone()); let manage_nodes = ManageNodes::new(app_data.nodes_to_start)?; diff --git a/node-launchpad/src/bin/tui/main.rs b/node-launchpad/src/bin/tui/main.rs index b95b154297..842cabbb6b 100644 --- a/node-launchpad/src/bin/tui/main.rs +++ b/node-launchpad/src/bin/tui/main.rs @@ -65,7 +65,8 @@ async fn tokio_main() -> Result<()> { args.frame_rate, args.peers, args.safenode_path, - )?; + ) + .await?; app.run().await?; Ok(()) diff --git a/node-launchpad/src/components/home.rs b/node-launchpad/src/components/home.rs index e703704175..a02e52a842 100644 --- a/node-launchpad/src/components/home.rs +++ b/node-launchpad/src/components/home.rs @@ -22,7 +22,9 @@ use rand::seq::SliceRandom; use ratatui::{prelude::*, widgets::*}; use sn_node_manager::{config::get_node_registry_path, VerbosityLevel}; use sn_peers_acquisition::{get_bootstrap_peers_from_url, PeersArgs}; -use sn_service_management::{NodeRegistry, NodeServiceData, ServiceStatus}; +use sn_service_management::{ + control::ServiceController, NodeRegistry, NodeServiceData, ServiceStatus, +}; use std::{ path::PathBuf, time::{Duration, Instant}, @@ -67,7 +69,7 @@ pub enum LockRegistryState { } impl Home { - pub fn new( + pub async fn new( allocated_disk_space: usize, discord_username: &str, peers_args: PeersArgs, @@ -89,6 +91,14 @@ impl Home { discord_username: discord_username.to_string(), safenode_path, }; + + let now = Instant::now(); + 
debug!("Refreshing node registry states on startup"); + let mut node_registry = NodeRegistry::load(&get_node_registry_path()?)?; + sn_node_manager::refresh_node_registry(&mut node_registry, &ServiceController {}, false) + .await?; + node_registry.save()?; + debug!("Node registry states refreshed in {:?}", now.elapsed()); home.load_node_registry_and_update_states()?; Ok(home) @@ -573,20 +583,6 @@ impl Component for Home { } } -fn refresh_node_registry() -> Result<()> { - let node_registry = NodeRegistry::load(&get_node_registry_path()?)?; - let node_services = node_registry - .nodes - .into_iter() - .filter(|node| node.status != ServiceStatus::Removed) - .collect(); - info!( - "Loaded node registry. Running nodes: {:?}", - node_services.len() - ); - Ok(()) -} - fn stop_nodes(services: Vec, action_sender: UnboundedSender) { tokio::task::spawn_local(async move { if let Err(err) = From d9e84bd559a8e3588933eaa7b2f439680f7d6b07 Mon Sep 17 00:00:00 2001 From: Roland Sherwin Date: Wed, 12 Jun 2024 00:08:06 +0530 Subject: [PATCH 4/4] chore(node): feature gate a variable --- sn_node/src/bin/safenode/main.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/sn_node/src/bin/safenode/main.rs b/sn_node/src/bin/safenode/main.rs index e3bd77dd20..eaf734380e 100644 --- a/sn_node/src/bin/safenode/main.rs +++ b/sn_node/src/bin/safenode/main.rs @@ -225,6 +225,7 @@ fn main() -> Result<()> { #[cfg(feature = "open-metrics")] let mut node_builder = node_builder; // if enable flag is provided or only if the port is specified then enable the server by setting Some() + #[cfg(feature = "open-metrics")] let metrics_server_port = if opt.enable_metrics_server || opt.metrics_server_port != 0 { Some(opt.metrics_server_port) } else {