diff --git a/Cargo.lock b/Cargo.lock index 08d2d55cb6..62096282aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4714,8 +4714,10 @@ dependencies = [ "log", "nix 0.28.0", "pretty_assertions", + "prometheus-parse", "rand 0.8.5", "ratatui", + "reqwest 0.12.4", "serde", "serde_json", "signal-hook", @@ -5556,6 +5558,18 @@ dependencies = [ "syn 2.0.66", ] +[[package]] +name = "prometheus-parse" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "811031bea65e5a401fb2e1f37d802cca6601e204ac463809a3189352d13b78a5" +dependencies = [ + "chrono", + "itertools 0.12.1", + "once_cell", + "regex", +] + [[package]] name = "proptest" version = "1.4.0" diff --git a/node-launchpad/Cargo.toml b/node-launchpad/Cargo.toml index ab31f02066..d0865cb826 100644 --- a/node-launchpad/Cargo.toml +++ b/node-launchpad/Cargo.toml @@ -42,8 +42,12 @@ libc = "0.2.148" log = "0.4.20" nix = { version = "0.28.0", features = ["user"] } pretty_assertions = "1.4.0" +prometheus-parse = "0.2.5" rand = "0.8.5" ratatui = { version = "0.26.0", features = ["serde", "macros", "unstable-widget-ref"] } +reqwest = { version = "0.12.2", default-features = false, features = [ + "rustls-tls-manual-roots", +] } serde = { version = "1.0.188", features = ["derive"] } serde_json = "1.0.107" signal-hook = "0.3.17" diff --git a/node-launchpad/src/action.rs b/node-launchpad/src/action.rs index 4bc6fc48eb..2b63773da4 100644 --- a/node-launchpad/src/action.rs +++ b/node-launchpad/src/action.rs @@ -6,7 +6,10 @@ // KIND, either express or implied. Please review the Licences for the specific language governing // permissions and limitations relating to use of the SAFE Network Software. -use crate::mode::{InputMode, Scene}; +use crate::{ + mode::{InputMode, Scene}, + node_stats::NodeStats, +}; use serde::{Deserialize, Serialize}; use strum::Display; @@ -41,10 +44,7 @@ pub enum HomeActions { SuccessfullyDetectedNatStatus, ErrorWhileRunningNatDetection, - NodesStatsObtained { - wallet_balance: u64, - space_used: u64, - }, + NodesStatsObtained(NodeStats), TriggerBetaProgramme, TriggerManageNodes, diff --git a/node-launchpad/src/components/home.rs b/node-launchpad/src/components/home.rs index 95dd82c861..1039c184df 100644 --- a/node-launchpad/src/components/home.rs +++ b/node-launchpad/src/components/home.rs @@ -15,21 +15,16 @@ use crate::{ action::{Action, HomeActions}, config::Config, mode::{InputMode, Scene}, + node_stats::NodeStats, style::{clear_area, COOL_GREY, EUCALYPTUS, GHOST_WHITE, LIGHT_PERIWINKLE, VERY_LIGHT_AZURE}, }; use color_eyre::eyre::{OptionExt, Result}; -use fs_extra::dir::get_size; -use futures::StreamExt; use rand::seq::SliceRandom; use ratatui::{prelude::*, widgets::*}; use sn_node_manager::{config::get_node_registry_path, VerbosityLevel}; use sn_peers_acquisition::{get_bootstrap_peers_from_url, PeersArgs}; -use sn_service_management::{ - rpc::{RpcActions, RpcClient}, - NodeRegistry, NodeServiceData, ServiceStatus, -}; +use sn_service_management::{NodeRegistry, NodeServiceData, ServiceStatus}; use std::{ - net::SocketAddr, path::PathBuf, time::{Duration, Instant}, }; @@ -51,7 +46,8 @@ pub struct Home { node_services: Vec, is_nat_status_determined: bool, error_while_running_nat_detection: usize, - node_stats: NodesStats, + node_stats: NodeStats, + node_stats_last_update: Instant, node_table_state: TableState, nodes_to_start: usize, discord_username: String, @@ -85,7 +81,8 @@ impl Home { node_services: Default::default(), is_nat_status_determined: false, error_while_running_nat_detection: 0, - node_stats: NodesStats::new(), + node_stats: NodeStats::default(), + node_stats_last_update: Instant::now(), nodes_to_start: allocated_disk_space, node_table_state: Default::default(), lock_registry: None, @@ -100,10 +97,10 @@ impl Home { /// Tries to trigger the update of node stats if the last update was more than `NODE_STAT_UPDATE_INTERVAL` ago. /// The result is sent via the HomeActions::NodesStatsObtained action. fn try_update_node_stats(&mut self, force_update: bool) -> Result<()> { - if self.node_stats.last_update.elapsed() > NODE_STAT_UPDATE_INTERVAL || force_update { - self.node_stats.last_update = Instant::now(); + if self.node_stats_last_update.elapsed() > NODE_STAT_UPDATE_INTERVAL || force_update { + self.node_stats_last_update = Instant::now(); - NodesStats::fetch_all_node_stats(&self.node_services, self.get_actions_sender()?); + NodeStats::fetch_all_node_stats(&self.node_services, self.get_actions_sender()?); } Ok(()) } @@ -282,12 +279,8 @@ impl Component for Home { Action::Tick => { self.try_update_node_stats(false)?; } - Action::HomeActions(HomeActions::NodesStatsObtained { - wallet_balance, - space_used, - }) => { - self.node_stats.wallet_balance = wallet_balance; - self.node_stats.space_used = space_used; + Action::HomeActions(HomeActions::NodesStatsObtained(stats)) => { + self.node_stats = stats; } Action::HomeActions(HomeActions::StartNodesCompleted) | Action::HomeActions(HomeActions::StopNodesCompleted) => { @@ -425,14 +418,14 @@ impl Component for Home { let stats_rows = vec![Row::new(vec![ self.node_stats.wallet_balance.to_string(), space_used_value, - // self.node_stats.memory_usage.to_string(), - // self.node_stats.network_usage.to_string(), + self.node_stats.memory_usage_mb.to_string(), + self.node_stats.network_usage.to_string(), ])]; let stats_width = [ Constraint::Min(15), Constraint::Min(10), - // Constraint::Min(10), - // Constraint::Min(10), + Constraint::Min(10), + Constraint::Min(10), ]; let stats_table = Table::new(stats_rows, stats_width) .column_spacing(2) @@ -440,8 +433,8 @@ impl Component for Home { Row::new(vec![ "Wallet Balance", space_used_header.as_str(), - // "Memory usage", - // "Network Usage", + "Memory usage (MB)", + "Network Usage", ]) .style(Style::new().bold().fg(GHOST_WHITE)), ) @@ -454,7 +447,6 @@ impl Component for Home { .style(Style::default().fg(VERY_LIGHT_AZURE)), ); f.render_widget(stats_table, layer_zero[1]); - // "todo: display a table".to_string() }; // ==== Node Status ===== @@ -671,95 +663,3 @@ fn reset_nodes(action_sender: UnboundedSender) { } }); } - -/// The stats of all the running nodes -/// todo: certain stats like wallet balance, space used can be calculated even if the node is offline. -struct NodesStats { - pub wallet_balance: u64, - pub space_used: u64, - // pub memory_usage: usize, - // pub network_usage: usize, - pub last_update: Instant, -} - -impl NodesStats { - pub fn new() -> Self { - Self { - wallet_balance: 0, - space_used: 0, - // memory_usage: 0, - // network_usage: 0, - last_update: Instant::now(), - } - } - - pub fn fetch_all_node_stats(nodes: &[NodeServiceData], action_sender: UnboundedSender) { - let node_details = nodes - .iter() - .filter_map(|node| { - if node.status == ServiceStatus::Running { - Some(( - node.service_name.clone(), - node.rpc_socket_addr, - node.data_dir_path.clone(), - )) - } else { - None - } - }) - .collect::>(); - - tokio::task::spawn_local(async move { - Self::fetch_all_node_stats_inner(node_details, action_sender).await; - }); - } - - async fn fetch_all_node_stats_inner( - node_details: Vec<(String, SocketAddr, PathBuf)>, - action_sender: UnboundedSender, - ) { - let mut stream = futures::stream::iter(node_details) - .map(|(service_name, rpc_addr, data_dir)| async move { - ( - Self::fetch_stat_per_node(rpc_addr, data_dir).await, - service_name, - ) - }) - .buffer_unordered(5); - - let mut all_wallet_balance = 0; - let mut all_space_used = 0; - - while let Some((result, service_name)) = stream.next().await { - match result { - Ok((wallet_balance, space_used)) => { - info!("Wallet balance: {wallet_balance}, Space used: {space_used}"); - all_wallet_balance += wallet_balance; - all_space_used += space_used; - } - Err(err) => { - error!("Error while fetching stats from {service_name:?}: {err:?}"); - } - } - } - - if let Err(err) = action_sender.send(Action::HomeActions(HomeActions::NodesStatsObtained { - wallet_balance: all_wallet_balance, - space_used: all_space_used, - })) { - error!("Error while sending action: {err:?}"); - } - } - - // todo: get all the stats - async fn fetch_stat_per_node(rpc_addr: SocketAddr, data_dir: PathBuf) -> Result<(u64, u64)> { - let now = Instant::now(); - let rpc_client = RpcClient::from_socket_addr(rpc_addr); - let wallet_balance = rpc_client.node_info().await?.wallet_balance; - - let space_used = get_size(data_dir)?; - - debug!("Fetched stats from {rpc_addr:?} in {:?}", now.elapsed()); - Ok((wallet_balance, space_used)) - } -} diff --git a/node-launchpad/src/lib.rs b/node-launchpad/src/lib.rs index 98ede28787..8e9ba7b8d4 100644 --- a/node-launchpad/src/lib.rs +++ b/node-launchpad/src/lib.rs @@ -11,6 +11,7 @@ pub mod app; pub mod components; pub mod config; pub mod mode; +pub mod node_stats; pub mod style; pub mod tui; pub mod utils; diff --git a/node-launchpad/src/node_stats.rs b/node-launchpad/src/node_stats.rs new file mode 100644 index 0000000000..52e205cbf9 --- /dev/null +++ b/node-launchpad/src/node_stats.rs @@ -0,0 +1,159 @@ +// Copyright 2024 MaidSafe.net limited. +// +// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. +// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed +// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. Please review the Licences for the specific language governing +// permissions and limitations relating to use of the SAFE Network Software. + +use color_eyre::Result; +use fs_extra::dir::get_size; +use futures::StreamExt; +use serde::{Deserialize, Serialize}; +use sn_service_management::{NodeServiceData, ServiceStatus}; +use std::{path::PathBuf, time::Instant}; +use tokio::sync::mpsc::UnboundedSender; + +use crate::action::{Action, HomeActions}; + +#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct NodeStats { + pub wallet_balance: u64, + pub forwarded_rewards: u64, + pub space_used: u64, + pub memory_usage_mb: usize, + pub network_usage: usize, +} + +impl NodeStats { + fn merge(&mut self, other: &NodeStats) { + self.wallet_balance += other.wallet_balance; + self.forwarded_rewards += other.forwarded_rewards; + self.space_used += other.space_used; + self.memory_usage_mb += other.memory_usage_mb; + self.network_usage += other.network_usage; + } + + pub fn fetch_all_node_stats(nodes: &[NodeServiceData], action_sender: UnboundedSender) { + let node_details = nodes + .iter() + .filter_map(|node| { + if node.status == ServiceStatus::Running { + if let Some(metrics_port) = node.metrics_port { + Some(( + node.service_name.clone(), + metrics_port, + node.data_dir_path.clone(), + )) + } else { + error!( + "No metrics port found for {:?}. Skipping stat fetch.", + node.service_name + ); + None + } + } else { + None + } + }) + .collect::>(); + if node_details.is_empty() { + info!("No running nodes to fetch stats from."); + return; + } else { + info!("Fetching stats from {} nodes", node_details.len()); + tokio::task::spawn_local(async move { + Self::fetch_all_node_stats_inner(node_details, action_sender).await; + }); + } + } + + async fn fetch_all_node_stats_inner( + node_details: Vec<(String, u16, PathBuf)>, + action_sender: UnboundedSender, + ) { + let mut stream = futures::stream::iter(node_details) + .map(|(service_name, metrics_port, data_dir)| async move { + ( + Self::fetch_stat_per_node(metrics_port, data_dir).await, + service_name, + ) + }) + .buffer_unordered(5); + + let mut all_node_stats = NodeStats::default(); + + while let Some((result, service_name)) = stream.next().await { + match result { + Ok(stats) => { + info!("Obtained node stats from {service_name:?}"); + all_node_stats.merge(&stats); + } + Err(err) => { + error!("Error while fetching stats from {service_name:?}: {err:?}"); + } + } + } + + if let Err(err) = action_sender.send(Action::HomeActions(HomeActions::NodesStatsObtained( + all_node_stats, + ))) { + error!("Error while sending action: {err:?}"); + } + } + + async fn fetch_stat_per_node(metrics_port: u16, data_dir: PathBuf) -> Result { + let now = Instant::now(); + + let body = reqwest::get(&format!("http://localhost:{metrics_port}/metrics")) + .await? + .text() + .await?; + let lines: Vec<_> = body.lines().map(|s| Ok(s.to_owned())).collect(); + let all_metrics = prometheus_parse::Scrape::parse(lines.into_iter())?; + + let mut stats = NodeStats { + wallet_balance: 0, + space_used: get_size(data_dir)?, + memory_usage_mb: 0, + network_usage: 0, + forwarded_rewards: 0, + }; + for sample in all_metrics.samples.iter() { + if sample.metric == "sn_node_current_reward_wallet_balance" { + match sample.value { + prometheus_parse::Value::Counter(val) + | prometheus_parse::Value::Gauge(val) + | prometheus_parse::Value::Untyped(val) => { + stats.wallet_balance = val as u64; + } + _ => {} + } + } else if sample.metric == "sn_networking_process_memory_used_mb" { + match sample.value { + prometheus_parse::Value::Counter(val) + | prometheus_parse::Value::Gauge(val) + | prometheus_parse::Value::Untyped(val) => { + stats.memory_usage_mb = val as usize; + } + _ => {} + } + } else if sample.metric == "sn_node_total_forwarded_rewards" { + match sample.value { + prometheus_parse::Value::Counter(val) + | prometheus_parse::Value::Gauge(val) + | prometheus_parse::Value::Untyped(val) => { + stats.forwarded_rewards = val as u64; + } + _ => {} + } + } + + debug!( + "Fetched stats from metrics_port {metrics_port:?} in {:?}", + now.elapsed() + ); + } + Ok(stats) + } +}