Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(launchpad): refresh node states on startup #1873

Merged
merged 4 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions node-launchpad/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub struct App {
}

impl App {
pub fn new(
pub async fn new(
tick_rate: f64,
frame_rate: f64,
peers_args: PeersArgs,
Expand All @@ -52,7 +52,8 @@ impl App {
&app_data.discord_username,
peers_args,
safenode_path,
)?;
)
.await?;
let config = Config::new()?;
let discord_username_input = BetaProgramme::new(app_data.discord_username.clone());
let manage_nodes = ManageNodes::new(app_data.nodes_to_start)?;
Expand Down
3 changes: 2 additions & 1 deletion node-launchpad/src/bin/tui/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ async fn tokio_main() -> Result<()> {
args.frame_rate,
args.peers,
args.safenode_path,
)?;
)
.await?;
app.run().await?;

Ok(())
Expand Down
14 changes: 12 additions & 2 deletions node-launchpad/src/components/home.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ use rand::seq::SliceRandom;
use ratatui::{prelude::*, widgets::*};
use sn_node_manager::{config::get_node_registry_path, VerbosityLevel};
use sn_peers_acquisition::{get_bootstrap_peers_from_url, PeersArgs};
use sn_service_management::{NodeRegistry, NodeServiceData, ServiceStatus};
use sn_service_management::{
control::ServiceController, NodeRegistry, NodeServiceData, ServiceStatus,
};
use std::{
path::PathBuf,
time::{Duration, Instant},
Expand Down Expand Up @@ -67,7 +69,7 @@ pub enum LockRegistryState {
}

impl Home {
pub fn new(
pub async fn new(
allocated_disk_space: usize,
discord_username: &str,
peers_args: PeersArgs,
Expand All @@ -89,6 +91,14 @@ impl Home {
discord_username: discord_username.to_string(),
safenode_path,
};

let now = Instant::now();
debug!("Refreshing node registry states on startup");
let mut node_registry = NodeRegistry::load(&get_node_registry_path()?)?;
sn_node_manager::refresh_node_registry(&mut node_registry, &ServiceController {}, false)
.await?;
node_registry.save()?;
debug!("Node registry states refreshed in {:?}", now.elapsed());
home.load_node_registry_and_update_states()?;

Ok(home)
Expand Down
1 change: 1 addition & 0 deletions sn_node/src/bin/safenode/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ fn main() -> Result<()> {
#[cfg(feature = "open-metrics")]
let mut node_builder = node_builder;
// if enable flag is provided or only if the port is specified then enable the server by setting Some()
#[cfg(feature = "open-metrics")]
let metrics_server_port = if opt.enable_metrics_server || opt.metrics_server_port != 0 {
Some(opt.metrics_server_port)
} else {
Expand Down
93 changes: 52 additions & 41 deletions sn_node_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,8 @@ use crate::error::{Error, Result};
use colored::Colorize;
use semver::Version;
use sn_service_management::{
control::ServiceControl,
error::Error as ServiceError,
rpc::{RpcActions, RpcClient},
NodeRegistry, NodeServiceData, ServiceStateActions, ServiceStatus, UpgradeOptions,
control::ServiceControl, error::Error as ServiceError, rpc::RpcClient, NodeRegistry,
NodeService, NodeServiceData, ServiceStateActions, ServiceStatus, UpgradeOptions,
UpgradeResult,
};
use sn_transfers::HotWallet;
Expand Down Expand Up @@ -533,61 +531,74 @@ pub async fn refresh_node_registry(
Err(_) => node.reward_balance = None,
}

let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
if let ServiceStatus::Running = node.status {
if let Some(pid) = node.pid {
let mut rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
rpc_client.set_max_attempts(1);
let service = NodeService::new(node, Box::new(rpc_client));
refresh_single_node(service, service_control).await?;
}
Ok(())
}

async fn refresh_single_node<'a>(
mut service: NodeService<'a>,
service_control: &dyn ServiceControl,
) -> Result<()> {
match &service.service_data.status {
ServiceStatus::Running => {
if let Some(pid) = service.service_data.pid {
// First we can try the PID we have now. If there is still a process running with
// that PID, we know the node is still running.
if service_control.is_service_process_running(pid) {
debug!(
"The process for node {} is still running at PID {pid}",
node.service_name
"The process for node {} is still running at PID {pid}. Calling on_start() for the service.",
service.service_data.service_name
);
match rpc_client.network_info().await {
Ok(info) => {
node.connected_peers = Some(info.connected_peers);
}
Err(_) => {
node.connected_peers = None;
}
}
service.on_start().await?;
} else {
// The process with the PID we had has died at some point. However, if the
// service has been configured to restart on failures, it's possible that a new
// process has been launched and hence we would have a new PID. We can use the
// RPC service to try and retrieve it.
debug!(
"The process for node {} has died. Attempting to retrieve new PID",
node.service_name
service.service_data.service_name
);
match rpc_client.node_info().await {
Ok(info) => {
node.pid = Some(info.pid);
debug!("New PID for node {} is= {:?}", node.service_name, node.pid);
}
Err(_) => {
// Finally, if there was an error communicating with the RPC client, we
// can assume that this node is actually stopped.
debug!(
"Failed to retrieve new PID for node {}. Assuming it's stopped",
node.service_name
);
node.status = ServiceStatus::Stopped;
node.pid = None;
}
}
match rpc_client.network_info().await {
Ok(info) => {
node.connected_peers = Some(info.connected_peers);
}
Err(_) => {
node.connected_peers = None;
}
if service.rpc_actions.node_info().await.is_ok() {
debug!(
"New PID for node {} is= {:?}. Calling on_start() for the service",
service.service_data.service_name, service.service_data.pid
);
service.on_start().await?;
} else {
// Finally, if there was an error communicating with the RPC client, we
// can assume that this node is actually stopped.
debug!(
"Failed to retrieve new PID for node {}. Calling on_stop() for the service.",
service.service_data.service_name
);
service.on_stop().await?;
}
}
}
}
// If the service was manually started by an user, we'd want to make sure that we sync our state.
status => {
if service.rpc_actions.node_info().await.is_ok() {
debug!(
"New PID for {} node (status: {status:?}) is= {:?}. Calling on_start() for the service",
service.service_data.service_name, service.service_data.pid
);
service.on_start().await?;
} else {
// If there was an error, we can assume that the node is at the same state.
debug!(
"Failed to retrieve new PID for node {}. Not changing state",
service.service_data.service_name
);
}
}
}

Ok(())
}

Expand Down
6 changes: 2 additions & 4 deletions sn_service_management/src/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,7 @@ impl ServiceControl for ServiceController {
}

fn is_service_process_running(&self, pid: u32) -> bool {
let mut system = System::new_all();
system.refresh_all();
let system = System::new_all();
system.process(Pid::from(pid as usize)).is_some()
}

Expand All @@ -185,8 +184,7 @@ impl ServiceControl for ServiceController {
}

fn get_process_pid(&self, bin_path: &Path) -> Result<u32> {
let mut system = System::new_all();
system.refresh_all();
let system = System::new_all();
for (pid, process) in system.processes() {
if let Some(path) = process.exe() {
if bin_path == path {
Expand Down
23 changes: 18 additions & 5 deletions sn_service_management/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ pub trait RpcActions: Sync {

pub struct RpcClient {
endpoint: String,
max_attempts: u8,
retry_delay: Duration,
}

impl RpcClient {
Expand All @@ -62,12 +64,24 @@ impl RpcClient {
pub fn new(endpoint: &str) -> Self {
Self {
endpoint: endpoint.to_string(),
max_attempts: Self::MAX_CONNECTION_RETRY_ATTEMPTS,
retry_delay: Self::CONNECTION_RETRY_DELAY_SEC,
}
}

pub fn from_socket_addr(socket: SocketAddr) -> Self {
let endpoint = format!("https://{socket}");
Self { endpoint }
Self::new(&endpoint)
}

/// Set the maximum number of retry attempts when connecting to the RPC endpoint. Default is 5.
pub fn set_max_attempts(&mut self, max_retry_attempts: u8) {
self.max_attempts = max_retry_attempts;
}

/// Set the delay between retry attempts when connecting to the RPC endpoint. Default is 1 second.
pub fn set_retry_delay(&mut self, retry_delay: Duration) {
self.retry_delay = retry_delay;
}

// Connect to the RPC endpoint with retry
Expand All @@ -78,14 +92,13 @@ impl RpcClient {
Ok(rpc_client) => break Ok(rpc_client),
Err(_) => {
attempts += 1;
tokio::time::sleep(Self::CONNECTION_RETRY_DELAY_SEC).await;
if attempts >= Self::MAX_CONNECTION_RETRY_ATTEMPTS {
tokio::time::sleep(self.retry_delay).await;
if attempts >= self.max_attempts {
return Err(Error::RpcConnectionError(self.endpoint.clone()));
}
error!(
"Could not connect to RPC endpoint {:?}. Retrying {attempts}/{}",
self.endpoint,
Self::MAX_CONNECTION_RETRY_ATTEMPTS
self.endpoint, self.max_attempts
);
}
}
Expand Down
Loading