From fb4654c92cd9090f2a9506854260e10c79ba743e Mon Sep 17 00:00:00 2001 From: Roland Sherwin Date: Wed, 5 Jun 2024 18:05:46 +0530 Subject: [PATCH 1/8] feat(logging): optionally print updates to stdout and add manager crates --- sn_logging/src/layers.rs | 17 ++++++++++++++--- sn_logging/src/lib.rs | 13 +++++++++++-- 2 files changed, 25 insertions(+), 5 deletions(-) diff --git a/sn_logging/src/layers.rs b/sn_logging/src/layers.rs index a97f665e4d..4200c263e8 100644 --- a/sn_logging/src/layers.rs +++ b/sn_logging/src/layers.rs @@ -105,10 +105,13 @@ impl TracingLayers { format: LogFormat, max_uncompressed_log_files: Option, max_compressed_log_files: Option, + print_updates_to_stdout: bool, ) -> Result { let layer = match output_dest { LogOutputDest::Stdout => { - println!("Logging to stdout"); + if print_updates_to_stdout { + println!("Logging to stdout"); + } tracing_fmt::layer() .with_ansi(false) .with_target(false) @@ -117,7 +120,9 @@ impl TracingLayers { } LogOutputDest::Path(path) => { std::fs::create_dir_all(path)?; - println!("Logging to directory: {path:?}"); + if print_updates_to_stdout { + println!("Logging to directory: {path:?}"); + } // the number of normal files let max_uncompressed_log_files = @@ -153,7 +158,9 @@ impl TracingLayers { }; let targets = match std::env::var("SN_LOG") { Ok(sn_log_val) => { - println!("Using SN_LOG={sn_log_val}"); + if print_updates_to_stdout { + println!("Using SN_LOG={sn_log_val}"); + } get_logging_targets(&sn_log_val)? } Err(_) => default_logging_targets, @@ -265,6 +272,8 @@ fn get_logging_targets(logging_env_value: &str) -> Result> ("safenode".to_string(), Level::TRACE), ("safenode_rpc_client".to_string(), Level::TRACE), ("safe".to_string(), Level::TRACE), + ("safenode_manager".to_string(), Level::TRACE), + ("safenodemand".to_string(), Level::TRACE), // libs ("sn_build_info".to_string(), Level::TRACE), ("sn_cli".to_string(), Level::TRACE), @@ -272,10 +281,12 @@ fn get_logging_targets(logging_env_value: &str) -> Result> ("sn_faucet".to_string(), Level::TRACE), ("sn_logging".to_string(), Level::TRACE), ("sn_node".to_string(), Level::TRACE), + ("sn_node_manager".to_string(), Level::TRACE), ("sn_node_rpc_client".to_string(), Level::TRACE), ("sn_peers_acquisition".to_string(), Level::TRACE), ("sn_protocol".to_string(), Level::TRACE), ("sn_registers".to_string(), Level::INFO), + ("sn_service_management".to_string(), Level::TRACE), ("sn_transfers".to_string(), Level::TRACE), ]); } diff --git a/sn_logging/src/lib.rs b/sn_logging/src/lib.rs index 4e285b53a9..2240d61857 100644 --- a/sn_logging/src/lib.rs +++ b/sn_logging/src/lib.rs @@ -124,6 +124,8 @@ pub struct LogBuilder { format: LogFormat, max_uncompressed_log_files: Option, max_compressed_log_files: Option, + /// Setting this would print the sn_logging related updates to stdout. + print_updates_to_stdout: bool, } impl LogBuilder { @@ -138,6 +140,7 @@ impl LogBuilder { format: LogFormat::Default, max_uncompressed_log_files: None, max_compressed_log_files: None, + print_updates_to_stdout: true, } } @@ -161,6 +164,11 @@ impl LogBuilder { self.max_compressed_log_files = Some(files); } + /// Setting this to false would prevent sn_logging from printing things to stdout. + pub fn print_updates_to_stdout(&mut self, print: bool) { + self.print_updates_to_stdout = print; + } + /// Inits node logging, returning the NonBlocking guard if present. /// This guard should be held for the life of the program. /// @@ -174,6 +182,7 @@ impl LogBuilder { self.format, self.max_uncompressed_log_files, self.max_compressed_log_files, + self.print_updates_to_stdout, )?; #[cfg(feature = "otlp")] @@ -192,7 +201,7 @@ impl LogBuilder { .try_init() .is_err() { - println!("Tried to initialize and set global default subscriber more than once"); + eprintln!("Tried to initialize and set global default subscriber more than once"); } Ok((reload_handle, layers.log_appender_guard)) @@ -257,7 +266,7 @@ impl LogBuilder { let mut layers = TracingLayers::default(); let _reload_handle = layers - .fmt_layer(vec![], &output_dest, LogFormat::Default, None, None) + .fmt_layer(vec![], &output_dest, LogFormat::Default, None, None, true) .expect("Failed to get TracingLayers"); layers } From 8721fc8891ae775dae280faa481965e5a6d6140c Mon Sep 17 00:00:00 2001 From: Roland Sherwin Date: Wed, 5 Jun 2024 19:07:01 +0530 Subject: [PATCH 2/8] chore(launchpad): update the default logging targets --- node-launchpad/src/utils.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/node-launchpad/src/utils.rs b/node-launchpad/src/utils.rs index 7de61f7072..3357fb41b7 100644 --- a/node-launchpad/src/utils.rs +++ b/node-launchpad/src/utils.rs @@ -75,6 +75,7 @@ pub fn initialize_panic_handler() -> Result<()> { Ok(()) } +// todo: use sn_logging pub fn initialize_logging() -> Result<()> { let timestamp = chrono::Local::now().format("%Y-%m-%d_%H-%M-%S").to_string(); let log_path = get_launchpad_data_dir_path()?.join("logs"); @@ -84,7 +85,7 @@ pub fn initialize_logging() -> Result<()> { std::env::set_var( "RUST_LOG", std::env::var("RUST_LOG") - .unwrap_or_else(|_| format!("{}=trace,debug", env!("CARGO_CRATE_NAME"))), + .unwrap_or_else(|_| format!("{}=trace,sn_node_manager=trace,sn_service_management=trace,sn_peers_acquisition=trace", env!("CARGO_CRATE_NAME"))), ); let file_subscriber = tracing_subscriber::fmt::layer() .with_file(true) From 8d54d95aa023771ebb79043467ecd060566ce3e3 Mon Sep 17 00:00:00 2001 From: Roland Sherwin Date: Wed, 5 Jun 2024 19:07:21 +0530 Subject: [PATCH 3/8] chore(manager): add logging for node manager and service management --- Cargo.lock | 1 + sn_logging/src/lib.rs | 2 +- sn_node_manager/Cargo.toml | 1 + sn_node_manager/src/add_services/mod.rs | 59 +++++++++++++- sn_node_manager/src/bin/cli/main.rs | 29 ++++++- sn_node_manager/src/bin/daemon/main.rs | 32 +++++++- sn_node_manager/src/cmd/auditor.rs | 20 ++++- sn_node_manager/src/cmd/daemon.rs | 10 +++ sn_node_manager/src/cmd/faucet.rs | 9 +++ sn_node_manager/src/cmd/local.rs | 20 ++++- sn_node_manager/src/cmd/mod.rs | 16 +++- sn_node_manager/src/cmd/nat_detection.rs | 59 +++++++++++--- sn_node_manager/src/cmd/node.rs | 82 +++++++++++++++----- sn_node_manager/src/config.rs | 89 +++++++++++++++++----- sn_node_manager/src/helpers.rs | 56 +++++++++++--- sn_node_manager/src/lib.rs | 84 ++++++++++++++++++-- sn_node_manager/src/local.rs | 15 +++- sn_node_manager/src/rpc.rs | 24 ++++-- sn_node_manager/src/rpc_client.rs | 4 + sn_service_management/src/auditor.rs | 6 +- sn_service_management/src/control.rs | 97 ++++++++++++++++++------ sn_service_management/src/daemon.rs | 5 +- sn_service_management/src/lib.rs | 36 +++++++-- sn_service_management/src/node.rs | 11 ++- 24 files changed, 649 insertions(+), 118 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4084c9dd21..4ca08f0b21 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6943,6 +6943,7 @@ dependencies = [ "assert_fs", "assert_matches", "async-trait", + "chrono", "clap", "color-eyre", "colored", diff --git a/sn_logging/src/lib.rs b/sn_logging/src/lib.rs index 2240d61857..8464fecabd 100644 --- a/sn_logging/src/lib.rs +++ b/sn_logging/src/lib.rs @@ -266,7 +266,7 @@ impl LogBuilder { let mut layers = TracingLayers::default(); let _reload_handle = layers - .fmt_layer(vec![], &output_dest, LogFormat::Default, None, None, true) + .fmt_layer(vec![], &output_dest, LogFormat::Default, None, None, false) .expect("Failed to get TracingLayers"); layers } diff --git a/sn_node_manager/Cargo.toml b/sn_node_manager/Cargo.toml index 0b031b47a2..af16e03b6a 100644 --- a/sn_node_manager/Cargo.toml +++ b/sn_node_manager/Cargo.toml @@ -30,6 +30,7 @@ tcp = [] websockets = [] [dependencies] +chrono = "~0.4.19" clap = { version = "4.4.6", features = ["derive", "env"] } colored = "2.0.4" color-eyre = "~0.6" diff --git a/sn_node_manager/src/add_services/mod.rs b/sn_node_manager/src/add_services/mod.rs index 87ca6ae120..387dd74c95 100644 --- a/sn_node_manager/src/add_services/mod.rs +++ b/sn_node_manager/src/add_services/mod.rs @@ -50,12 +50,14 @@ pub async fn add_node( if options.genesis { if let Some(count) = options.count { if count > 1 { + error!("A genesis node can only be added as a single node"); return Err(eyre!("A genesis node can only be added as a single node")); } } let genesis_node = node_registry.nodes.iter().find(|n| n.genesis); if genesis_node.is_some() { + error!("A genesis node already exists"); return Err(eyre!("A genesis node already exists")); } } @@ -65,6 +67,7 @@ pub async fn add_node( PortRange::Single(_) => { let count = options.count.unwrap_or(1); if count != 1 { + error!("The number of services to add ({count}) does not match the number of ports (1)"); return Err(eyre!( "The number of services to add ({count}) does not match the number of ports (1)" )); @@ -74,6 +77,7 @@ pub async fn add_node( let port_count = end - start + 1; let service_count = options.count.unwrap_or(1); if port_count != service_count { + error!("The number of services to add ({service_count}) does not match the number of ports ({port_count})"); return Err(eyre!( "The number of services to add ({service_count}) does not match the number of ports ({port_count})" )); @@ -97,7 +101,10 @@ pub async fn add_node( let safenode_file_name = options .safenode_src_path .file_name() - .ok_or_else(|| eyre!("Could not get filename from the safenode download path"))? + .ok_or_else(|| { + error!("Could not get filename from the safenode download path"); + eyre!("Could not get filename from the safenode download path") + })? .to_string_lossy() .to_string(); @@ -139,6 +146,7 @@ pub async fn add_node( let mut rpc_port = get_start_port_if_applicable(options.rpc_port); while node_number <= target_node_count { + trace!("Adding node with node_number {node_number}"); let rpc_free_port = if let Some(port) = rpc_port { port } else { @@ -178,13 +186,16 @@ pub async fn add_node( }; if let Some(user) = &options.user { + debug!("Creating data_dir and log_dirs with user {user}"); create_owned_dir(service_data_dir_path.clone(), user)?; create_owned_dir(service_log_dir_path.clone(), user)?; } else { + debug!("Creating data_dir and log_dirs without user"); std::fs::create_dir_all(service_data_dir_path.clone())?; std::fs::create_dir_all(service_log_dir_path.clone())?; } + debug!("Copying safenode binary to {service_safenode_path:?}"); std::fs::copy( options.safenode_src_path.clone(), service_safenode_path.clone(), @@ -210,6 +221,10 @@ pub async fn add_node( options.home_network = true; } } + debug!( + "Auto-setting NAT flags: upnp={}, home_network={}", + options.upnp, options.home_network + ); } let install_ctx = InstallNodeServiceCtxBuilder { @@ -235,6 +250,7 @@ pub async fn add_node( match service_control.install(install_ctx, options.user_mode) { Ok(()) => { + info!("Successfully added service {service_name}"); added_service_data.push(( service_name.clone(), service_safenode_path.to_string_lossy().into_owned(), @@ -274,6 +290,7 @@ pub async fn add_node( node_registry.save()?; } Err(e) => { + error!("Failed to add service {service_name}: {e}"); failed_service_data.push((service_name.clone(), e.to_string())); } } @@ -285,9 +302,16 @@ pub async fn add_node( } if options.delete_safenode_src { + debug!("Deleting safenode binary file"); std::fs::remove_file(options.safenode_src_path)?; } + if !added_service_data.is_empty() { + info!("{} services has been added", added_service_data.len()); + } else if !failed_service_data.is_empty() { + error!("Failed to add {} service(s)", failed_service_data.len()); + } + if !added_service_data.is_empty() && verbosity != VerbosityLevel::Minimal { println!("Services Added:"); for install in added_service_data.iter() { @@ -333,14 +357,23 @@ pub fn add_auditor( verbosity: VerbosityLevel, ) -> Result<()> { if node_registry.auditor.is_some() { + error!("An Auditor service has already been created"); return Err(eyre!("An Auditor service has already been created")); } + debug!( + "Creating log directory at {:?} as user {:?}", + install_options.service_log_dir_path, install_options.user + ); create_owned_dir( install_options.service_log_dir_path.clone(), &install_options.user, )?; + debug!( + "Copying auditor binary file to {:?}", + install_options.auditor_install_bin_path + ); std::fs::copy( install_options.auditor_src_bin_path.clone(), install_options.auditor_install_bin_path.clone(), @@ -368,6 +401,7 @@ pub fn add_auditor( user: install_options.user.clone(), version: install_options.version, }); + info!("Auditor service has been added successfully"); println!("Auditor service added {}", "✓".green()); if verbosity != VerbosityLevel::Minimal { println!( @@ -380,11 +414,13 @@ pub fn add_auditor( ); } println!("[!] Note: the service has not been started"); + debug!("Removing auditor binary file"); std::fs::remove_file(install_options.auditor_src_bin_path)?; node_registry.save()?; Ok(()) } Err(e) => { + error!("Failed to add auditor service: {e}"); println!("Failed to add auditor service: {e}"); Err(e.into()) } @@ -400,9 +436,14 @@ pub fn add_daemon( service_control: &dyn ServiceControl, ) -> Result<()> { if node_registry.daemon.is_some() { + error!("A safenodemand service has already been created"); return Err(eyre!("A safenodemand service has already been created")); } + debug!( + "Copying daemon binary file to {:?}", + options.daemon_install_bin_path + ); std::fs::copy( options.daemon_src_bin_path.clone(), options.daemon_install_bin_path.clone(), @@ -435,6 +476,7 @@ pub fn add_daemon( version: options.version, }; node_registry.daemon = Some(daemon); + info!("Daemon service has been added successfully"); println!("Daemon service added {}", "✓".green()); println!("[!] Note: the service has not been started"); node_registry.save()?; @@ -442,6 +484,7 @@ pub fn add_daemon( Ok(()) } Err(e) => { + error!("Failed to add daemon service: {e}"); println!("Failed to add daemon service: {e}"); Err(e.into()) } @@ -461,14 +504,22 @@ pub fn add_faucet( verbosity: VerbosityLevel, ) -> Result<()> { if node_registry.faucet.is_some() { + error!("A faucet service has already been created"); return Err(eyre!("A faucet service has already been created")); } + debug!( + "Creating log directory at {:?} as user {:?}", + install_options.service_log_dir_path, install_options.user + ); create_owned_dir( install_options.service_log_dir_path.clone(), &install_options.user, )?; - + debug!( + "Copying faucet binary file to {:?}", + install_options.faucet_install_bin_path + ); std::fs::copy( install_options.faucet_src_bin_path.clone(), install_options.faucet_install_bin_path.clone(), @@ -497,6 +548,7 @@ pub fn add_faucet( user: install_options.user.clone(), version: install_options.version, }); + info!("Faucet service has been added successfully"); println!("Faucet service added {}", "✓".green()); if verbosity != VerbosityLevel::Minimal { println!( @@ -518,6 +570,7 @@ pub fn add_faucet( Ok(()) } Err(e) => { + error!("Failed to add faucet service: {e}"); println!("Failed to add faucet service: {e}"); Err(e.into()) } @@ -557,12 +610,14 @@ fn check_port_availability(port_option: &PortRange, nodes: &[NodeServiceData]) - match port_option { PortRange::Single(port) => { if all_ports.iter().any(|p| *p == *port) { + error!("Port {port} is being used by another service"); return Err(eyre!("Port {port} is being used by another service")); } } PortRange::Range(start, end) => { for i in *start..=*end { if all_ports.iter().any(|p| *p == i) { + error!("Port {i} is being used by another service"); return Err(eyre!("Port {i} is being used by another service")); } } diff --git a/sn_node_manager/src/bin/cli/main.rs b/sn_node_manager/src/bin/cli/main.rs index e3b291b272..ba29ce510e 100644 --- a/sn_node_manager/src/bin/cli/main.rs +++ b/sn_node_manager/src/bin/cli/main.rs @@ -9,7 +9,7 @@ use clap::{Parser, Subcommand}; use color_eyre::{eyre::eyre, Result}; use libp2p::Multiaddr; -use sn_logging::LogFormat; +use sn_logging::{LogBuilder, LogFormat}; use sn_node_manager::{ add_services::config::{parse_port_range, PortRange}, cmd::{self}, @@ -17,6 +17,7 @@ use sn_node_manager::{ }; use sn_peers_acquisition::PeersArgs; use std::{net::Ipv4Addr, path::PathBuf}; +use tracing::Level; const DEFAULT_NODE_COUNT: u16 = 25; @@ -867,9 +868,12 @@ async fn main() -> Result<()> { color_eyre::install()?; let args = Cmd::parse(); let verbosity = VerbosityLevel::from(args.verbose); + let _log_handles = get_log_builder()?.initialize()?; configure_winsw(verbosity).await?; + tracing::info!("Executing cmd: {:?}", args.cmd); + match args.cmd { SubCmd::Add { auto_restart, @@ -1134,6 +1138,29 @@ async fn main() -> Result<()> { } } +fn get_log_builder() -> Result { + let logging_targets = vec![ + ("sn_node_manager".to_string(), Level::TRACE), + ("safenode_manager".to_string(), Level::TRACE), + ("safenodemand".to_string(), Level::TRACE), + ("sn_service_management".to_string(), Level::TRACE), + ]; + let timestamp = chrono::Local::now().format("%Y-%m-%d_%H-%M-%S").to_string(); + + let output_dest = dirs_next::data_dir() + .ok_or_else(|| eyre!("Could not obtain user data directory"))? + .join("safe") + .join("safenode-manager") + .join("logs") + .join(format!("log_{timestamp}")); + + let mut log_builder = LogBuilder::new(logging_targets); + log_builder.output_dest(sn_logging::LogOutputDest::Path(output_dest)); + // disabled by default, as it can interfere with status cmd. + log_builder.print_updates_to_stdout(false); + Ok(log_builder) +} + // Since delimiter is on, we get element of the csv and not the entire csv. fn parse_environment_variables(env_var: &str) -> Result<(String, String)> { let parts: Vec<&str> = env_var.splitn(2, '=').collect(); diff --git a/sn_node_manager/src/bin/daemon/main.rs b/sn_node_manager/src/bin/daemon/main.rs index 62e1180b25..99925943be 100644 --- a/sn_node_manager/src/bin/daemon/main.rs +++ b/sn_node_manager/src/bin/daemon/main.rs @@ -12,6 +12,7 @@ extern crate tracing; use clap::Parser; use color_eyre::eyre::{eyre, Result}; use libp2p_identity::PeerId; +use sn_logging::LogBuilder; use sn_node_manager::{config::get_node_registry_path, rpc, DAEMON_DEFAULT_PORT}; use sn_service_management::{ safenode_manager_proto::{ @@ -23,6 +24,7 @@ use sn_service_management::{ }; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use tonic::{transport::Server, Code, Request, Response, Status}; +use tracing::Level; #[derive(Parser, Debug)] #[clap(author, version, about, long_about = None)] @@ -55,8 +57,10 @@ impl SafeNodeManager for SafeNodeManagerDaemon { ) })?; - let peer_id = PeerId::from_bytes(&request.get_ref().peer_id) - .map_err(|err| Status::new(Code::Internal, format!("Failed to parse PeerId: {err}")))?; + let peer_id = PeerId::from_bytes(&request.get_ref().peer_id).map_err(|err| { + error!("Failed to parse PeerId: {err}"); + Status::new(Code::Internal, format!("Failed to parse PeerId: {err}")) + })?; Self::restart_handler(node_registry, peer_id, request.get_ref().retain_peer_id) .await @@ -64,6 +68,7 @@ impl SafeNodeManager for SafeNodeManagerDaemon { Status::new(Code::Internal, format!("Failed to restart the node: {err}")) })?; + info!("Node service restarted for {peer_id:?}"); Ok(Response::new(NodeServiceRestartResponse {})) } @@ -89,6 +94,7 @@ impl SafeNodeManager for SafeNodeManagerDaemon { number: node.number as u32, }) .collect::>(); + info!("Node status retrieved, nod len: {:?}", nodes_info.len()); Ok(Response::new(GetStatusResponse { nodes: nodes_info })) } } @@ -122,6 +128,7 @@ impl SafeNodeManagerDaemon {} #[tokio::main(flavor = "current_thread")] async fn main() -> Result<()> { + let _log_handles = get_log_builder()?.initialize()?; println!("Starting safenodemand"); let args = Args::parse(); let service = SafeNodeManagerDaemon {}; @@ -139,3 +146,24 @@ async fn main() -> Result<()> { Ok(()) } + +fn get_log_builder() -> Result { + let logging_targets = vec![ + ("sn_node_manager".to_string(), Level::TRACE), + ("safenode_manager".to_string(), Level::TRACE), + ("safenodemand".to_string(), Level::TRACE), + ("sn_service_management".to_string(), Level::TRACE), + ]; + let timestamp = chrono::Local::now().format("%Y-%m-%d_%H-%M-%S").to_string(); + + let output_dest = dirs_next::data_dir() + .ok_or_else(|| eyre!("Could not obtain user data directory"))? + .join("safe") + .join("safenodemand") + .join("logs") + .join(format!("log_{timestamp}")); + + let mut log_builder = LogBuilder::new(logging_targets); + log_builder.output_dest(sn_logging::LogOutputDest::Path(output_dest)); + Ok(log_builder) +} diff --git a/sn_node_manager/src/cmd/auditor.rs b/sn_node_manager/src/cmd/auditor.rs index 8a20fa58ce..359d8cd6d9 100644 --- a/sn_node_manager/src/cmd/auditor.rs +++ b/sn_node_manager/src/cmd/auditor.rs @@ -37,6 +37,7 @@ pub async fn add( verbosity: VerbosityLevel, ) -> Result<()> { if !is_running_as_root() { + error!("The auditor add command must run as the root user"); return Err(eyre!("The add command must run as the root user")); } @@ -72,6 +73,7 @@ pub async fn add( .await? }; + info!("Adding auditor service"); add_auditor( AddAuditorServiceOptions { auditor_src_bin_path, @@ -95,12 +97,14 @@ pub async fn start(verbosity: VerbosityLevel) -> Result<()> { if !is_running_as_root() { return Err(eyre!("The start command must run as the root user")); } + info!("Starting the auditor service"); let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?; if let Some(auditor) = &mut node_registry.auditor { if verbosity != VerbosityLevel::Minimal { print_banner("Start Auditor Service"); } + info!("Starting the auditor service"); let service = AuditorService::new(auditor, Box::new(ServiceController {})); let mut service_manager = ServiceManager::new( @@ -113,7 +117,7 @@ pub async fn start(verbosity: VerbosityLevel) -> Result<()> { node_registry.save()?; return Ok(()); } - + error!("The auditor service has not been added yet"); Err(eyre!("The auditor service has not been added yet")) } @@ -127,6 +131,7 @@ pub async fn stop(verbosity: VerbosityLevel) -> Result<()> { if verbosity != VerbosityLevel::Minimal { print_banner("Stop Auditor Service"); } + info!("Stopping the auditor service"); let service = AuditorService::new(auditor, Box::new(ServiceController {})); let mut service_manager = @@ -138,6 +143,7 @@ pub async fn stop(verbosity: VerbosityLevel) -> Result<()> { return Ok(()); } + error!("The auditor service has not been added yet"); Err(eyre!("The auditor service has not been added yet")) } @@ -162,15 +168,21 @@ pub async fn upgrade( if verbosity != VerbosityLevel::Minimal { print_banner("Upgrade Auditor Service"); } + info!("Upgrading the auditor service"); let (upgrade_bin_path, target_version) = download_and_get_upgrade_bin_path(None, ReleaseType::SnAuditor, url, version, verbosity) .await?; let auditor = node_registry.auditor.as_mut().unwrap(); + debug!( + "Current version {:?}, target version {target_version:?}", + auditor.version, + ); if !force { let current_version = Version::parse(&auditor.version)?; if target_version <= current_version { + info!("The auditor is already at the latest version, do nothing."); println!( "{} The auditor is already at the latest version", "✓".green() @@ -199,10 +211,14 @@ pub async fn upgrade( match service_manager.upgrade(options).await { Ok(upgrade_result) => { + info!("Upgrade the auditor service successfully"); print_upgrade_summary(vec![("auditor".to_string(), upgrade_result)]); node_registry.save()?; Ok(()) } - Err(e) => Err(eyre!("Upgrade failed: {e}")), + Err(e) => { + error!("Failed to upgrade the auditor service: {e:?}",); + Err(eyre!("Upgrade failed: {e}")) + } } } diff --git a/sn_node_manager/src/cmd/daemon.rs b/sn_node_manager/src/cmd/daemon.rs index 7ad04a92f6..ddfc75847e 100644 --- a/sn_node_manager/src/cmd/daemon.rs +++ b/sn_node_manager/src/cmd/daemon.rs @@ -31,6 +31,7 @@ pub async fn add( verbosity: VerbosityLevel, ) -> Result<()> { if !is_running_as_root() { + error!("The daemon add command must run as the root user"); return Err(eyre!("The add command must run as the root user")); } @@ -40,6 +41,7 @@ pub async fn add( let service_user = "safe"; let service_manager = ServiceController {}; + debug!("Trying to create service user '{service_user}' for the daemon"); service_manager.create_service_user(service_user)?; let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?; @@ -60,6 +62,8 @@ pub async fn add( .await? }; + info!("Adding daemon service"); + // At the moment we don't have the option to provide a user for running the service. Since // `safenodemand` requires manipulation of services, the user running it must either be root or // have root access. For now we will just use the `root` user. The user option gets ignored on @@ -82,6 +86,7 @@ pub async fn add( pub async fn start(verbosity: VerbosityLevel) -> Result<()> { if !is_running_as_root() { + error!("The daemon start command must run as the root user"); return Err(eyre!("The start command must run as the root user")); } @@ -90,6 +95,7 @@ pub async fn start(verbosity: VerbosityLevel) -> Result<()> { if verbosity != VerbosityLevel::Minimal { print_banner("Start Daemon Service"); } + info!("Starting daemon service"); let service = DaemonService::new(daemon, Box::new(ServiceController {})); let mut service_manager = @@ -109,11 +115,13 @@ pub async fn start(verbosity: VerbosityLevel) -> Result<()> { return Ok(()); } + error!("The daemon service has not been added yet"); Err(eyre!("The daemon service has not been added yet")) } pub async fn stop(verbosity: VerbosityLevel) -> Result<()> { if !is_running_as_root() { + error!("The daemon stop command must run as the root user"); return Err(eyre!("The stop command must run as the root user")); } @@ -122,6 +130,7 @@ pub async fn stop(verbosity: VerbosityLevel) -> Result<()> { if verbosity != VerbosityLevel::Minimal { print_banner("Stop Daemon Service"); } + info!("Stopping daemon service"); let service = DaemonService::new(daemon, Box::new(ServiceController {})); let mut service_manager = @@ -133,5 +142,6 @@ pub async fn stop(verbosity: VerbosityLevel) -> Result<()> { return Ok(()); } + error!("The daemon service has not been added yet"); Err(eyre!("The daemon service has not been added yet")) } diff --git a/sn_node_manager/src/cmd/faucet.rs b/sn_node_manager/src/cmd/faucet.rs index fbe07b4efd..add34c5d2f 100644 --- a/sn_node_manager/src/cmd/faucet.rs +++ b/sn_node_manager/src/cmd/faucet.rs @@ -35,6 +35,7 @@ pub async fn add( verbosity: VerbosityLevel, ) -> Result<()> { if !is_running_as_root() { + error!("The faucet add command must run as the root user"); return Err(eyre!("The add command must run as the root user")); } @@ -70,6 +71,7 @@ pub async fn add( .await? }; + info!("Adding faucet service"); add_faucet( AddFaucetServiceOptions { bootstrap_peers: get_peers_from_args(peers).await?, @@ -92,6 +94,7 @@ pub async fn add( pub async fn start(verbosity: VerbosityLevel) -> Result<()> { if !is_running_as_root() { + error!("The faucet start command must run as the root user"); return Err(eyre!("The start command must run as the root user")); } @@ -100,6 +103,7 @@ pub async fn start(verbosity: VerbosityLevel) -> Result<()> { if verbosity != VerbosityLevel::Minimal { print_banner("Start Faucet Service"); } + info!("Starting faucet service"); let service = FaucetService::new(faucet, Box::new(ServiceController {})); let mut service_manager = ServiceManager::new( @@ -113,11 +117,13 @@ pub async fn start(verbosity: VerbosityLevel) -> Result<()> { return Ok(()); } + error!("The faucet service has not been added yet"); Err(eyre!("The faucet service has not been added yet")) } pub async fn stop(verbosity: VerbosityLevel) -> Result<()> { if !is_running_as_root() { + error!("The faucet stop command must run as the root user"); return Err(eyre!("The stop command must run as the root user")); } @@ -126,6 +132,7 @@ pub async fn stop(verbosity: VerbosityLevel) -> Result<()> { if verbosity != VerbosityLevel::Minimal { print_banner("Stop Faucet Service"); } + info!("Stopping faucet service"); let service = FaucetService::new(faucet, Box::new(ServiceController {})); let mut service_manager = @@ -137,6 +144,7 @@ pub async fn stop(verbosity: VerbosityLevel) -> Result<()> { return Ok(()); } + error!("The faucet service has not been added yet"); Err(eyre!("The faucet service has not been added yet")) } @@ -161,6 +169,7 @@ pub async fn upgrade( if verbosity != VerbosityLevel::Minimal { print_banner("Upgrade Faucet Service"); } + info!("Upgrading faucet service"); let (upgrade_bin_path, target_version) = download_and_get_upgrade_bin_path(None, ReleaseType::Faucet, url, version, verbosity) diff --git a/sn_node_manager/src/cmd/local.rs b/sn_node_manager/src/cmd/local.rs index 22a9f17431..3b9cb435c8 100644 --- a/sn_node_manager/src/cmd/local.rs +++ b/sn_node_manager/src/cmd/local.rs @@ -40,6 +40,7 @@ pub async fn join( if verbosity != VerbosityLevel::Minimal { print_banner("Joining Local Network"); } + info!("Joining local network"); let local_node_reg_path = &get_local_node_registry_path()?; let mut local_node_registry = NodeRegistry::load(local_node_reg_path)?; @@ -68,9 +69,15 @@ pub async fn join( // is running. let peers = match get_peers_from_args(peers).await { Ok(peers) => Some(peers), - Err(e) => match e { - sn_peers_acquisition::error::Error::PeersNotObtained => None, - _ => return Err(e.into()), + Err(err) => match err { + sn_peers_acquisition::error::Error::PeersNotObtained => { + warn!("PeersNotObtained, peers is set to None"); + None + } + _ => { + error!("Failed to obtain peers: {err:?}"); + return Err(err.into()); + } }, }; let options = LocalNetworkOptions { @@ -93,11 +100,13 @@ pub fn kill(keep_directories: bool, verbosity: VerbosityLevel) -> Result<()> { let local_reg_path = &get_local_node_registry_path()?; let local_node_registry = NodeRegistry::load(local_reg_path)?; if local_node_registry.nodes.is_empty() { + info!("No local network is currently running, cannot kill it"); println!("No local network is currently running"); } else { if verbosity != VerbosityLevel::Minimal { print_banner("Killing Local Network"); } + info!("Kill local network"); kill_network(&local_node_registry, keep_directories)?; std::fs::remove_file(local_reg_path)?; } @@ -122,7 +131,8 @@ pub async fn run( // In the clean case, the node registry must be loaded *after* the existing network has // been killed, which clears it out. let local_node_reg_path = &get_local_node_registry_path()?; - let mut local_node_registry = if clean { + let mut local_node_registry: NodeRegistry = if clean { + debug!("Clean set to true, removing client, node dir and killing the network."); let client_data_path = dirs_next::data_dir() .ok_or_else(|| eyre!("Could not obtain user's data directory"))? .join("safe") @@ -135,6 +145,7 @@ pub async fn run( } else { let local_node_registry = NodeRegistry::load(local_node_reg_path)?; if !local_node_registry.nodes.is_empty() { + error!("A local network is already running, cannot run a new one"); return Err(eyre!("A local network is already running") .suggestion("Use the kill command to destroy the network then try again")); } @@ -144,6 +155,7 @@ pub async fn run( if verbosity != VerbosityLevel::Minimal { print_banner("Launching Local Network"); } + info!("Launching local network"); let release_repo = ::default_config(); let faucet_path = get_bin_path( diff --git a/sn_node_manager/src/cmd/mod.rs b/sn_node_manager/src/cmd/mod.rs index 33af0b83b0..b612536e7e 100644 --- a/sn_node_manager/src/cmd/mod.rs +++ b/sn_node_manager/src/cmd/mod.rs @@ -45,6 +45,10 @@ pub async fn download_and_get_upgrade_bin_path( verbosity: VerbosityLevel, ) -> Result<(PathBuf, Version)> { if let Some(path) = custom_bin_path { + debug!( + "Using the supplied custom binary at {}", + path.to_string_lossy() + ); println!( "Using the supplied custom binary at {}", path.to_string_lossy() @@ -55,6 +59,7 @@ pub async fn download_and_get_upgrade_bin_path( let release_repo = ::default_config(); if let Some(version) = version { + debug!("Downloading provided version {version} of {release_type}"); let (upgrade_bin_path, version) = download_and_extract_release( release_type, None, @@ -66,6 +71,7 @@ pub async fn download_and_get_upgrade_bin_path( .await?; Ok((upgrade_bin_path, Version::parse(&version)?)) } else if let Some(url) = url { + debug!("Downloading {release_type} from url: {url}"); let (upgrade_bin_path, version) = download_and_extract_release( release_type, Some(url), @@ -77,9 +83,12 @@ pub async fn download_and_get_upgrade_bin_path( .await?; Ok((upgrade_bin_path, Version::parse(&version)?)) } else { - println!("Retrieving latest version of {}...", release_type); + println!("Retrieving latest version of {release_type}..."); + debug!("Retrieving latest version of {release_type}..."); let latest_version = release_repo.get_latest_version(&release_type).await?; println!("Latest version is {latest_version}"); + debug!("Download latest version {latest_version} of {release_type}"); + let (upgrade_bin_path, _) = download_and_extract_release( release_type, None, @@ -137,13 +146,16 @@ pub async fn get_bin_path( verbosity: VerbosityLevel, ) -> Result { if build { + debug!("Obtaining bin path for {release_type:?} by building"); build_binary(&release_type)?; Ok(PathBuf::from("target") .join("release") .join(release_type.to_string())) } else if let Some(path) = path { + debug!("Using the supplied custom binary for {release_type:?}: {path:?}"); Ok(path) } else { + debug!("Downloading {release_type:?} binary with version {version:?}"); let (download_path, _) = download_and_extract_release( release_type, None, @@ -158,6 +170,7 @@ pub async fn get_bin_path( } fn build_binary(bin_type: &ReleaseType) -> Result<()> { + debug!("Building {bin_type} binary"); let mut args = vec!["build", "--release"]; let bin_name = bin_type.to_string(); args.push("--bin"); @@ -203,6 +216,7 @@ fn build_binary(bin_type: &ReleaseType) -> Result<()> { .output()?; if !build_result.status.success() { + error!("Failed to build binaries {bin_name}"); return Err(eyre!("Failed to build binaries")); } diff --git a/sn_node_manager/src/cmd/nat_detection.rs b/sn_node_manager/src/cmd/nat_detection.rs index bf71620657..044395f7ad 100644 --- a/sn_node_manager/src/cmd/nat_detection.rs +++ b/sn_node_manager/src/cmd/nat_detection.rs @@ -13,7 +13,11 @@ use color_eyre::eyre::{bail, OptionExt, Result}; use libp2p::Multiaddr; use sn_releases::{ReleaseType, SafeReleaseRepoActions}; use sn_service_management::{NatDetectionStatus, NodeRegistry}; -use std::{path::PathBuf, process::Stdio}; +use std::{ + io::{BufRead, BufReader}, + path::PathBuf, + process::{Command, Stdio}, +}; pub async fn run_nat_detection( servers: Vec, @@ -23,6 +27,7 @@ pub async fn run_nat_detection( version: Option, verbosity: VerbosityLevel, ) -> Result<()> { + info!("Running nat detection with servers: {servers:?}"); let mut node_registry = NodeRegistry::load(&get_node_registry_path()?)?; if !force_run { @@ -30,6 +35,7 @@ pub async fn run_nat_detection( if verbosity != VerbosityLevel::Minimal { println!("NAT status has already been set as: {status:?}"); } + debug!("NAT status has already been set as: {status:?}, returning."); return Ok(()); } } @@ -54,14 +60,10 @@ pub async fn run_nat_detection( if verbosity != VerbosityLevel::Minimal { println!("Running NAT detection. This can take a while.."); } + debug!("Running NAT detection with path: {nat_detection_path:?}. This can take a while.."); - let stdout = match verbosity { - VerbosityLevel::Minimal => Stdio::null(), - VerbosityLevel::Normal => Stdio::inherit(), - VerbosityLevel::Full => Stdio::inherit(), - }; - - let mut command = std::process::Command::new(nat_detection_path); + let mut command = Command::new(nat_detection_path); + command.stdout(Stdio::piped()).stderr(Stdio::null()); command.arg( servers .iter() @@ -69,11 +71,27 @@ pub async fn run_nat_detection( .collect::>() .join(","), ); - // todo: clarify the different verbosity levels. Minimal actually means none. Full/Normal are not used yet. - if verbosity == VerbosityLevel::Full { - command.arg("-vv"); + if tracing::level_enabled!(tracing::Level::TRACE) { + command.arg("-vvvv"); + } + let mut child = command.spawn()?; + + // only execute if log level is set to trace + if tracing::level_enabled!(tracing::Level::TRACE) { + // using buf reader to handle both stderr and stout is risky as it might block indefinitely. + if let Some(ref mut stdout) = child.stdout { + let reader = BufReader::new(stdout); + for line in reader.lines() { + let line = line?; + // only if log level is trace + + let clean_line = strip_ansi_escapes(&line); + trace!("{clean_line}"); + } + } } - let status = command.stdout(stdout).status()?; + + let status = child.wait()?; let status = match status.code().ok_or_eyre("Failed to get the exit code")? { 10 => NatDetectionStatus::Public, 11 => NatDetectionStatus::UPnP, @@ -90,3 +108,20 @@ pub async fn run_nat_detection( Ok(()) } + +fn strip_ansi_escapes(input: &str) -> String { + let mut output = String::new(); + let mut chars = input.chars(); + while let Some(c) = chars.next() { + if c == '\x1b' { + for next_char in chars.by_ref() { + if next_char.is_ascii_lowercase() || next_char.is_ascii_uppercase() { + break; + } + } + } else { + output.push(c); + } + } + output +} diff --git a/sn_node_manager/src/cmd/node.rs b/sn_node_manager/src/cmd/node.rs index fcd32c7b64..c90c7edaed 100644 --- a/sn_node_manager/src/cmd/node.rs +++ b/sn_node_manager/src/cmd/node.rs @@ -101,7 +101,7 @@ pub async fn add( .await? }; - tracing::debug!("Parsing peers..."); + debug!("Parsing peers from PeersArgs"); // Handle the `PeersNotObtained` error to make the `--peer` argument optional for the node // manager. @@ -115,10 +115,19 @@ pub async fn add( // return a huge peer list, and that's been problematic for service definition files. let is_first = peers.first; let bootstrap_peers = match get_peers_from_args(peers).await { - Ok(p) => p, - Err(e) => match e { - sn_peers_acquisition::error::Error::PeersNotObtained => Vec::new(), - _ => return Err(e.into()), + Ok(p) => { + info!("Obtained peers of length {}", p.len()); + p + } + Err(err) => match err { + sn_peers_acquisition::error::Error::PeersNotObtained => { + info!("No bootstrap peers obtained, setting empty vec."); + Vec::new() + } + _ => { + error!("Error obtaining peers: {err:?}"); + return Err(err.into()); + } }, }; @@ -148,12 +157,12 @@ pub async fn add( user_mode, version, }; - + info!("Adding node service(s)"); let added_services_names = add_node(options, &mut node_registry, &service_manager, verbosity).await?; node_registry.save()?; - tracing::debug!("Node registry saved"); + debug!("Node registry saved"); Ok(added_services_names) } @@ -177,16 +186,19 @@ pub async fn balance( let service_indices = get_services_for_ops(&node_registry, peer_ids, service_names)?; if service_indices.is_empty() { + info!("Service indices is empty, cannot obtain the balance"); // This could be the case if all services are at `Removed` status. println!("No balances to display"); return Ok(()); } + debug!("Obtaining balances for {} services", service_indices.len()); for &index in &service_indices { let node = &mut node_registry.nodes[index]; let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr); let service = NodeService::new(node, Box::new(rpc_client)); - let wallet = HotWallet::load_from(&service.service_data.data_dir_path)?; + let wallet = HotWallet::load_from(&service.service_data.data_dir_path) + .inspect_err(|err| error!("Error while loading hot wallet: {err:?}"))?; println!( "{}: {}", service.service_data.service_name, @@ -205,6 +217,7 @@ pub async fn remove( if verbosity != VerbosityLevel::Minimal { print_banner("Remove Safenode Services"); } + info!("Removing safe node services with keep_dirs=({keep_directories}) for: {peer_ids:?}, {service_names:?}"); let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?; refresh_node_registry( @@ -216,6 +229,7 @@ pub async fn remove( let service_indices = get_services_for_ops(&node_registry, peer_ids, service_names)?; if service_indices.is_empty() { + info!("Service indices is empty, no services were eligible for removal"); // This could be the case if all services are at `Removed` status. if verbosity != VerbosityLevel::Minimal { println!("No services were eligible for removal"); @@ -232,9 +246,13 @@ pub async fn remove( ServiceManager::new(service, Box::new(ServiceController {}), verbosity); match service_manager.remove(keep_directories).await { Ok(()) => { + debug!("Removed service {}", node.service_name); node_registry.save()?; } - Err(e) => failed_services.push((node.service_name.clone(), e.to_string())), + Err(err) => { + error!("Failed to remove service {}: {err}", node.service_name); + failed_services.push((node.service_name.clone(), err.to_string())) + } } } @@ -243,6 +261,7 @@ pub async fn remove( pub async fn reset(force: bool, verbosity: VerbosityLevel) -> Result<()> { print_banner("Reset Safenode Services"); + info!("Resetting all safenode services, with force={force}"); if !force { println!("WARNING: all safenode services, data, and logs will be removed."); @@ -264,6 +283,7 @@ pub async fn reset(force: bool, verbosity: VerbosityLevel) -> Result<()> { // error if the file doesn't exist. On Windows this has been observed to happen. let node_registry_path = config::get_node_registry_path()?; if node_registry_path.exists() { + info!("Removing node registry file: {node_registry_path:?}"); std::fs::remove_file(node_registry_path)?; } @@ -279,6 +299,9 @@ pub async fn start( if verbosity != VerbosityLevel::Minimal { print_banner("Start Safenode Services"); } + info!( + "Starting safenode services with interval={interval} for: {peer_ids:?}, {service_names:?}" + ); let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?; refresh_node_registry( @@ -290,6 +313,7 @@ pub async fn start( let service_indices = get_services_for_ops(&node_registry, peer_ids, service_names)?; if service_indices.is_empty() { + info!("Service indices is empty, no services were eligible to be started"); // This could be the case if all services are at `Removed` status. if verbosity != VerbosityLevel::Minimal { println!("No services were eligible to be started"); @@ -314,9 +338,13 @@ pub async fn start( } match service_manager.start().await { Ok(()) => { + debug!("Started service {}", node.service_name); node_registry.save()?; } - Err(e) => failed_services.push((node.service_name.clone(), e.to_string())), + Err(err) => { + error!("Failed to start service {}: {err}", node.service_name); + failed_services.push((node.service_name.clone(), err.to_string())) + } } } @@ -350,6 +378,7 @@ pub async fn stop( if verbosity != VerbosityLevel::Minimal { print_banner("Stop Safenode Services"); } + info!("Stopping safenode services for: {peer_ids:?}, {service_names:?}"); let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?; refresh_node_registry( @@ -361,6 +390,7 @@ pub async fn stop( let service_indices = get_services_for_ops(&node_registry, peer_ids, service_names)?; if service_indices.is_empty() { + info!("Service indices is empty, no services were eligible to be stopped"); // This could be the case if all services are at `Removed` status. if verbosity != VerbosityLevel::Minimal { println!("No services were eligible to be stopped"); @@ -377,9 +407,13 @@ pub async fn stop( ServiceManager::new(service, Box::new(ServiceController {}), verbosity); match service_manager.stop().await { Ok(()) => { + debug!("Stopped service {}", node.service_name); node_registry.save()?; } - Err(e) => failed_services.push((node.service_name.clone(), e.to_string())), + Err(err) => { + error!("Failed to stop service {}: {err}", node.service_name); + failed_services.push((node.service_name.clone(), err.to_string())) + } } } @@ -406,6 +440,9 @@ pub async fn upgrade( if verbosity != VerbosityLevel::Minimal { print_banner("Upgrade Safenode Services"); } + info!( + "Upgrading safenode services with use_force={use_force} for: {peer_ids:?}, {service_names:?}" + ); let (upgrade_bin_path, target_version) = download_and_get_upgrade_bin_path( custom_bin_path.clone(), @@ -424,7 +461,10 @@ pub async fn upgrade( ) .await?; - println!("listen addresses: {:?}", node_registry.nodes[0].listen_addr); + debug!( + "listen addresses for nodes[0]: {:?}", + node_registry.nodes[0].listen_addr + ); if !use_force { let node_versions = node_registry .nodes @@ -435,6 +475,7 @@ pub async fn upgrade( .iter() .any(|current_version| current_version < &target_version); if !any_nodes_need_upgraded { + info!("All nodes are at the latest version, no upgrade required."); if verbosity != VerbosityLevel::Minimal { println!("{} All nodes are at the latest version", "✓".green()); } @@ -443,6 +484,7 @@ pub async fn upgrade( } let service_indices = get_services_for_ops(&node_registry, peer_ids, service_names)?; + trace!("service_indices len: {}", service_indices.len()); let mut upgrade_summary = Vec::new(); for &index in &service_indices { @@ -461,6 +503,7 @@ pub async fn upgrade( target_bin_path: upgrade_bin_path.clone(), target_version: target_version.clone(), }; + let service_name = node.service_name.clone(); let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr); let service = NodeService::new(node, Box::new(rpc_client)); @@ -469,6 +512,7 @@ pub async fn upgrade( match service_manager.upgrade(options).await { Ok(upgrade_result) => { + info!("Service: {service_name} has been upgraded, result: {upgrade_result:?}",); if upgrade_result != UpgradeResult::NotRequired { // It doesn't seem useful to apply the interval if there was no upgrade // required for the previous service. @@ -480,10 +524,11 @@ pub async fn upgrade( upgrade_result, )); } - Err(e) => { + Err(err) => { + error!("Error upgrading service {service_name}: {err}"); upgrade_summary.push(( node.service_name.clone(), - UpgradeResult::Error(format!("Error: {}", e)), + UpgradeResult::Error(format!("Error: {}", err)), )); } } @@ -665,12 +710,14 @@ fn get_services_for_ops( { service_indices.push(index); } else { + error!("No service named '{name}'"); return Err(eyre!(format!("No service named '{name}'"))); } } for peer_id_str in &peer_ids { - let peer_id = PeerId::from_str(peer_id_str)?; + let peer_id = PeerId::from_str(peer_id_str) + .inspect_err(|err| error!("Error parsing PeerId: {err:?}"))?; if let Some(index) = node_registry .nodes .iter() @@ -678,9 +725,9 @@ fn get_services_for_ops( { service_indices.push(index); } else { + error!("Could not find node with peer id: '{peer_id:?}'"); return Err(eyre!(format!( - "Could not find node with peer ID '{}'", - peer_id + "Could not find node with peer ID '{peer_id}'", ))); } } @@ -702,6 +749,7 @@ fn summarise_any_failed_ops( } } + error!("Failed to {verb} one or more services"); return Err(eyre!("Failed to {verb} one or more services")); } Ok(()) diff --git a/sn_node_manager/src/config.rs b/sn_node_manager/src/config.rs index 309a0eea75..5648625d12 100644 --- a/sn_node_manager/src/config.rs +++ b/sn_node_manager/src/config.rs @@ -29,13 +29,16 @@ pub fn get_node_manager_path() -> Result { let path = if is_running_as_root() { let path = PathBuf::from("/var/safenode-manager/"); + debug!("Running as root, creating node_manager_path and setting perms if path doesn't exists: {path:?}"); std::fs::create_dir_all(&path)?; let mut perm = std::fs::metadata(&path)?.permissions(); perm.set_mode(0o755); // set permissions to rwxr-xr-x std::fs::set_permissions(&path, perm)?; path } else { - get_user_safenode_data_dir()? + let path = get_user_safenode_data_dir()?; + debug!("Running as non-root, node_manager_path is: {path:?}"); + path }; if is_running_as_root() && !path.exists() { @@ -52,6 +55,8 @@ pub fn get_node_manager_path() -> Result { pub fn get_node_manager_path() -> Result { use std::path::Path; let path = Path::new("C:\\ProgramData\\safenode-manager"); + debug!("Running as root, creating node_manager_path at: {path:?}"); + if !path.exists() { std::fs::create_dir_all(path)?; } @@ -65,6 +70,7 @@ pub fn get_node_registry_path() -> Result { let path = get_node_manager_path()?; let node_registry_path = path.join("node_registry.json"); if is_running_as_root() && !node_registry_path.exists() { + debug!("Running as root and node_registry_path doesn't exist, creating node_registry_path and setting perms at: {node_registry_path:?}"); std::fs::OpenOptions::new() .write(true) .create(true) @@ -81,6 +87,7 @@ pub fn get_node_registry_path() -> Result { perm.set_mode(0o777); std::fs::set_permissions(node_registry_path.clone(), perm)?; } + debug!("Node registry path is: {node_registry_path:?}"); Ok(node_registry_path) } @@ -92,6 +99,8 @@ pub fn get_node_registry_path() -> Result { if !path.exists() { std::fs::create_dir_all(path)?; } + debug!("Node registry path is: {path:?}"); + Ok(path.join("node_registry.json")) } @@ -106,11 +115,21 @@ pub fn get_service_data_dir_path( owner: Option, ) -> Result { let path = match custom_path { - Some(p) => p, - None => match owner.is_some() { - true => PathBuf::from("/var/safenode-manager/services"), - false => get_user_safenode_data_dir()?, - }, + Some(p) => { + debug!("Using custom path for service data dir: {p:?}"); + p + } + None => { + if owner.is_some() { + let path = PathBuf::from("/var/safenode-manager/services"); + debug!("Using default path for service data dir: {path:?}"); + path + } else { + let path = get_user_safenode_data_dir()?; + debug!("Using user mode service data dir: {path:?}"); + path + } + } }; if let Some(owner) = owner { create_owned_dir(path.clone(), &owner)?; @@ -124,8 +143,15 @@ pub fn get_service_data_dir_path( _owner: Option, ) -> Result { let path = match custom_path { - Some(p) => p, - None => PathBuf::from("C:\\ProgramData\\safenode\\data"), + Some(p) => { + debug!("Using custom path for service data dir: {p:?}"); + p + } + None => { + let path = PathBuf::from("C:\\ProgramData\\safenode\\data"); + debug!("Using default path for service data dir: {path:?}"); + path + } }; std::fs::create_dir_all(&path)?; Ok(path) @@ -143,11 +169,21 @@ pub fn get_service_log_dir_path( owner: Option, ) -> Result { let path = match custom_path { - Some(p) => p, - None => match owner.is_some() { - true => PathBuf::from("/var/log").join(bin_type.to_string()), - false => get_user_safenode_data_dir()?, - }, + Some(p) => { + debug!("Using custom path for service log dir: {p:?}"); + p + } + None => { + if owner.is_some() { + let path = PathBuf::from("/var/log").join(bin_type.to_string()); + debug!("Using default path for service log dir: {path:?}"); + path + } else { + let path = get_user_safenode_data_dir()?; + debug!("Using user mode service log dir: {path:?}"); + path + } + } }; if let Some(owner) = owner { create_owned_dir(path.clone(), &owner)?; @@ -162,10 +198,17 @@ pub fn get_service_log_dir_path( _owner: Option, ) -> Result { let path = match custom_path { - Some(p) => p, - None => PathBuf::from("C:\\ProgramData") - .join(bin_type.to_string()) - .join("logs"), + Some(p) => { + debug!("Using custom path for service log dir: {p:?}"); + p + } + None => { + let path = PathBuf::from("C:\\ProgramData") + .join(bin_type.to_string()) + .join("logs"); + debug!("Using default path for service log dir: {path:?}"); + path + } }; std::fs::create_dir_all(&path)?; Ok(path) @@ -173,6 +216,7 @@ pub fn get_service_log_dir_path( #[cfg(unix)] pub fn create_owned_dir(path: PathBuf, owner: &str) -> Result<()> { + debug!("Creating owned dir and setting permissions: {path:?} with owner: {owner}"); use nix::unistd::{chown, Gid, Uid}; use std::os::unix::fs::PermissionsExt; use users::get_user_by_name; @@ -181,7 +225,10 @@ pub fn create_owned_dir(path: PathBuf, owner: &str) -> Result<()> { let permissions = std::fs::Permissions::from_mode(0o755); std::fs::set_permissions(&path, permissions)?; - let user = get_user_by_name(owner).ok_or_else(|| eyre!("User '{owner}' does not exist"))?; + let user = get_user_by_name(owner).ok_or_else(|| { + error!("User '{owner}' does not exist"); + eyre!("User '{owner}' does not exist") + })?; let uid = Uid::from_raw(user.uid()); let gid = Gid::from_raw(user.primary_group_id()); chown(&path, Some(uid), Some(gid))?; @@ -190,6 +237,7 @@ pub fn create_owned_dir(path: PathBuf, owner: &str) -> Result<()> { #[cfg(windows)] pub fn create_owned_dir(path: PathBuf, _owner: &str) -> Result<()> { + debug!("Creating owned dir: {path:?}"); std::fs::create_dir_all(path)?; Ok(()) } @@ -207,7 +255,10 @@ pub fn is_running_as_root() -> bool { pub fn get_user_safenode_data_dir() -> Result { Ok(dirs_next::data_dir() - .ok_or_else(|| eyre!("Could not obtain user data directory"))? + .ok_or_else(|| { + error!("Failed to get data_dir"); + eyre!("Could not obtain user data directory") + })? .join("safe") .join("node")) } diff --git a/sn_node_manager/src/helpers.rs b/sn_node_manager/src/helpers.rs index d62f72691e..2e3fae01bf 100644 --- a/sn_node_manager/src/helpers.rs +++ b/sn_node_manager/src/helpers.rs @@ -27,6 +27,7 @@ const MAX_DOWNLOAD_RETRIES: u8 = 3; #[cfg(windows)] pub async fn configure_winsw(dest_path: &Path, verbosity: VerbosityLevel) -> Result<()> { if which::which("winsw.exe").is_ok() { + debug!("WinSW already installed, which returned Ok"); return Ok(()); } @@ -34,6 +35,7 @@ pub async fn configure_winsw(dest_path: &Path, verbosity: VerbosityLevel) -> Res if verbosity != VerbosityLevel::Minimal { println!("Downloading winsw.exe..."); } + debug!("Downloading WinSW to {dest_path:?}"); let release_repo = ::default_config(); @@ -59,6 +61,7 @@ pub async fn configure_winsw(dest_path: &Path, verbosity: VerbosityLevel) -> Res let mut download_attempts = 1; loop { if download_attempts > MAX_DOWNLOAD_RETRIES { + error!("Failed to download WinSW after {MAX_DOWNLOAD_RETRIES} tries."); bail!("Failed to download WinSW after {MAX_DOWNLOAD_RETRIES} tries."); } match release_repo.download_winsw(dest_path, &callback).await { @@ -68,6 +71,7 @@ pub async fn configure_winsw(dest_path: &Path, verbosity: VerbosityLevel) -> Res println!("Error downloading WinSW: {e:?}"); println!("Trying again {download_attempts}/{MAX_DOWNLOAD_RETRIES}"); } + error!("Error downloading WinSW. Trying again {download_attempts}/{MAX_DOWNLOAD_RETRIES}: {e:?}"); download_attempts += 1; if let Some(pb) = &pb { pb.finish_and_clear(); @@ -79,8 +83,12 @@ pub async fn configure_winsw(dest_path: &Path, verbosity: VerbosityLevel) -> Res if let Some(pb) = pb { pb.finish_and_clear(); } + } else { + debug!("WinSW already installed, dest_path exists: {dest_path:?}"); } + info!("WinSW installed at {dest_path:?}. Setting WINSW_PATH environment variable."); + std::env::set_var("WINSW_PATH", dest_path.to_string_lossy().to_string()); Ok(()) @@ -104,6 +112,9 @@ pub async fn download_and_extract_release( verbosity: VerbosityLevel, download_dir_path: Option, ) -> Result<(PathBuf, String)> { + debug!( + "Downloading and extracting release for {release_type}, url: {url:?}, version: {version:?}" + ); let mut pb = None; let callback = if verbosity != VerbosityLevel::Minimal { let progress_bar = Arc::new(ProgressBar::new(0)); @@ -134,14 +145,17 @@ pub async fn download_and_extract_release( std::fs::create_dir_all(&path)?; path }; + debug!("Download directory: {download_dir_path:?}"); let mut download_attempts = 1; let binary_download_path = loop { if download_attempts > MAX_DOWNLOAD_RETRIES { + error!("Failed to download release after {MAX_DOWNLOAD_RETRIES} tries."); bail!("Failed to download release after {MAX_DOWNLOAD_RETRIES} tries."); } if let Some(url) = &url { + info!("Downloading release from {url}"); if verbosity != VerbosityLevel::Minimal { println!("Retrieving {release_type} from {url}"); } @@ -150,11 +164,13 @@ pub async fn download_and_extract_release( .await { Ok(archive_path) => { - let binary_download_path = - release_repo.extract_release_archive(&archive_path, &download_dir_path)?; + let binary_download_path = release_repo + .extract_release_archive(&archive_path, &download_dir_path) + .inspect_err(|err| error!("Error while extracting archive {err:?}"))?; break binary_download_path; } Err(err) => { + error!("Error downloading release: {err:?}"); if verbosity != VerbosityLevel::Minimal { println!("Error downloading release: {err:?}"); println!("Trying again {download_attempts}/{MAX_DOWNLOAD_RETRIES}"); @@ -167,12 +183,19 @@ pub async fn download_and_extract_release( } } else { let version = if let Some(version) = version.clone() { - Version::parse(&version)? + let version = Version::parse(&version)?; + info!("Downloading release from S3 for version {version}"); + version } else { if verbosity != VerbosityLevel::Minimal { println!("Retrieving latest version for {release_type}..."); } - release_repo.get_latest_version(&release_type).await? + let version = release_repo + .get_latest_version(&release_type) + .await + .inspect_err(|err| error!("Error obtaining latest version {err:?}"))?; + info!("Downloading latest version from S3: {version}"); + version }; let archive_name = format!( @@ -187,12 +210,14 @@ pub async fn download_and_extract_release( // try extracting it, else download it. match release_repo.extract_release_archive(&archive_path, &download_dir_path) { Ok(binary_download_path) => { + info!("Using cached {release_type} version {version}..."); if verbosity != VerbosityLevel::Minimal { println!("Using cached {release_type} version {version}..."); } break binary_download_path; } Err(_) => { + info!("Cached {release_type} version {version} is corrupted. Downloading again..."); if verbosity != VerbosityLevel::Minimal { println!("Cached {release_type} version {version} is corrupted. Downloading again..."); } @@ -220,6 +245,7 @@ pub async fn download_and_extract_release( break binary_download_path; } Err(err) => { + error!("Error while downloading release. Trying again {download_attempts}/{MAX_DOWNLOAD_RETRIES}: {err:?}"); if verbosity != VerbosityLevel::Minimal { println!("Error while downloading release. Trying again {download_attempts}/{MAX_DOWNLOAD_RETRIES}: {err:?}"); } @@ -234,6 +260,7 @@ pub async fn download_and_extract_release( if let Some(pb) = pb { pb.finish_and_clear(); } + info!("Download completed: {binary_download_path:?}"); if verbosity != VerbosityLevel::Minimal { println!("Download completed: {}", &binary_download_path.display()); @@ -248,22 +275,32 @@ pub async fn download_and_extract_release( } pub fn get_bin_version(bin_path: &PathBuf) -> Result { + trace!("Obtaining version of binary {bin_path:?}"); let mut cmd = Command::new(bin_path) .arg("--version") .stdout(Stdio::piped()) - .spawn()?; + .spawn() + .inspect_err(|err| error!("The program {bin_path:?} failed to start: {err:?}"))?; let mut output = String::new(); cmd.stdout .as_mut() - .ok_or_else(|| eyre!("Failed to capture stdout"))? - .read_to_string(&mut output)?; + .ok_or_else(|| { + error!("Failed to capture stdout"); + eyre!("Failed to capture stdout") + })? + .read_to_string(&mut output) + .inspect_err(|err| error!("Output contained non utf8 chars: {err:?}"))?; let version = output .split_whitespace() .last() - .ok_or_else(|| eyre!("Failed to parse version"))? + .ok_or_else(|| { + error!("Failed to parse version"); + eyre!("Failed to parse version") + })? .to_string(); + trace!("Obtained version of binary: {version}"); Ok(version) } @@ -284,6 +321,7 @@ pub fn create_temp_dir() -> Result { let temp_dir = std::env::temp_dir(); let unique_dir_name = uuid::Uuid::new_v4().to_string(); let new_temp_dir = temp_dir.join(unique_dir_name); - std::fs::create_dir_all(&new_temp_dir)?; + std::fs::create_dir_all(&new_temp_dir) + .inspect_err(|err| error!("Failed to crete temp dir: {err:?}"))?; Ok(new_temp_dir) } diff --git a/sn_node_manager/src/lib.rs b/sn_node_manager/src/lib.rs index 2728750a42..b0aa43afa5 100644 --- a/sn_node_manager/src/lib.rs +++ b/sn_node_manager/src/lib.rs @@ -76,6 +76,7 @@ impl ServiceManager { } pub async fn start(&mut self) -> Result<()> { + info!("Starting the {} service", self.service.name()); if ServiceStatus::Running == self.service.status() { // The last time we checked the service was running, but it doesn't mean it's actually // running at this point in time. If it is running, we don't need to do anything. If it @@ -85,6 +86,7 @@ impl ServiceManager { .service_control .is_service_process_running(self.service.pid().unwrap()) { + debug!("The {} service is already running", self.service.name()); if self.verbosity != VerbosityLevel::Minimal { println!("The {} service is already running", self.service.name()); } @@ -118,15 +120,23 @@ impl ServiceManager { ); } Err(sn_service_management::error::Error::ServiceProcessNotFound(_)) => { + error!("The '{}' service has failed to start because ServiceProcessNotFound when fetching PID", self.service.name()); return Err(eyre!( "The '{}' service has failed to start", self.service.name() )); } - Err(e) => return Err(e.into()), + Err(err) => { + error!("Failed to start service, because PID could not be obtained: {err}"); + return Err(err.into()); + } } self.service.on_start().await?; + info!( + "Service {} has been started successfully", + self.service.name() + ); if self.verbosity != VerbosityLevel::Minimal { println!("{} Started {} service", "✓".green(), self.service.name()); @@ -154,8 +164,13 @@ impl ServiceManager { } pub async fn stop(&mut self) -> Result<()> { + info!("Stopping the {} service", self.service.name()); match self.service.status() { ServiceStatus::Added => { + debug!( + "The {} service has not been started since it was installed", + self.service.name() + ); if self.verbosity != VerbosityLevel::Minimal { println!( "Service {} has not been started since it was installed", @@ -165,6 +180,7 @@ impl ServiceManager { Ok(()) } ServiceStatus::Removed => { + debug!("The {} service has been removed", self.service.name()); if self.verbosity != VerbosityLevel::Minimal { println!("Service {} has been removed", self.service.name()); } @@ -189,13 +205,16 @@ impl ServiceManager { ); } } else if self.verbosity != VerbosityLevel::Minimal { + debug!("Service {name} was already stopped"); println!("{} Service {} was already stopped", "✓".green(), name); } self.service.on_stop().await?; + info!("Service {name} has been stopped successfully."); Ok(()) } ServiceStatus::Stopped => { + debug!("Service {} was already stopped", self.service.name()); if self.verbosity != VerbosityLevel::Minimal { println!( "{} Service {} was already stopped", @@ -215,12 +234,20 @@ impl ServiceManager { .pid() .ok_or_eyre("Could not obtain PID for running node")?, ) { + error!( + "Service {} is already running. Stop it before removing it", + self.service.name() + ); return Err(eyre!("A running service cannot be removed") .suggestion("Stop the node first then try again")); } // If the node wasn't actually running, we should give the user an opportunity to // check why it may have failed before removing everything. self.service.on_stop().await?; + error!( + "The service: {} was marked as running but it had actually stopped", + self.service.name() + ); return Err( eyre!("This service was marked as running but it had actually stopped") .suggestion("You may want to check the logs for errors before removing it") @@ -232,31 +259,47 @@ impl ServiceManager { .service_control .uninstall(&self.service.name(), self.service.is_user_mode()) { - Ok(()) => {} - Err(e) => match e { + Ok(()) => { + debug!("Service {} has been uninstalled", self.service.name()); + } + Err(err) => match err { ServiceError::ServiceRemovedManually(name) => { + warn!("The user appears to have removed the {name} service manually",); // The user has deleted the service definition file, which the service manager // crate treats as an error. We then return our own error type, which allows us // to handle it here and just proceed with removing the service from the // registry. println!("The user appears to have removed the {name} service manually"); } - _ => return Err(e.into()), + _ => { + error!("Error uninstalling the service: {err}"); + return Err(err.into()); + } }, } if !keep_directories { + debug!( + "Removing data and log directories for service: {}", + self.service.name() + ); // It's possible the user deleted either of these directories manually. // We can just proceed with removing the service from the registry. if self.service.data_dir_path().exists() { + debug!("Removing data_dir: {:?}", self.service.data_dir_path()); std::fs::remove_dir_all(self.service.data_dir_path())?; } if self.service.log_dir_path().exists() { + debug!("Removing log_dir: {:?}", self.service.log_dir_path()); std::fs::remove_dir_all(self.service.log_dir_path())?; } } self.service.on_remove(); + info!( + "Service {} has been removed successfully.", + self.service.name() + ); if self.verbosity != VerbosityLevel::Minimal { println!( @@ -275,9 +318,14 @@ impl ServiceManager { && (current_version == options.target_version || options.target_version < current_version) { + info!( + "The service {} is already at the latest version. No upgrade is required.", + self.service.name() + ); return Ok(UpgradeResult::NotRequired); } + debug!("Stopping the service and copying the binary"); self.stop().await?; std::fs::copy(options.clone().target_bin_path, self.service.bin_path())?; @@ -292,13 +340,14 @@ impl ServiceManager { if options.start_service { match self.start().await { Ok(()) => {} - Err(e) => { + Err(err) => { self.service .set_version(&options.target_version.to_string()); + info!("The service has been upgraded but could not be started: {err}"); return Ok(UpgradeResult::UpgradedButNotStarted( current_version.to_string(), options.target_version.to_string(), - e.to_string(), + err.to_string(), )); } } @@ -457,12 +506,20 @@ pub async fn refresh_node_registry( if print_refresh_message { println!("Refreshing the node registry..."); } + info!("Refreshing the node registry"); for node in &mut node_registry.nodes { // The `status` command can run before a node is started and therefore before its wallet // exists. match HotWallet::try_load_from(&node.data_dir_path) { - Ok(wallet) => node.reward_balance = Some(wallet.balance()), + Ok(wallet) => { + node.reward_balance = Some(wallet.balance()); + trace!( + "Wallet balance for node {}: {}", + node.service_name, + wallet.balance() + ); + } Err(_) => node.reward_balance = None, } @@ -472,6 +529,10 @@ pub async fn refresh_node_registry( // First we can try the PID we have now. If there is still a process running with // that PID, we know the node is still running. if service_control.is_service_process_running(pid) { + debug!( + "The process for node {} is still running at PID {pid}", + node.service_name + ); match rpc_client.network_info().await { Ok(info) => { node.connected_peers = Some(info.connected_peers); @@ -485,13 +546,22 @@ pub async fn refresh_node_registry( // service has been configured to restart on failures, it's possible that a new // process has been launched and hence we would have a new PID. We can use the // RPC service to try and retrieve it. + debug!( + "The process for node {} has died. Attempting to retrieve new PID", + node.service_name + ); match rpc_client.node_info().await { Ok(info) => { node.pid = Some(info.pid); + debug!("New PID for node {} is= {:?}", node.service_name, node.pid); } Err(_) => { // Finally, if there was an error communicating with the RPC client, we // can assume that this node is actually stopped. + debug!( + "Failed to retrieve new PID for node {}. Assuming it's stopped", + node.service_name + ); node.status = ServiceStatus::Stopped; node.pid = None; } diff --git a/sn_node_manager/src/local.rs b/sn_node_manager/src/local.rs index 2a41e10758..aea74eb106 100644 --- a/sn_node_manager/src/local.rs +++ b/sn_node_manager/src/local.rs @@ -55,6 +55,7 @@ impl Launcher for LocalSafeLauncher { } fn launch_faucet(&self, genesis_multiaddr: &Multiaddr) -> Result { + info!("Launching the faucet server..."); let args = vec![ "--peer".to_string(), genesis_multiaddr.to_string(), @@ -104,7 +105,8 @@ impl Launcher for LocalSafeLauncher { .args(args) .stdout(Stdio::inherit()) .stderr(Stdio::inherit()) - .spawn()?; + .spawn() + .inspect_err(|err| error!("Error while spawning node process: {err:?}"))?; Ok(()) } @@ -130,6 +132,7 @@ pub fn kill_network(node_registry: &NodeRegistry, keep_directories: bool) -> Res // faucet is not `None`, the pid also must have a value. if let Some(process) = system.process(Pid::from(faucet.pid.unwrap() as usize)) { process.kill(); + debug!("Faucet has been killed"); println!("{} Killed faucet", "✓".green()); } } @@ -140,12 +143,14 @@ pub fn kill_network(node_registry: &NodeRegistry, keep_directories: bool) -> Res .join("test_faucet"); if faucet_data_path.is_dir() { std::fs::remove_dir_all(faucet_data_path)?; + debug!("Removed faucet data directory"); } let genesis_data_path = dirs_next::data_dir() .ok_or_else(|| eyre!("Could not obtain user's data directory"))? .join("safe") .join("test_genesis"); if genesis_data_path.is_dir() { + debug!("Removed genesis data directory"); std::fs::remove_dir_all(genesis_data_path)?; } @@ -159,6 +164,7 @@ pub fn kill_network(node_registry: &NodeRegistry, keep_directories: bool) -> Res // anything anyway. if let Some(process) = system.process(Pid::from(pid as usize)) { process.kill(); + debug!("Killed node: {} ({})", node.service_name, pid); println!(" {} Killed process", "✓".green()); } } @@ -167,6 +173,7 @@ pub fn kill_network(node_registry: &NodeRegistry, keep_directories: bool) -> Res // At this point we don't allow path overrides, so deleting the data directory will clear // the log directory also. std::fs::remove_dir_all(&node.data_dir_path)?; + debug!("Removed node data directory: {:?}", node.data_dir_path); println!( " {} Removed {}", "✓".green(), @@ -196,6 +203,8 @@ pub async fn run_network( node_registry: &mut NodeRegistry, service_control: &dyn ServiceControl, ) -> Result<()> { + info!("Running local network"); + let launcher = LocalSafeLauncher { safenode_bin_path: options.safenode_bin_path.to_path_buf(), faucet_bin_path: options.faucet_bin_path.to_path_buf(), @@ -274,6 +283,7 @@ pub async fn run_network( } if !options.skip_validation { + debug!("Waiting for 10 seconds before validating the network..."); println!("Waiting for 10 seconds before validating the network..."); std::thread::sleep(std::time::Duration::from_secs(10)); validate_network(node_registry, bootstrap_peers.clone()).await?; @@ -315,6 +325,7 @@ pub async fn run_node( launcher: &dyn Launcher, rpc_client: &dyn RpcActions, ) -> Result { + info!("Launching node {}...", run_options.number); println!("Launching node {}...", run_options.number); launcher.launch_node( run_options.owner.clone(), @@ -391,6 +402,7 @@ async fn validate_network(node_registry: &mut NodeRegistry, peers: Vec Result<()> { for peer_id in peer_ids { + debug!("Sending NodeServiceRestartRequest to {peer_id:?} at {rpc_server_address:?}"); let str_bytes = PeerId::from_str(&peer_id)?.to_bytes(); let mut daemon_client = get_rpc_client(rpc_server_address).await?; @@ -33,6 +34,7 @@ pub async fn restart_node( })) .await .map_err(|err| { + error!("Failed to restart node service with {peer_id:?} at {rpc_server_address:?} with err: {err:?}"); eyre!( "Failed to restart node service with {peer_id:?} at {:?} with err: {err:?}", daemon_client.addr @@ -54,9 +56,11 @@ async fn get_rpc_client(socket_addr: SocketAddr) -> Result { return Ok(rpc_client); } attempts += 1; + error!("Could not connect to rpc {endpoint:?}. Attempts: {attempts:?}/10"); println!("Could not connect to rpc {endpoint:?}. Attempts: {attempts:?}/10"); tokio::time::sleep(Duration::from_secs(1)).await; if attempts >= 10 { + error!("Failed to connect to {endpoint:?} even after 10 retries"); bail!("Failed to connect to {endpoint:?} even after 10 retries"); } } diff --git a/sn_service_management/src/auditor.rs b/sn_service_management/src/auditor.rs index 5c3c76b0fb..f49ad6c61f 100644 --- a/sn_service_management/src/auditor.rs +++ b/sn_service_management/src/auditor.rs @@ -72,7 +72,11 @@ impl<'a> ServiceStateActions for AuditorService<'a> { autostart: true, contents: None, environment: options.env_variables, - label: self.service_data.service_name.parse()?, + label: self + .service_data + .service_name + .parse() + .inspect_err(|err| error!("Failed to parse service name: {err:?}"))?, program: self.service_data.auditor_path.to_path_buf(), username: Some(self.service_data.user.to_string()), working_directory: None, diff --git a/sn_service_management/src/control.rs b/sn_service_management/src/control.rs index fcd95ec4a7..e3289fd1bf 100644 --- a/sn_service_management/src/control.rs +++ b/sn_service_management/src/control.rs @@ -7,7 +7,6 @@ // permissions and limitations relating to use of the SAFE Network Software. use crate::error::{Error, Result}; - use service_manager::{ ServiceInstallCtx, ServiceLabel, ServiceLevel, ServiceManager, ServiceStartCtx, ServiceStopCtx, ServiceUninstallCtx, @@ -44,7 +43,11 @@ impl ServiceControl for ServiceController { fn create_service_user(&self, username: &str) -> Result<()> { use std::process::Command; - let output = Command::new("id").arg("-u").arg(username).output()?; + let output = Command::new("id") + .arg("-u") + .arg(username) + .output() + .inspect_err(|err| error!("Failed to execute id -u: {err:?}"))?; if output.status.success() { println!("The {username} user already exists"); return Ok(()); @@ -52,12 +55,14 @@ impl ServiceControl for ServiceController { let useradd_exists = Command::new("which") .arg("useradd") - .output()? + .output() + .inspect_err(|err| error!("Failed to execute which useradd: {err:?}"))? .status .success(); let adduser_exists = Command::new("which") .arg("adduser") - .output()? + .output() + .inspect_err(|err| error!("Failed to execute which adduser: {err:?}"))? .status .success(); @@ -67,22 +72,27 @@ impl ServiceControl for ServiceController { .arg("-s") .arg("/bin/bash") .arg(username) - .output()? + .output() + .inspect_err(|err| error!("Failed to execute useradd: {err:?}"))? } else if adduser_exists { Command::new("adduser") .arg("-s") .arg("/bin/busybox") .arg("-D") .arg(username) - .output()? + .output() + .inspect_err(|err| error!("Failed to execute adduser: {err:?}"))? } else { + error!("Neither useradd nor adduser is available. ServiceUserAccountCreationFailed"); return Err(Error::ServiceUserAccountCreationFailed); }; if !output.status.success() { + error!("Failed to create {username} user account: {output:?}"); return Err(Error::ServiceUserAccountCreationFailed); } println!("Created {username} user account for running the service"); + info!("Created {username} user account for running the service"); Ok(()) } @@ -95,8 +105,10 @@ impl ServiceControl for ServiceController { .arg(".") .arg("-list") .arg("/Users") - .output()?; - let output_str = str::from_utf8(&output.stdout)?; + .output() + .inspect_err(|err| error!("Failed to execute dscl: {err:?}"))?; + let output_str = str::from_utf8(&output.stdout) + .inspect_err(|err| error!("Error while converting output to utf8: {err:?}"))?; if output_str.lines().any(|line| line == username) { return Ok(()); } @@ -106,8 +118,10 @@ impl ServiceControl for ServiceController { .arg("-list") .arg("/Users") .arg("UniqueID") - .output()?; - let output_str = str::from_utf8(&output.stdout)?; + .output() + .inspect_err(|err| error!("Failed to execute dscl: {err:?}"))?; + let output_str = str::from_utf8(&output.stdout) + .inspect_err(|err| error!("Error while converting output to utf8: {err:?}"))?; let mut max_id = 0; for line in output_str.lines() { @@ -135,8 +149,13 @@ impl ServiceControl for ServiceController { format!("dscl . -create /Users/{} PrimaryGroupID 20", username), ]; for cmd in commands { - let status = Command::new("sh").arg("-c").arg(&cmd).status()?; + let status = Command::new("sh") + .arg("-c") + .arg(&cmd) + .status() + .inspect_err(|err| error!("Error while executing dscl command: {err:?}"))?; if !status.success() { + error!("The command {cmd} failed to execute. ServiceUserAccountCreationFailed"); return Err(Error::ServiceUserAccountCreationFailed); } } @@ -160,6 +179,7 @@ impl ServiceControl for ServiceController { let socket = TcpListener::bind(addr)?; let port = socket.local_addr()?.port(); drop(socket); + trace!("Got available port: {port}"); Ok(port) } @@ -172,60 +192,90 @@ impl ServiceControl for ServiceController { if bin_path == path { // There does not seem to be any easy way to get the process ID from the `Pid` // type. Probably something to do with representing it in a cross-platform way. + trace!("Found process {bin_path:?} with PID: {pid}"); return Ok(pid.to_string().parse::()?); } } } + error!("Process not found: {bin_path:?}. PID could not be retrieved"); Err(Error::ServiceProcessNotFound( bin_path.to_string_lossy().to_string(), )) } fn install(&self, install_ctx: ServiceInstallCtx, user_mode: bool) -> Result<()> { - let mut manager = ::native()?; + debug!("Installing service: {install_ctx:?}"); + let mut manager = ::native() + .inspect_err(|err| error!("Could not get native ServiceManage: {err:?}"))?; if user_mode { - manager.set_level(ServiceLevel::User)?; + manager + .set_level(ServiceLevel::User) + .inspect_err(|err| error!("Could not set service to user mode: {err:?}"))?; } - manager.install(install_ctx)?; + manager + .install(install_ctx) + .inspect_err(|err| error!("Error while installing service: {err:?}"))?; Ok(()) } fn start(&self, service_name: &str, user_mode: bool) -> Result<()> { + debug!("Starting service: {service_name}"); let label: ServiceLabel = service_name.parse()?; - let mut manager = ::native()?; + let mut manager = ::native() + .inspect_err(|err| error!("Could not get native ServiceManage: {err:?}"))?; if user_mode { - manager.set_level(ServiceLevel::User)?; + manager + .set_level(ServiceLevel::User) + .inspect_err(|err| error!("Could not set service to user mode: {err:?}"))?; } - manager.start(ServiceStartCtx { label })?; + manager + .start(ServiceStartCtx { label }) + .inspect_err(|err| error!("Error while starting service: {err:?}"))?; Ok(()) } fn stop(&self, service_name: &str, user_mode: bool) -> Result<()> { + debug!("Stopping service: {service_name}"); let label: ServiceLabel = service_name.parse()?; - let mut manager = ::native()?; + let mut manager = ::native() + .inspect_err(|err| error!("Could not get native ServiceManage: {err:?}"))?; if user_mode { - manager.set_level(ServiceLevel::User)?; + manager + .set_level(ServiceLevel::User) + .inspect_err(|err| error!("Could not set service to user mode: {err:?}"))?; } - manager.stop(ServiceStopCtx { label })?; + manager + .stop(ServiceStopCtx { label }) + .inspect_err(|err| error!("Error while stopping service: {err:?}"))?; + Ok(()) } fn uninstall(&self, service_name: &str, user_mode: bool) -> Result<()> { + debug!("Uninstalling service: {service_name}"); let label: ServiceLabel = service_name.parse()?; - let mut manager = ::native()?; + let mut manager = ::native() + .inspect_err(|err| error!("Could not get native ServiceManage: {err:?}"))?; + if user_mode { - manager.set_level(ServiceLevel::User)?; + manager + .set_level(ServiceLevel::User) + .inspect_err(|err| error!("Could not set service to user mode: {err:?}"))?; } match manager.uninstall(ServiceUninstallCtx { label }) { Ok(()) => Ok(()), Err(e) => match e.kind() { std::io::ErrorKind::NotFound => { + error!("Error while uninstall service, service file might have been removed manually: {service_name}"); // In this case the user has removed the service definition file manually, // which the service manager crate treats as an error. We can propagate the // it to the caller and they can decide how to handle it. Err(Error::ServiceRemovedManually(service_name.to_string())) } - _ => Err(e.into()), + _ => { + error!("Error while uninstalling service: {e:?}"); + Err(e.into()) + } }, } } @@ -234,6 +284,7 @@ impl ServiceControl for ServiceController { /// /// This is wrapped mainly just for unit testing. fn wait(&self, delay: u64) { + trace!("Waiting for {delay} milliseconds"); std::thread::sleep(std::time::Duration::from_millis(delay)); } } diff --git a/sn_service_management/src/daemon.rs b/sn_service_management/src/daemon.rs index 9091097cde..3a0624eade 100644 --- a/sn_service_management/src/daemon.rs +++ b/sn_service_management/src/daemon.rs @@ -53,7 +53,10 @@ impl<'a> ServiceStateActions for DaemonService<'a> { let (address, port) = self .service_data .endpoint - .ok_or_else(|| Error::DaemonEndpointNotSet) + .ok_or_else(|| { + error!("Daemon endpoint not set in the service_data"); + Error::DaemonEndpointNotSet + }) .map(|e| (e.ip().to_string(), e.port().to_string()))?; let install_ctx = ServiceInstallCtx { args: vec![ diff --git a/sn_service_management/src/lib.rs b/sn_service_management/src/lib.rs index e0f9cb94eb..c6761f43d1 100644 --- a/sn_service_management/src/lib.rs +++ b/sn_service_management/src/lib.rs @@ -14,6 +14,9 @@ pub mod faucet; pub mod node; pub mod rpc; +#[macro_use] +extern crate tracing; + pub mod safenode_manager_proto { tonic::include_proto!("safenode_manager_proto"); } @@ -111,19 +114,26 @@ pub struct NodeRegistry { impl NodeRegistry { pub fn save(&self) -> Result<()> { + debug!("Saving node registry to: {:?}", self.save_path); let path = Path::new(&self.save_path); if let Some(parent) = path.parent() { - std::fs::create_dir_all(parent)?; + std::fs::create_dir_all(parent).inspect_err(|err| { + error!("Error while creating node registry parent {parent:?}: {err:?}") + })?; } let json = serde_json::to_string(self)?; - let mut file = std::fs::File::create(self.save_path.clone())?; - file.write_all(json.as_bytes())?; + let mut file = std::fs::File::create(self.save_path.clone()) + .inspect_err(|err| error!("Error creating node registry file: {err:?}"))?; + file.write_all(json.as_bytes()) + .inspect_err(|err| error!("Error writing to node registry: {err:?}"))?; + Ok(()) } pub fn load(path: &Path) -> Result { if !path.exists() { + debug!("Loading default node registry as {path:?} does not exist"); return Ok(NodeRegistry { auditor: None, bootstrap_peers: vec![], @@ -135,10 +145,14 @@ impl NodeRegistry { save_path: path.to_path_buf(), }); } + debug!("Loading node registry from: {path:?}"); + + let mut file = std::fs::File::open(path) + .inspect_err(|err| error!("Error while opening node registry: {err:?}"))?; - let mut file = std::fs::File::open(path)?; let mut contents = String::new(); - file.read_to_string(&mut contents)?; + file.read_to_string(&mut contents) + .inspect_err(|err| error!("Error while reading node registry: {err:?}"))?; // It's possible for the file to be empty if the user runs a `status` command before any // services were added. @@ -159,7 +173,8 @@ impl NodeRegistry { } pub fn from_json(json: &str) -> Result { - let registry = serde_json::from_str(json)?; + let registry = serde_json::from_str(json) + .inspect_err(|err| error!("Error while deserializing node registry: {err:?}"))?; Ok(registry) } @@ -174,11 +189,16 @@ impl NodeRegistry { pub fn get_local_node_registry_path() -> Result { let path = dirs_next::data_dir() - .ok_or_else(|| Error::UserDataDirectoryNotObtainable)? + .ok_or_else(|| { + error!("Failed to get data_dir"); + Error::UserDataDirectoryNotObtainable + })? .join("safe") .join("local_node_registry.json"); if let Some(parent) = path.parent() { - std::fs::create_dir_all(parent)?; + std::fs::create_dir_all(parent).inspect_err(|err| { + error!("Error while creating node registry parent {parent:?}: {err:?}") + })?; } Ok(path) } diff --git a/sn_service_management/src/node.rs b/sn_service_management/src/node.rs index 14d1a96679..b302e803c0 100644 --- a/sn_service_management/src/node.rs +++ b/sn_service_management/src/node.rs @@ -134,8 +134,15 @@ impl<'a> ServiceStateActions for NodeService<'a> { } async fn on_start(&mut self) -> Result<()> { - let node_info = self.rpc_actions.node_info().await?; - let network_info = self.rpc_actions.network_info().await?; + let node_info = + self.rpc_actions.node_info().await.inspect_err(|err| { + error!("Error while obtaining node_info through RPC: {err:?}") + })?; + let network_info = + self.rpc_actions.network_info().await.inspect_err(|err| { + error!("Error while obtaining network_info through RPC: {err:?}") + })?; + self.service_data.listen_addr = Some( network_info .listeners From bc3c373d17e2a04c03c0075499114703301fbfe6 Mon Sep 17 00:00:00 2001 From: grumbach Date: Fri, 31 May 2024 18:44:02 -0400 Subject: [PATCH 4/8] feat: dag crawling through generic crawling --- sn_cli/src/bin/subcommands/wallet/audit.rs | 4 +- sn_client/src/audit/dag_crawling.rs | 131 +++++++++++++++------ sn_transfers/src/wallet/error.rs | 3 + 3 files changed, 97 insertions(+), 41 deletions(-) diff --git a/sn_cli/src/bin/subcommands/wallet/audit.rs b/sn_cli/src/bin/subcommands/wallet/audit.rs index 29423fd8c6..c0e3833d50 100644 --- a/sn_cli/src/bin/subcommands/wallet/audit.rs +++ b/sn_cli/src/bin/subcommands/wallet/audit.rs @@ -64,7 +64,7 @@ async fn gather_spend_dag(client: &Client, root_dir: &Path, fast_mode: bool) -> println!("Found a local spend dag on disk, continuing from it..."); if fast_mode { client - .spend_dag_continue_from_utxos(&mut dag, Default::default(), false) + .spend_dag_continue_from_utxos(&mut dag, None, false) .await; } dag @@ -75,7 +75,7 @@ async fn gather_spend_dag(client: &Client, root_dir: &Path, fast_mode: bool) -> let genesis_addr = SpendAddress::from_unique_pubkey(&GENESIS_SPEND_UNIQUE_KEY); if fast_mode { client - .spend_dag_build_from(genesis_addr, Default::default(), true) + .spend_dag_build_from(genesis_addr, None, true) .await? } else { client.new_dag_with_genesis_only().await? diff --git a/sn_client/src/audit/dag_crawling.rs b/sn_client/src/audit/dag_crawling.rs index 324cb18565..f0d8bbc10d 100644 --- a/sn_client/src/audit/dag_crawling.rs +++ b/sn_client/src/audit/dag_crawling.rs @@ -16,6 +16,8 @@ use sn_transfers::{ use std::collections::BTreeSet; use tokio::sync::mpsc::Sender; +const SPENDS_PROCESSING_BUFFER_SIZE: usize = 4096; + enum InternalGetNetworkSpend { Spend(Box), DoubleSpend(Box, Box), @@ -62,26 +64,93 @@ impl Client { spend_processing: Option>, verify: bool, ) -> WalletResult { - info!("Building spend DAG from {spend_addr:?}"); - let mut dag = SpendDag::new(spend_addr); + let (tx, mut rx) = tokio::sync::mpsc::channel(SPENDS_PROCESSING_BUFFER_SIZE); + + // start crawling from the given spend address + let self_clone = self.clone(); + let crawl_handle = + tokio::spawn(async move { self_clone.spend_dag_crawl_from(spend_addr, tx).await }); + + // start DAG building from the spends gathered while crawling + // forward spends to processing if provided + let build_handle: tokio::task::JoinHandle> = + tokio::spawn(async move { + let mut dag = SpendDag::new(spend_addr); + while let Some(spend) = rx.recv().await { + let addr = spend.address(); + dag.insert(addr, spend.clone()); + if let Some(sender) = &spend_processing { + sender + .send(spend) + .await + .map_err(|e| WalletError::SpendProcessing(e.to_string()))?; + } + } + Ok(dag) + }); + + // wait for both to finish + let (crawl_res, build_res) = tokio::join!(crawl_handle, build_handle); + crawl_res.map_err(|e| { + WalletError::SpendProcessing(format!("Failed to Join crawling results {e}")) + })??; + let mut dag = build_res.map_err(|e| { + WalletError::SpendProcessing(format!("Failed to Join DAG building results {e}")) + })??; + + // verify the DAG + if verify { + info!("Now verifying SpendDAG from {spend_addr:?} and recording errors..."); + let start = std::time::Instant::now(); + if let Err(e) = dag.record_faults(&dag.source()) { + let s = format!( + "Collected DAG starting at {spend_addr:?} is invalid, this is probably a bug: {e}" + ); + error!("{s}"); + return Err(WalletError::Dag(s)); + } + let elapsed = start.elapsed(); + info!("Finished verifying SpendDAG from {spend_addr:?} in {elapsed:?}"); + } + + Ok(dag) + } + + /// Crawls the Spend Dag from a given SpendAddress recursively + /// following descendants all the way to UTXOs + /// Returns the UTXOs reached + pub async fn spend_dag_crawl_from( + &self, + spend_addr: SpendAddress, + spend_processing: Sender, + ) -> WalletResult> { + info!("Crawling spend DAG from {spend_addr:?}"); + let mut utxos = BTreeSet::new(); // get first spend let first_spend = match self.crawl_spend(spend_addr).await { InternalGetNetworkSpend::Spend(s) => *s, InternalGetNetworkSpend::DoubleSpend(s1, s2) => { - dag.insert(spend_addr, *s2); + spend_processing + .send(*s2) + .await + .map_err(|e| WalletError::SpendProcessing(e.to_string()))?; *s1 } InternalGetNetworkSpend::NotFound => { // the cashnote was not spent yet, so it's an UTXO info!("UTXO at {spend_addr:?}"); - return Ok(dag); + utxos.insert(spend_addr); + return Ok(utxos); } InternalGetNetworkSpend::Error(e) => { return Err(WalletError::FailedToGetSpend(e.to_string())); } }; - dag.insert(spend_addr, first_spend.clone()); + spend_processing + .send(first_spend.clone()) + .await + .map_err(|e| WalletError::SpendProcessing(e.to_string()))?; // use iteration instead of recursion to avoid stack overflow let mut txs_to_follow = BTreeSet::from_iter([first_spend.spend.spent_tx]); @@ -121,29 +190,28 @@ impl Client { match get_spend { InternalGetNetworkSpend::Spend(spend) => { next_gen_tx.insert(spend.spend.spent_tx.clone()); - if let Some(sender) = &spend_processing { - let _ = sender.send(*spend.clone()).await.map_err(|e| { - error!("Failed to send spend {addr:?} to processing: {e}") - }); - } - dag.insert(addr, *spend); + spend_processing + .send(*spend.clone()) + .await + .map_err(|e| WalletError::SpendProcessing(e.to_string()))?; } InternalGetNetworkSpend::DoubleSpend(s1, s2) => { info!("Fetched double spend at {addr:?} from network, following both..."); next_gen_tx.insert(s1.spend.spent_tx.clone()); next_gen_tx.insert(s2.spend.spent_tx.clone()); - if let Some(sender) = &spend_processing { - let _ = sender.send(*s1.clone()).await.map_err(|e| { - error!("Failed to send spend {addr:?} to processing: {e}") - }); - let _ = sender.send(*s2.clone()).await.map_err(|e| { - error!("Failed to send spend {addr:?} to processing: {e}") - }); - } - dag.insert(addr, *s1); - dag.insert(addr, *s2); + spend_processing + .send(*s1.clone()) + .await + .map_err(|e| WalletError::SpendProcessing(e.to_string()))?; + spend_processing + .send(*s2.clone()) + .await + .map_err(|e| WalletError::SpendProcessing(e.to_string()))?; + } + InternalGetNetworkSpend::NotFound => { + info!("Reached UTXO at {addr:?}"); + utxos.insert(addr); } - InternalGetNetworkSpend::NotFound => info!("Reached UTXO at {addr:?}"), InternalGetNetworkSpend::Error(err) => { error!("Failed to get spend at {addr:?} during DAG collection: {err:?}") } @@ -162,23 +230,8 @@ impl Client { } let elapsed = start.elapsed(); - info!("Finished building SpendDAG from {spend_addr:?} in {elapsed:?}"); - - // verify the DAG - if verify { - info!("Now verifying SpendDAG from {spend_addr:?} and recording errors..."); - let start = std::time::Instant::now(); - if let Err(e) = dag.record_faults(&dag.source()) { - let s = format!( - "Collected DAG starting at {spend_addr:?} is invalid, this is probably a bug: {e}" - ); - error!("{s}"); - return Err(WalletError::Dag(s)); - } - let elapsed = start.elapsed(); - info!("Finished verifying SpendDAG from {spend_addr:?} in {elapsed:?}"); - } - Ok(dag) + info!("Finished crawling SpendDAG from {spend_addr:?} in {elapsed:?}"); + Ok(utxos) } /// Extends an existing SpendDag with a new SignedSpend, diff --git a/sn_transfers/src/wallet/error.rs b/sn_transfers/src/wallet/error.rs index 63f28ca6fd..1570f6242b 100644 --- a/sn_transfers/src/wallet/error.rs +++ b/sn_transfers/src/wallet/error.rs @@ -43,6 +43,9 @@ pub enum Error { /// Failed to fetch spend from network #[error("Failed to fetch spend from network: {0}")] FailedToGetSpend(String), + /// Failed to send spend for processing + #[error("Failed to send spend for processing: {0}")] + SpendProcessing(String), /// Failed to parse bytes into a bls key #[error("Unconfirmed transactions still persist even after retries")] UnconfirmedTxAfterRetries, From 7cbad02e0cbdb620f2e4f2ad335d886ab2ee730f Mon Sep 17 00:00:00 2001 From: grumbach Date: Fri, 31 May 2024 19:20:54 -0400 Subject: [PATCH 5/8] feat: make dag collection optional through auditor feature flag --- Cargo.lock | 1 + sn_auditor/Cargo.toml | 1 + sn_auditor/src/dag_db.rs | 90 +++++++++++++++++++++++++++------------- 3 files changed, 63 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4ca08f0b21..e59e95aa8e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7003,6 +7003,7 @@ dependencies = [ "clap", "color-eyre", "dirs-next", + "futures", "graphviz-rust", "lazy_static", "serde", diff --git a/sn_auditor/Cargo.toml b/sn_auditor/Cargo.toml index 84cb304405..1994bba440 100644 --- a/sn_auditor/Cargo.toml +++ b/sn_auditor/Cargo.toml @@ -26,6 +26,7 @@ bls = { package = "blsttc", version = "8.0.1" } clap = { version = "4.2.1", features = ["derive"] } color-eyre = "~0.6" dirs-next = "~2.0.0" +futures = "0.3.28" graphviz-rust = { version = "0.9.0", optional = true } lazy_static = "1.4.0" serde = { version = "1.0.133", features = ["derive", "rc"] } diff --git a/sn_auditor/src/dag_db.rs b/sn_auditor/src/dag_db.rs index c179647506..b06523cf39 100644 --- a/sn_auditor/src/dag_db.rs +++ b/sn_auditor/src/dag_db.rs @@ -168,6 +168,7 @@ impl SpendDagDb { } /// Dump DAG to disk + #[cfg(feature = "dag-collection")] pub async fn dump(&self) -> Result<()> { std::fs::create_dir_all(&self.path)?; let dag_path = self.path.join(SPEND_DAG_FILENAME); @@ -249,39 +250,70 @@ impl SpendDagDb { continue; } - // get a copy of the current DAG - let mut dag = { self.dag.clone().read().await.clone() }; - - // update it - client - .spend_dag_continue_from(&mut dag, addrs_to_get, spend_processing.clone(), true) - .await; - - // update utxos - let new_utxos = dag.get_utxos(); - utxo_addresses.extend( - new_utxos - .into_iter() - .map(|a| (a, Instant::now() + *UTXO_REATTEMPT_INTERVAL)), - ); + #[cfg(not(feature = "dag-collection"))] + { + if let Some(sender) = spend_processing.clone() { + // crawl DAG + let tasks: Vec<_> = addrs_to_get + .iter() + .map(|a| client.spend_dag_crawl_from(*a, sender.clone())) + .collect(); + let res = futures::future::join_all(tasks).await; + let mut new_utxos = BTreeSet::new(); + for (r, a) in res.into_iter().zip(addrs_to_get) { + match r { + Ok(utxos) => new_utxos.extend(utxos), + Err(e) => error!("Failed to crawl DAG from {a:?} : {e}"), + } + } - // write updates to local DAG and save to disk - let mut dag_w_handle = self.dag.write().await; - *dag_w_handle = dag; - std::mem::drop(dag_w_handle); - if let Err(e) = self.dump().await { - error!("Failed to dump DAG: {e}"); + // update utxos + utxo_addresses.extend( + new_utxos + .into_iter() + .map(|a| (a, Instant::now() + REATTEMPT_INTERVAL)), + ); + } else { + panic!("There is no point in running the auditor if we are not collecting the DAG or collecting data through crawling. Please enable the `dag-collection` feature or provide beta program related arguments."); + } } - // update and save svg to file in a background thread so we don't block - #[cfg(feature = "svg-dag")] + #[cfg(feature = "dag-collection")] { - let self_clone = self.clone(); - tokio::spawn(async move { - if let Err(e) = self_clone.dump_dag_svg().await { - error!("Failed to dump DAG svg: {e}"); - } - }); + // get a copy of the current DAG + let mut dag = { self.dag.clone().read().await.clone() }; + + // update it + client + .spend_dag_continue_from(&mut dag, addrs_to_get, spend_processing.clone(), true) + .await; + + // update utxos + let new_utxos = dag.get_utxos(); + utxo_addresses.extend( + new_utxos + .into_iter() + .map(|a| (a, Instant::now() + REATTEMPT_INTERVAL)), + ); + + // write updates to local DAG and save to disk + let mut dag_w_handle = self.dag.write().await; + *dag_w_handle = dag; + std::mem::drop(dag_w_handle); + if let Err(e) = self.dump().await { + error!("Failed to dump DAG: {e}"); + } + + // update and save svg to file in a background thread so we don't block + #[cfg(feature = "svg-dag")] + { + let self_clone = self.clone(); + tokio::spawn(async move { + if let Err(e) = self_clone.dump_dag_svg().await { + error!("Failed to dump DAG svg: {e}"); + } + }); + } } } } From 3b5d9743f1b16a956388a3216e5a3f83b5fa4761 Mon Sep 17 00:00:00 2001 From: grumbach Date: Tue, 4 Jun 2024 07:53:11 +0200 Subject: [PATCH 6/8] fix: adapt consts --- sn_auditor/src/dag_db.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sn_auditor/src/dag_db.rs b/sn_auditor/src/dag_db.rs index b06523cf39..3fa057cbbb 100644 --- a/sn_auditor/src/dag_db.rs +++ b/sn_auditor/src/dag_db.rs @@ -271,7 +271,7 @@ impl SpendDagDb { utxo_addresses.extend( new_utxos .into_iter() - .map(|a| (a, Instant::now() + REATTEMPT_INTERVAL)), + .map(|a| (a, Instant::now() + *UTXO_REATTEMPT_INTERVAL)), ); } else { panic!("There is no point in running the auditor if we are not collecting the DAG or collecting data through crawling. Please enable the `dag-collection` feature or provide beta program related arguments."); @@ -293,7 +293,7 @@ impl SpendDagDb { utxo_addresses.extend( new_utxos .into_iter() - .map(|a| (a, Instant::now() + REATTEMPT_INTERVAL)), + .map(|a| (a, Instant::now() + *UTXO_REATTEMPT_INTERVAL)), ); // write updates to local DAG and save to disk From 07490d315a93f3aabf6022132ab5d83e30078845 Mon Sep 17 00:00:00 2001 From: grumbach Date: Wed, 5 Jun 2024 11:18:04 +0200 Subject: [PATCH 7/8] refactor: improve code readability --- sn_auditor/src/dag_db.rs | 116 +++++++++++++--------------- sn_client/src/audit/dag_crawling.rs | 24 ++++++ 2 files changed, 77 insertions(+), 63 deletions(-) diff --git a/sn_auditor/src/dag_db.rs b/sn_auditor/src/dag_db.rs index 3fa057cbbb..0ac515d4bc 100644 --- a/sn_auditor/src/dag_db.rs +++ b/sn_auditor/src/dag_db.rs @@ -21,6 +21,7 @@ use std::fmt::Write; use std::path::PathBuf; use std::sync::Arc; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; +use tokio::sync::mpsc::Sender; use tokio::sync::RwLock; pub const SPEND_DAG_FILENAME: &str = "spend_dag"; @@ -168,7 +169,6 @@ impl SpendDagDb { } /// Dump DAG to disk - #[cfg(feature = "dag-collection")] pub async fn dump(&self) -> Result<()> { std::fs::create_dir_all(&self.path)?; let dag_path = self.path.join(SPEND_DAG_FILENAME); @@ -250,72 +250,62 @@ impl SpendDagDb { continue; } - #[cfg(not(feature = "dag-collection"))] - { - if let Some(sender) = spend_processing.clone() { - // crawl DAG - let tasks: Vec<_> = addrs_to_get - .iter() - .map(|a| client.spend_dag_crawl_from(*a, sender.clone())) - .collect(); - let res = futures::future::join_all(tasks).await; - let mut new_utxos = BTreeSet::new(); - for (r, a) in res.into_iter().zip(addrs_to_get) { - match r { - Ok(utxos) => new_utxos.extend(utxos), - Err(e) => error!("Failed to crawl DAG from {a:?} : {e}"), - } - } - - // update utxos - utxo_addresses.extend( - new_utxos - .into_iter() - .map(|a| (a, Instant::now() + *UTXO_REATTEMPT_INTERVAL)), - ); - } else { - panic!("There is no point in running the auditor if we are not collecting the DAG or collecting data through crawling. Please enable the `dag-collection` feature or provide beta program related arguments."); - } - } - - #[cfg(feature = "dag-collection")] - { - // get a copy of the current DAG - let mut dag = { self.dag.clone().read().await.clone() }; - - // update it - client - .spend_dag_continue_from(&mut dag, addrs_to_get, spend_processing.clone(), true) - .await; - - // update utxos - let new_utxos = dag.get_utxos(); - utxo_addresses.extend( - new_utxos - .into_iter() - .map(|a| (a, Instant::now() + *UTXO_REATTEMPT_INTERVAL)), - ); + let new_utxos = if cfg!(feature = "dag-collection") { + self.crawl_and_generate_local_dag( + addrs_to_get, + spend_processing.clone(), + client.clone(), + ) + .await + } else if let Some(sender) = spend_processing.clone() { + client.crawl_to_next_utxos(addrs_to_get, sender).await? + } else { + panic!("There is no point in running the auditor if we are not collecting the DAG or collecting data through crawling. Please enable the `dag-collection` feature or provide beta program related arguments."); + }; + + utxo_addresses.extend( + new_utxos + .into_iter() + .map(|a| (a, Instant::now() + *UTXO_REATTEMPT_INTERVAL)), + ); + } + } - // write updates to local DAG and save to disk - let mut dag_w_handle = self.dag.write().await; - *dag_w_handle = dag; - std::mem::drop(dag_w_handle); - if let Err(e) = self.dump().await { - error!("Failed to dump DAG: {e}"); - } + async fn crawl_and_generate_local_dag( + &self, + from: BTreeSet, + spend_processing: Option>, + client: Client, + ) -> BTreeSet { + // get a copy of the current DAG + let mut dag = { self.dag.clone().read().await.clone() }; + + // update it + client + .spend_dag_continue_from(&mut dag, from, spend_processing.clone(), true) + .await; + let new_utxos = dag.get_utxos(); + + // write updates to local DAG and save to disk + let mut dag_w_handle = self.dag.write().await; + *dag_w_handle = dag; + std::mem::drop(dag_w_handle); + if let Err(e) = self.dump().await { + error!("Failed to dump DAG: {e}"); + } - // update and save svg to file in a background thread so we don't block - #[cfg(feature = "svg-dag")] - { - let self_clone = self.clone(); - tokio::spawn(async move { - if let Err(e) = self_clone.dump_dag_svg().await { - error!("Failed to dump DAG svg: {e}"); - } - }); + // update and save svg to file in a background thread so we don't block + #[cfg(feature = "svg-dag")] + { + let self_clone = self.clone(); + tokio::spawn(async move { + if let Err(e) = self_clone.dump_dag_svg().await { + error!("Failed to dump DAG svg: {e}"); } - } + }); } + + new_utxos } /// Process each spend and update beta rewards data diff --git a/sn_client/src/audit/dag_crawling.rs b/sn_client/src/audit/dag_crawling.rs index f0d8bbc10d..2e6553f8a9 100644 --- a/sn_client/src/audit/dag_crawling.rs +++ b/sn_client/src/audit/dag_crawling.rs @@ -116,6 +116,30 @@ impl Client { Ok(dag) } + /// Crawls the Spend Dag from a set of given SpendAddresses recursively + /// following descendants all the way to UTXOs + /// Returns all the UTXOs reached + pub async fn crawl_to_next_utxos( + &self, + from: BTreeSet, + spend_processing: Sender, + ) -> WalletResult> { + let tasks: Vec<_> = from + .iter() + .map(|a| self.spend_dag_crawl_from(*a, spend_processing.clone())) + .collect(); + let res = futures::future::join_all(tasks).await; + let mut new_utxos = BTreeSet::new(); + for r in res.into_iter() { + match r { + Ok(utxos) => new_utxos.extend(utxos), + Err(e) => return Err(e), + } + } + + Ok(new_utxos) + } + /// Crawls the Spend Dag from a given SpendAddress recursively /// following descendants all the way to UTXOs /// Returns the UTXOs reached From e06d7e1c06a45813b7ccf0c2f7c03147d323aaf9 Mon Sep 17 00:00:00 2001 From: grumbach Date: Wed, 5 Jun 2024 13:44:50 +0200 Subject: [PATCH 8/8] fix: sleep less as some UTXOs are false alerts --- sn_auditor/src/dag_db.rs | 2 +- sn_client/src/audit/dag_crawling.rs | 11 +++++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/sn_auditor/src/dag_db.rs b/sn_auditor/src/dag_db.rs index 0ac515d4bc..54a1a31108 100644 --- a/sn_auditor/src/dag_db.rs +++ b/sn_auditor/src/dag_db.rs @@ -36,7 +36,7 @@ lazy_static! { std::env::var("UTXO_REATTEMPT_INTERVAL") .unwrap_or("3600".to_string()) .parse::() - .unwrap_or(3600) + .unwrap_or(300) ); /// time in seconds to rest between DAG crawls diff --git a/sn_client/src/audit/dag_crawling.rs b/sn_client/src/audit/dag_crawling.rs index 2e6553f8a9..c159576777 100644 --- a/sn_client/src/audit/dag_crawling.rs +++ b/sn_client/src/audit/dag_crawling.rs @@ -75,9 +75,15 @@ impl Client { // forward spends to processing if provided let build_handle: tokio::task::JoinHandle> = tokio::spawn(async move { + debug!("Starting building DAG from {spend_addr:?}..."); + let now = std::time::Instant::now(); let mut dag = SpendDag::new(spend_addr); while let Some(spend) = rx.recv().await { let addr = spend.address(); + debug!( + "Inserting spend at {addr:?} size: {}", + dag.all_spends().len() + ); dag.insert(addr, spend.clone()); if let Some(sender) = &spend_processing { sender @@ -86,6 +92,11 @@ impl Client { .map_err(|e| WalletError::SpendProcessing(e.to_string()))?; } } + info!( + "Done gathering DAG of size: {} in {:?}", + dag.all_spends().len(), + now.elapsed() + ); Ok(dag) });