From 9a494723de484c44de06d117008f3953f938f3d0 Mon Sep 17 00:00:00 2001 From: Anton Yemelyanov Date: Sat, 18 Nov 2023 02:26:37 +0200 Subject: [PATCH] refactor metrics and introduce a dedicated peer monitor --- core/src/core.rs | 5 +- core/src/egui/theme.rs | 8 +- core/src/imports.rs | 8 +- core/src/interop/mod.rs | 25 +++- core/src/interop/runtime.rs | 7 + core/src/interop/services/kaspa/logs.rs | 4 +- core/src/interop/services/kaspa/mod.rs | 167 ++++++++++++------------ core/src/interop/services/metrics.rs | 117 +++++++++-------- core/src/interop/services/mod.rs | 6 + core/src/interop/services/peers.rs | 115 ++++++++++++++++ core/src/modules/logs.rs | 6 +- core/src/modules/metrics.rs | 6 +- core/src/modules/node.rs | 2 +- core/src/modules/overview.rs | 6 +- resources/i18n/i18n.json | 11 +- 15 files changed, 332 insertions(+), 161 deletions(-) create mode 100644 core/src/interop/services/peers.rs diff --git a/core/src/core.rs b/core/src/core.rs index 43ddcc3..7095723 100644 --- a/core/src/core.rs +++ b/core/src/core.rs @@ -780,9 +780,8 @@ impl Core { // let metrics = self.interop.kaspa_service().metrics(); let peers = self .interop - .kaspa_service() - .metrics() - .connected_peer_info() + .peer_monitor_service() + .peer_info() .map(|peers| peers.len()); let tps = self.metrics.as_ref().map(|metrics| metrics.tps); ui.horizontal(|ui| { diff --git a/core/src/egui/theme.rs b/core/src/egui/theme.rs index 8a6b093..af15fdc 100644 --- a/core/src/egui/theme.rs +++ b/core/src/egui/theme.rs @@ -24,8 +24,9 @@ pub struct Theme { pub graph_frame_color: Color32, pub performance_graph_color: Color32, pub storage_graph_color: Color32, - pub node_graph_color: Color32, pub network_graph_color: Color32, + pub blockdag_graph_color: Color32, + pub node_log_font_size: f32, // pub panel_icon_size : f32, // pub panel_icon_padding : f32, } @@ -58,8 +59,9 @@ impl Default for Theme { graph_frame_color: Color32::GRAY, performance_graph_color: Color32::from_rgb(186, 238, 255), storage_graph_color: Color32::from_rgb(255, 231, 186), - node_graph_color: Color32::from_rgb(241, 255, 186), - network_graph_color: Color32::from_rgb(186, 255, 241), + network_graph_color: Color32::from_rgb(241, 255, 186), + blockdag_graph_color: Color32::from_rgb(186, 255, 241), + node_log_font_size: 15_f32, // network_graph_color: Color32::from_rgb(58, 221, 190), // graph_color: Color32::from_rgb(21, 82, 71), // panel_icon_size : IconSize::new(Vec2::splat(26.),Vec2::new(36.,26.)), diff --git a/core/src/imports.rs b/core/src/imports.rs index 5153b0d..1ae5c10 100644 --- a/core/src/imports.rs +++ b/core/src/imports.rs @@ -2,6 +2,7 @@ pub use cfg_if::cfg_if; pub use downcast_rs::{impl_downcast, Downcast, DowncastSync}; pub use kaspa_consensus_core::constants::SOMPI_PER_KASPA; pub use kaspa_consensus_core::network::{NetworkId, NetworkType}; +pub use kaspa_rpc_core::api::rpc::RpcApi; pub use kaspa_utils::hex::{FromHex, ToHex}; pub use kaspa_utils::{hashmap::GroupExtension, networking::ContextualNetAddress}; pub use kaspa_wallet_core::api; @@ -17,10 +18,11 @@ pub use kaspa_wallet_core::storage::{ pub use kaspa_wallet_core::utils::*; pub use kaspa_wallet_core::Address; pub use kaspa_wrpc_client::{KaspaRpcClient, WrpcEncoding}; + // pub use egui::Ui; // pub use futures_util::future::BoxFuture; pub use async_trait::async_trait; -pub use futures::{future::FutureExt, select, Future}; +pub use futures::{pin_mut, select, FutureExt, StreamExt}; pub use futures_util::future::{join_all, try_join_all}; pub use separator::*; pub use serde::{Deserialize, Serialize}; @@ -28,7 +30,9 @@ pub use std::any::{Any, TypeId}; pub use std::cell::{Ref, RefCell, RefMut}; pub use std::collections::HashMap; pub use std::collections::VecDeque; +pub use std::future::Future; pub use std::path::{Path, PathBuf}; +pub use std::pin::Pin; pub use std::rc::Rc; pub use std::str::FromStr; pub use std::sync::{ @@ -37,8 +41,10 @@ pub use std::sync::{ }; pub use std::sync::{Arc, Mutex, MutexGuard}; pub use std::time::Duration; + pub use workflow_core::channel::{oneshot, Channel, Receiver, Sender}; pub use workflow_core::extensions::is_not_empty::*; +pub use workflow_core::task::interval; pub use workflow_core::time::unixtime_as_millis_f64; pub use workflow_i18n::*; pub use workflow_log::*; diff --git a/core/src/interop/mod.rs b/core/src/interop/mod.rs index 3ad7b4a..db21b1b 100644 --- a/core/src/interop/mod.rs +++ b/core/src/interop/mod.rs @@ -13,7 +13,7 @@ pub mod runtime; pub mod services; use runtime::Runtime; -use services::KaspaService; +use services::*; pub mod payload; pub use payload::Payload; @@ -21,6 +21,8 @@ pub use payload::Payload; pub struct Inner { application_events: ApplicationEventsChannel, kaspa: Arc, + peer_monitor: Arc, + metrics_service: Arc, runtime: Runtime, egui_ctx: egui::Context, is_running: Arc, @@ -39,12 +41,19 @@ impl Interop { pub fn new(egui_ctx: &egui::Context, settings: &Settings) -> Self { let application_events = ApplicationEventsChannel::unbounded(Some(egui_ctx.clone())); let kaspa = Arc::new(KaspaService::new(application_events.clone(), settings)); - let runtime = Runtime::new(&[kaspa.clone()]); + let peer_monitor = Arc::new(PeerMonitorService::new( + application_events.clone(), + settings, + )); + let metrics_service = Arc::new(MetricsService::new(application_events.clone(), settings)); + let runtime = Runtime::new(&[kaspa.clone(), peer_monitor.clone(), metrics_service.clone()]); let interop = Self { inner: Arc::new(Inner { application_events, kaspa, + peer_monitor, + metrics_service, runtime, egui_ctx: egui_ctx.clone(), is_running: Arc::new(AtomicBool::new(false)), @@ -66,6 +75,10 @@ impl Interop { &self.inner.runtime } + pub fn services(&self) -> Vec> { + self.inner.runtime.services() + } + /// Start the interop runtime. pub fn start(&self) { self.inner.is_running.store(true, Ordering::SeqCst); @@ -92,6 +105,14 @@ impl Interop { &self.inner.kaspa } + pub fn peer_monitor_service(&self) -> &Arc { + &self.inner.peer_monitor + } + + pub fn metrics_service(&self) -> &Arc { + &self.inner.metrics_service + } + /// Returns the reference to the application events channel. pub fn application_events(&self) -> &ApplicationEventsChannel { &self.inner.application_events diff --git a/core/src/interop/runtime.rs b/core/src/interop/runtime.rs index 17bba33..93dd943 100644 --- a/core/src/interop/runtime.rs +++ b/core/src/interop/runtime.rs @@ -5,6 +5,13 @@ pub trait Service: Sync + Send { async fn spawn(self: Arc) -> Result<()>; async fn join(self: Arc) -> Result<()>; fn terminate(self: Arc); + // -- + async fn attach_rpc(self: Arc, _rpc_api: Arc) -> Result<()> { + Ok(()) + } + async fn detach_rpc(self: Arc) -> Result<()> { + Ok(()) + } } pub struct Inner { diff --git a/core/src/interop/services/kaspa/logs.rs b/core/src/interop/services/kaspa/logs.rs index 9e9badf..b37bc54 100644 --- a/core/src/interop/services/kaspa/logs.rs +++ b/core/src/interop/services/kaspa/logs.rs @@ -1,4 +1,4 @@ -use egui::RichText; +use crate::imports::*; pub enum Log { Debug(String), @@ -49,6 +49,6 @@ impl From<&Log> for RichText { Log::Processed(text) => RichText::from(text).color(egui::Color32::LIGHT_GREEN), }; - text.monospace() + text.font(FontId::monospace(theme().node_log_font_size)) } } diff --git a/core/src/interop/services/kaspa/mod.rs b/core/src/interop/services/kaspa/mod.rs index e382326..31a43c2 100644 --- a/core/src/interop/services/kaspa/mod.rs +++ b/core/src/interop/services/kaspa/mod.rs @@ -3,7 +3,7 @@ use std::time::Duration; use crate::imports::*; use crate::interop::runtime::Service; pub use futures::{future::FutureExt, select, Future}; -use kaspa_metrics::{Metric, Metrics, MetricsSnapshot}; +// use kaspa_metrics::{Metric, Metrics, MetricsSnapshot}; #[allow(unused_imports)] use kaspa_wallet_core::rpc::{NotificationMode, Rpc, RpcCtl, WrpcEncoding}; use kaspa_wallet_core::{ConnectOptions, ConnectStrategy}; @@ -18,9 +18,6 @@ cfg_if! { } } -#[allow(clippy::identity_op)] -pub const MAX_METRICS_SAMPLES: usize = 60 * 60 * 24 * 1; // 1 day - cfg_if! { if #[cfg(not(target_arch = "wasm32"))] { use std::path::PathBuf; @@ -80,8 +77,8 @@ pub struct KaspaService { pub task_ctl: Channel<()>, pub network: Mutex, pub wallet: Arc, - pub metrics: Arc, - pub metrics_data: Mutex>>, + // pub metrics: Arc, + // pub metrics_data: Mutex>>, #[cfg(not(target_arch = "wasm32"))] pub kaspad: Mutex>>, #[cfg(not(target_arch = "wasm32"))] @@ -121,11 +118,11 @@ impl KaspaService { } } - let metrics = Arc::new(Metrics::default()); - let metrics_data = Metric::list() - .into_iter() - .map(|metric| (metric, Vec::new())) - .collect::>>(); + // let metrics = Arc::new(Metrics::default()); + // let metrics_data = Metric::list() + // .into_iter() + // .map(|metric| (metric, Vec::new())) + // .collect::>>(); Self { application_events, @@ -133,8 +130,8 @@ impl KaspaService { task_ctl: Channel::oneshot(), network: Mutex::new(settings.node.network), wallet: Arc::new(wallet), - metrics, - metrics_data: Mutex::new(metrics_data), + // metrics, + // metrics_data: Mutex::new(metrics_data), #[cfg(not(target_arch = "wasm32"))] kaspad: Mutex::new(None), #[cfg(not(target_arch = "wasm32"))] @@ -241,6 +238,10 @@ impl KaspaService { return Ok(()); } + for service in crate::interop::interop().services().into_iter() { + service.detach_rpc().await?; + } + if let Ok(wrpc_client) = self .wallet() .rpc_api() @@ -267,9 +268,9 @@ impl KaspaService { } } - self.metrics.unregister_sink(); - self.metrics.stop_task().await?; - self.metrics.set_rpc(None); + // self.metrics.unregister_sink(); + // self.metrics.stop_task().await?; + // self.metrics.set_rpc(None); Ok(()) } @@ -289,17 +290,21 @@ impl KaspaService { .await .expect("Unable to start wallet service"); - let this = self.clone(); - self.metrics - .register_sink(Arc::new(Box::new(move |snapshot: MetricsSnapshot| { - if let Err(err) = this.ingest_metrics_snapshot(Box::new(snapshot)) { - println!("Error ingesting metrics snapshot: {}", err); - } - None - }))); - self.reset_metrics_data()?; - self.metrics.start_task().await?; - self.metrics.set_rpc(Some(rpc_api)); + // let this = self.clone(); + // self.metrics + // .register_sink(Arc::new(Box::new(move |snapshot: MetricsSnapshot| { + // if let Err(err) = this.ingest_metrics_snapshot(Box::new(snapshot)) { + // println!("Error ingesting metrics snapshot: {}", err); + // } + // None + // }))); + // self.reset_metrics_data()?; + // self.metrics.start_task().await?; + // self.metrics.set_rpc(Some(rpc_api.clone())); + + for service in crate::interop::interop().services().into_iter() { + service.attach_rpc(rpc_api.clone()).await?; + } // if rpc client is KaspaRpcClient, auto-connect to the node // if let Some(wrpc_client) = wrpc_client { @@ -332,67 +337,67 @@ impl KaspaService { } } - pub fn metrics_data(&self) -> MutexGuard<'_, HashMap>> { - self.metrics_data.lock().unwrap() - } + // pub fn metrics_data(&self) -> MutexGuard<'_, HashMap>> { + // self.metrics_data.lock().unwrap() + // } - pub fn metrics(&self) -> &Arc { - &self.metrics - } + // pub fn metrics(&self) -> &Arc { + // &self.metrics + // } // pub fn connected_peer_info(&self) -> Option>> { // self.metrics.connected_peer_info() // } - pub fn reset_metrics_data(&self) -> Result<()> { - let now = unixtime_as_millis_f64(); - let mut template = Vec::with_capacity(MAX_METRICS_SAMPLES); - let mut plot_point = PlotPoint { - x: now - MAX_METRICS_SAMPLES as f64 * 1000.0, - y: 0.0, - }; - while template.len() < MAX_METRICS_SAMPLES { - template.push(plot_point); - plot_point.x += 1000.0; - } - - let mut metrics_data = self.metrics_data.lock().unwrap(); - for metric in Metric::list().into_iter() { - metrics_data.insert(metric, template.clone()); - } - Ok(()) - } - - pub fn ingest_metrics_snapshot(&self, snapshot: Box) -> Result<()> { - let timestamp = snapshot.unixtime; - let mut metrics_data = self.metrics_data.lock().unwrap(); - for metric in Metric::list().into_iter() { - let dest = metrics_data.get_mut(&metric).unwrap(); - if dest.len() > MAX_METRICS_SAMPLES { - dest.drain(0..dest.len() - MAX_METRICS_SAMPLES); - } - // else if dest.len() < MAX_METRICS_SAMPLES { - // let mut last_point = dest.last().cloned().unwrap_or_default(); - // while dest.len() < MAX_METRICS_SAMPLES { - // last_point.x += 1000.0; - // dest.push(last_point.clone()); - // } - // } - dest.push(PlotPoint { - x: timestamp, - y: snapshot.get(&metric), - }); - } - - // if update_metrics_flag().load(Ordering::SeqCst) { - self.application_events - .sender - .try_send(crate::events::Events::Metrics { snapshot }) - .unwrap(); - // } + // pub fn reset_metrics_data(&self) -> Result<()> { + // let now = unixtime_as_millis_f64(); + // let mut template = Vec::with_capacity(MAX_METRICS_SAMPLES); + // let mut plot_point = PlotPoint { + // x: now - MAX_METRICS_SAMPLES as f64 * 1000.0, + // y: 0.0, + // }; + // while template.len() < MAX_METRICS_SAMPLES { + // template.push(plot_point); + // plot_point.x += 1000.0; + // } + + // let mut metrics_data = self.metrics_data.lock().unwrap(); + // for metric in Metric::list().into_iter() { + // metrics_data.insert(metric, template.clone()); + // } + // Ok(()) + // } - Ok(()) - } + // pub fn ingest_metrics_snapshot(&self, snapshot: Box) -> Result<()> { + // let timestamp = snapshot.unixtime; + // let mut metrics_data = self.metrics_data.lock().unwrap(); + // for metric in Metric::list().into_iter() { + // let dest = metrics_data.get_mut(&metric).unwrap(); + // if dest.len() > MAX_METRICS_SAMPLES { + // dest.drain(0..dest.len() - MAX_METRICS_SAMPLES); + // } + // // else if dest.len() < MAX_METRICS_SAMPLES { + // // let mut last_point = dest.last().cloned().unwrap_or_default(); + // // while dest.len() < MAX_METRICS_SAMPLES { + // // last_point.x += 1000.0; + // // dest.push(last_point.clone()); + // // } + // // } + // dest.push(PlotPoint { + // x: timestamp, + // y: snapshot.get(&metric), + // }); + // } + + // // if update_metrics_flag().load(Ordering::SeqCst) { + // self.application_events + // .sender + // .try_send(crate::events::Events::Metrics { snapshot }) + // .unwrap(); + // // } + + // Ok(()) + // } } #[async_trait] diff --git a/core/src/interop/services/metrics.rs b/core/src/interop/services/metrics.rs index a78b660..ae5333e 100644 --- a/core/src/interop/services/metrics.rs +++ b/core/src/interop/services/metrics.rs @@ -1,17 +1,16 @@ -use std::time::Duration; +// use std::time::Duration; use crate::imports::*; -use crate::runtime::Service; +use crate::interop::runtime::Service; pub use futures::{future::FutureExt, select, Future}; use kaspa_metrics::{Metric, Metrics, MetricsSnapshot}; #[allow(unused_imports)] use kaspa_wallet_core::rpc::{NotificationMode, Rpc, RpcCtl, WrpcEncoding}; -use kaspa_wallet_core::{ConnectOptions, ConnectStrategy}; +// use kaspa_wallet_core::{ConnectOptions, ConnectStrategy}; #[allow(clippy::identity_op)] pub const MAX_METRICS_SAMPLES: usize = 60 * 60 * 24 * 1; // 1 day - pub struct MetricsService { pub application_events: ApplicationEventsChannel, // pub service_events: Channel, @@ -23,11 +22,10 @@ pub struct MetricsService { } impl MetricsService { - pub fn new(application_events: ApplicationEventsChannel, settings: &Settings) -> Self { + pub fn new(application_events: ApplicationEventsChannel, _settings: &Settings) -> Self { // create service event channel // let service_events = Channel::unbounded(); - let metrics = Arc::new(Metrics::default()); let metrics_data = Metric::list() .into_iter() @@ -45,33 +43,16 @@ impl MetricsService { } } + // pub async fn stop_all_services(&self) -> Result<()> { - pub async fn stop_all_services(&self) -> Result<()> { + // Ok(()) + // } - self.metrics.unregister_sink(); - self.metrics.stop_task().await?; - self.metrics.set_rpc(None); + // pub async fn start_all_services(self: &Arc, rpc: Rpc, network: Network) -> Result<()> { + // let rpc_api = rpc.rpc_api().clone(); - Ok(()) - } - - pub async fn start_all_services(self: &Arc, rpc: Rpc, network: Network) -> Result<()> { - let rpc_api = rpc.rpc_api().clone(); - - let this = self.clone(); - self.metrics - .register_sink(Arc::new(Box::new(move |snapshot: MetricsSnapshot| { - if let Err(err) = this.ingest_metrics_snapshot(Box::new(snapshot)) { - println!("Error ingesting metrics snapshot: {}", err); - } - None - }))); - self.reset_metrics_data()?; - self.metrics.start_task().await?; - self.metrics.set_rpc(Some(rpc_api)); - - Ok(()) - } + // Ok(()) + // } pub fn metrics_data(&self) -> MutexGuard<'_, HashMap>> { self.metrics_data.lock().unwrap() @@ -134,49 +115,71 @@ impl MetricsService { #[async_trait] impl Service for MetricsService { - async fn spawn(self: Arc) -> Result<()> { + async fn attach_rpc(self: Arc, rpc_api: Arc) -> Result<()> { let this = self.clone(); - let wallet_events = this.wallet.multiplexer().channel(); - let _application_events_sender = self.application_events.sender.clone(); + self.metrics + .register_sink(Arc::new(Box::new(move |snapshot: MetricsSnapshot| { + if let Err(err) = this.ingest_metrics_snapshot(Box::new(snapshot)) { + println!("Error ingesting metrics snapshot: {}", err); + } + None + }))); - loop { - // println!("loop..."); - select! { + self.reset_metrics_data()?; + self.metrics.start_task().await?; + self.metrics.set_rpc(Some(rpc_api)); + Ok(()) + } + async fn detach_rpc(self: Arc) -> Result<()> { + self.metrics.unregister_sink(); + self.metrics.stop_task().await?; + self.metrics.set_rpc(None); + Ok(()) + } - msg = this.as_ref().service_events.receiver.recv().fuse() => { + async fn spawn(self: Arc) -> Result<()> { + // let this = self.clone(); + // // let wallet_events = this.wallet.multiplexer().channel(); + // let _application_events_sender = self.application_events.sender.clone(); - if let Ok(event) = msg { + // loop { + // // println!("loop..."); + // select! { - match event { + // msg = this.as_ref().service_events.receiver.recv().fuse() => { - KaspadServiceEvents::Exit => { - break; - } - } - } else { - break; - } - } - } - } + // if let Ok(event) = msg { + + // match event { + + // KaspadServiceEvents::Exit => { + // break; + // } + // } + // } else { + // break; + // } + // } + // } + // } - println!("shutting down node manager..."); - this.stop_all_services().await?; - this.task_ctl.send(()).await.unwrap(); + // println!("shutting down node manager..."); + // this.stop_all_services().await?; + // this.task_ctl.send(()).await.unwrap(); Ok(()) } fn terminate(self: Arc) { - self.service_events - .sender - .try_send(KaspadServiceEvents::Exit) - .unwrap(); + // self.service_events + // .sender + // .try_send(KaspadServiceEvents::Exit) + // .unwrap(); } async fn join(self: Arc) -> Result<()> { - self.task_ctl.recv().await.unwrap(); + // self.task_ctl.recv().await.unwrap(); Ok(()) } } diff --git a/core/src/interop/services/mod.rs b/core/src/interop/services/mod.rs index 6c4e5a3..2ae6834 100644 --- a/core/src/interop/services/mod.rs +++ b/core/src/interop/services/mod.rs @@ -1,2 +1,8 @@ pub mod kaspa; pub use kaspa::KaspaService; + +pub mod peers; +pub use peers::PeerMonitorService; + +pub mod metrics; +pub use metrics::MetricsService; diff --git a/core/src/interop/services/peers.rs b/core/src/interop/services/peers.rs new file mode 100644 index 0000000..0dc2cb4 --- /dev/null +++ b/core/src/interop/services/peers.rs @@ -0,0 +1,115 @@ +// use std::time::Duration; + +use crate::imports::*; +use crate::interop::runtime::Service; +pub use futures::{future::FutureExt, select, Future}; +use kaspa_rpc_core::RpcPeerInfo; +// use kaspa_metrics::{Metric, Metrics, MetricsSnapshot}; +#[allow(unused_imports)] +use kaspa_wallet_core::rpc::{NotificationMode, Rpc, RpcCtl, WrpcEncoding}; +// use kaspa_wallet_core::{ConnectOptions, ConnectStrategy}; + +// #[allow(clippy::identity_op)] +pub const PEER_POLLING_INTERVAL: usize = 1; // 1 sec + +pub enum PeerMonitorEvents { + Exit, +} + +pub struct PeerMonitorService { + pub application_events: ApplicationEventsChannel, + pub service_events: Channel, + pub task_ctl: Channel<()>, + pub rpc_api: Mutex>>, + pub peer_info: Mutex>>>, +} + +impl PeerMonitorService { + pub fn new(application_events: ApplicationEventsChannel, _settings: &Settings) -> Self { + Self { + application_events, + service_events: Channel::unbounded(), + task_ctl: Channel::oneshot(), + rpc_api: Mutex::new(None), + peer_info: Mutex::new(None), + } + } + + pub fn rpc_api(&self) -> Option> { + self.rpc_api.lock().unwrap().clone() + } + + pub fn peer_info(&self) -> Option>> { + self.peer_info.lock().unwrap().clone() + } +} + +#[async_trait] +impl Service for PeerMonitorService { + async fn attach_rpc(self: Arc, rpc_api: Arc) -> Result<()> { + self.rpc_api.lock().unwrap().replace(rpc_api); + Ok(()) + } + + async fn detach_rpc(self: Arc) -> Result<()> { + self.rpc_api.lock().unwrap().take(); + self.peer_info.lock().unwrap().take(); + + Ok(()) + } + + async fn spawn(self: Arc) -> Result<()> { + let this = self.clone(); + // let wallet_events = this.wallet.multiplexer().channel(); + let _application_events_sender = self.application_events.sender.clone(); + + let interval = interval(Duration::from_secs(1)); + pin_mut!(interval); + + loop { + // println!("loop..."); + select! { + _ = interval.next().fuse() => { + if let Some(rpc_api) = this.rpc_api() { + if let Ok(resp) = rpc_api.get_connected_peer_info().await { + this.peer_info.lock().unwrap().replace(Arc::new(resp.peer_info)); + } + } + }, + + msg = this.as_ref().service_events.receiver.recv().fuse() => { + + if let Ok(event) = msg { + + match event { + + PeerMonitorEvents::Exit => { + break; + } + } + } else { + break; + } + } + } + } + + println!("shutting down peer monitor..."); + // this.stop_all_services().await?; + this.task_ctl.send(()).await.unwrap(); + + Ok(()) + } + + fn terminate(self: Arc) { + self.service_events + .sender + .try_send(PeerMonitorEvents::Exit) + .unwrap(); + } + + async fn join(self: Arc) -> Result<()> { + self.task_ctl.recv().await.unwrap(); + Ok(()) + } +} diff --git a/core/src/modules/logs.rs b/core/src/modules/logs.rs index 6242601..68efec1 100644 --- a/core/src/modules/logs.rs +++ b/core/src/modules/logs.rs @@ -15,9 +15,9 @@ impl Logs { impl ModuleT for Logs { - fn style(&self) -> ModuleStyle { - ModuleStyle::Default - } + // fn style(&self) -> ModuleStyle { + // ModuleStyle::Default + // } fn render( &mut self, diff --git a/core/src/modules/metrics.rs b/core/src/modules/metrics.rs index fbae823..4937293 100644 --- a/core/src/modules/metrics.rs +++ b/core/src/modules/metrics.rs @@ -1,5 +1,5 @@ use crate::imports::*; -use crate::interop::services::kaspa::MAX_METRICS_SAMPLES; +use crate::interop::services::metrics::MAX_METRICS_SAMPLES; use egui_extras::{StripBuilder, Size}; use kaspa_metrics::{Metric,MetricGroup, MetricsSnapshot}; use chrono::DateTime; @@ -135,8 +135,8 @@ impl Metrics { let graph_color = match group { MetricGroup::System => theme.performance_graph_color, MetricGroup::Storage => theme.storage_graph_color, - MetricGroup::Node => theme.node_graph_color, MetricGroup::Network => theme.network_graph_color, + MetricGroup::BlockDAG => theme.blockdag_graph_color, }; StripBuilder::new(ui) @@ -154,7 +154,7 @@ impl Metrics { // --- let graph_data = { - let metrics_data = self.interop.kaspa_service().metrics_data(); + let metrics_data = self.interop.metrics_service().metrics_data(); let data = metrics_data.get(&metric).unwrap(); let samples = if data.len() < duration { data.len() } else { duration }; data[data.len()-samples..].to_vec() diff --git a/core/src/modules/node.rs b/core/src/modules/node.rs index 5ba5b07..1a58e67 100644 --- a/core/src/modules/node.rs +++ b/core/src/modules/node.rs @@ -48,7 +48,7 @@ impl ModuleT for Node { ui.vertical(|ui| { - if let Some(peers) = self.interop.kaspa_service().metrics().connected_peer_info() { + if let Some(peers) = self.interop.peer_monitor_service().peer_info() { let (inbound, outbound) : (Vec<_>,Vec<_>) = peers.iter().partition(|peer| peer.is_outbound); CollapsingHeader::new(i18n("Inbound")) diff --git a/core/src/modules/overview.rs b/core/src/modules/overview.rs index f894a2d..fdafdfc 100644 --- a/core/src/modules/overview.rs +++ b/core/src/modules/overview.rs @@ -47,7 +47,7 @@ impl ModuleT for Overview { CollapsingHeader::new(i18n("Kaspa")) .default_open(true) .show(ui, |ui| { - ui.label(format!("Kaspa NG v{} + Rusty Kaspa v{}", env!("CARGO_PKG_VERSION"), kaspa_wallet_core::version())); + ui.label(format!("Kaspa NG v{}-{} + Rusty Kaspa v{}", env!("CARGO_PKG_VERSION"), kaspa_wallet_core::version(), crate::app::GIT_DESCRIBE)); self.render_graphs(core,ui); }); @@ -114,12 +114,12 @@ impl Overview { let graph_color = match group { MetricGroup::System => theme.performance_graph_color, MetricGroup::Storage => theme.storage_graph_color, - MetricGroup::Node => theme.node_graph_color, MetricGroup::Network => theme.network_graph_color, + MetricGroup::BlockDAG => theme.blockdag_graph_color, }; let graph_data = { - let metrics_data = self.interop.kaspa_service().metrics_data(); + let metrics_data = self.interop.metrics_service().metrics_data(); let data = metrics_data.get(&metric).unwrap(); let mut duration = 2 * 60; let uptime = self.interop.uptime().as_secs() as usize; diff --git a/resources/i18n/i18n.json b/resources/i18n/i18n.json index 12f5bf4..b453f29 100644 --- a/resources/i18n/i18n.json +++ b/resources/i18n/i18n.json @@ -16,13 +16,13 @@ "nl": "Dutch", "vi": "Vietnamese", "fa": "Farsi", + "pa": "Panjabi", "fil": "Filipino", "lt": "Lithuanian", "sv": "Swedish", "es": "EspaƱol", "fi": "Finnish", "uk": "Ukrainian", - "pa": "Panjabi", "af": "Afrikaans", "et": "Esti", "en": "English", @@ -69,13 +69,14 @@ "lt": {}, "sv": {}, "es": {}, - "fi": {}, "uk": {}, + "fi": {}, "af": {}, "et": {}, "en": { "Kaspa": "Kaspa", "Mainnet (Main Kaspa network)": "Mainnet (Main Kaspa network)", + "Borsh Connection Attempts": "Borsh Connection Attempts", "TPS": "TPS", "Ping": "Ping", "Dependencies": "Dependencies", @@ -83,12 +84,15 @@ "Disable": "Disable", "Chain Blocks": "Chain Blocks", "Mass Counts": "Mass Counts", + "Active Peers": "Active Peers", "File Handles": "File Handles", "Connection duration": "Connection duration", "Testnet-11 (10 BPS)": "Testnet-11 (10 BPS)", "Virtual Memory": "Virtual Memory", "Resident Memory": "Resident Memory", "Inbound": "Inbound", + "Json Active Connections": "Json Active Connections", + "Borsh Handshake Failures": "Borsh Handshake Failures", "Blocks Submitted": "Blocks Submitted", "Tip Hashes": "Tip Hashes", "IBD": "IBD", @@ -102,13 +106,16 @@ "User Agent": "User Agent", "Node Status": "Node Status", "Body Counts": "Body Counts", + "Json Connection Attempts": "Json Connection Attempts", "Time Offset": "Time Offset", + "Json Handshake Failures": "Json Handshake Failures", "Difficulty": "Difficulty", "Storage Read": "Storage Read", "Protocol": "Protocol", "Not connected": "Not connected", "No peers": "No peers", "Network Peers": "Network Peers", + "Borsh Active Connections": "Borsh Active Connections", "Peers Connected": "Peers Connected", "Virtual DAA Score": "Virtual DAA Score", "Outbound": "Outbound",