diff --git a/sn_cli/src/bin/subcommands/wallet/audit.rs b/sn_cli/src/bin/subcommands/wallet/audit.rs index 29423fd8c6..c0e3833d50 100644 --- a/sn_cli/src/bin/subcommands/wallet/audit.rs +++ b/sn_cli/src/bin/subcommands/wallet/audit.rs @@ -64,7 +64,7 @@ async fn gather_spend_dag(client: &Client, root_dir: &Path, fast_mode: bool) -> println!("Found a local spend dag on disk, continuing from it..."); if fast_mode { client - .spend_dag_continue_from_utxos(&mut dag, Default::default(), false) + .spend_dag_continue_from_utxos(&mut dag, None, false) .await; } dag @@ -75,7 +75,7 @@ async fn gather_spend_dag(client: &Client, root_dir: &Path, fast_mode: bool) -> let genesis_addr = SpendAddress::from_unique_pubkey(&GENESIS_SPEND_UNIQUE_KEY); if fast_mode { client - .spend_dag_build_from(genesis_addr, Default::default(), true) + .spend_dag_build_from(genesis_addr, None, true) .await? } else { client.new_dag_with_genesis_only().await? diff --git a/sn_client/src/audit/dag_crawling.rs b/sn_client/src/audit/dag_crawling.rs index 324cb18565..f0d8bbc10d 100644 --- a/sn_client/src/audit/dag_crawling.rs +++ b/sn_client/src/audit/dag_crawling.rs @@ -16,6 +16,8 @@ use sn_transfers::{ use std::collections::BTreeSet; use tokio::sync::mpsc::Sender; +const SPENDS_PROCESSING_BUFFER_SIZE: usize = 4096; + enum InternalGetNetworkSpend { Spend(Box), DoubleSpend(Box, Box), @@ -62,26 +64,93 @@ impl Client { spend_processing: Option>, verify: bool, ) -> WalletResult { - info!("Building spend DAG from {spend_addr:?}"); - let mut dag = SpendDag::new(spend_addr); + let (tx, mut rx) = tokio::sync::mpsc::channel(SPENDS_PROCESSING_BUFFER_SIZE); + + // start crawling from the given spend address + let self_clone = self.clone(); + let crawl_handle = + tokio::spawn(async move { self_clone.spend_dag_crawl_from(spend_addr, tx).await }); + + // start DAG building from the spends gathered while crawling + // forward spends to processing if provided + let build_handle: tokio::task::JoinHandle> = + tokio::spawn(async move { + let mut dag = SpendDag::new(spend_addr); + while let Some(spend) = rx.recv().await { + let addr = spend.address(); + dag.insert(addr, spend.clone()); + if let Some(sender) = &spend_processing { + sender + .send(spend) + .await + .map_err(|e| WalletError::SpendProcessing(e.to_string()))?; + } + } + Ok(dag) + }); + + // wait for both to finish + let (crawl_res, build_res) = tokio::join!(crawl_handle, build_handle); + crawl_res.map_err(|e| { + WalletError::SpendProcessing(format!("Failed to Join crawling results {e}")) + })??; + let mut dag = build_res.map_err(|e| { + WalletError::SpendProcessing(format!("Failed to Join DAG building results {e}")) + })??; + + // verify the DAG + if verify { + info!("Now verifying SpendDAG from {spend_addr:?} and recording errors..."); + let start = std::time::Instant::now(); + if let Err(e) = dag.record_faults(&dag.source()) { + let s = format!( + "Collected DAG starting at {spend_addr:?} is invalid, this is probably a bug: {e}" + ); + error!("{s}"); + return Err(WalletError::Dag(s)); + } + let elapsed = start.elapsed(); + info!("Finished verifying SpendDAG from {spend_addr:?} in {elapsed:?}"); + } + + Ok(dag) + } + + /// Crawls the Spend Dag from a given SpendAddress recursively + /// following descendants all the way to UTXOs + /// Returns the UTXOs reached + pub async fn spend_dag_crawl_from( + &self, + spend_addr: SpendAddress, + spend_processing: Sender, + ) -> WalletResult> { + info!("Crawling spend DAG from {spend_addr:?}"); + let mut utxos = BTreeSet::new(); // get first spend let first_spend = match self.crawl_spend(spend_addr).await { InternalGetNetworkSpend::Spend(s) => *s, InternalGetNetworkSpend::DoubleSpend(s1, s2) => { - dag.insert(spend_addr, *s2); + spend_processing + .send(*s2) + .await + .map_err(|e| WalletError::SpendProcessing(e.to_string()))?; *s1 } InternalGetNetworkSpend::NotFound => { // the cashnote was not spent yet, so it's an UTXO info!("UTXO at {spend_addr:?}"); - return Ok(dag); + utxos.insert(spend_addr); + return Ok(utxos); } InternalGetNetworkSpend::Error(e) => { return Err(WalletError::FailedToGetSpend(e.to_string())); } }; - dag.insert(spend_addr, first_spend.clone()); + spend_processing + .send(first_spend.clone()) + .await + .map_err(|e| WalletError::SpendProcessing(e.to_string()))?; // use iteration instead of recursion to avoid stack overflow let mut txs_to_follow = BTreeSet::from_iter([first_spend.spend.spent_tx]); @@ -121,29 +190,28 @@ impl Client { match get_spend { InternalGetNetworkSpend::Spend(spend) => { next_gen_tx.insert(spend.spend.spent_tx.clone()); - if let Some(sender) = &spend_processing { - let _ = sender.send(*spend.clone()).await.map_err(|e| { - error!("Failed to send spend {addr:?} to processing: {e}") - }); - } - dag.insert(addr, *spend); + spend_processing + .send(*spend.clone()) + .await + .map_err(|e| WalletError::SpendProcessing(e.to_string()))?; } InternalGetNetworkSpend::DoubleSpend(s1, s2) => { info!("Fetched double spend at {addr:?} from network, following both..."); next_gen_tx.insert(s1.spend.spent_tx.clone()); next_gen_tx.insert(s2.spend.spent_tx.clone()); - if let Some(sender) = &spend_processing { - let _ = sender.send(*s1.clone()).await.map_err(|e| { - error!("Failed to send spend {addr:?} to processing: {e}") - }); - let _ = sender.send(*s2.clone()).await.map_err(|e| { - error!("Failed to send spend {addr:?} to processing: {e}") - }); - } - dag.insert(addr, *s1); - dag.insert(addr, *s2); + spend_processing + .send(*s1.clone()) + .await + .map_err(|e| WalletError::SpendProcessing(e.to_string()))?; + spend_processing + .send(*s2.clone()) + .await + .map_err(|e| WalletError::SpendProcessing(e.to_string()))?; + } + InternalGetNetworkSpend::NotFound => { + info!("Reached UTXO at {addr:?}"); + utxos.insert(addr); } - InternalGetNetworkSpend::NotFound => info!("Reached UTXO at {addr:?}"), InternalGetNetworkSpend::Error(err) => { error!("Failed to get spend at {addr:?} during DAG collection: {err:?}") } @@ -162,23 +230,8 @@ impl Client { } let elapsed = start.elapsed(); - info!("Finished building SpendDAG from {spend_addr:?} in {elapsed:?}"); - - // verify the DAG - if verify { - info!("Now verifying SpendDAG from {spend_addr:?} and recording errors..."); - let start = std::time::Instant::now(); - if let Err(e) = dag.record_faults(&dag.source()) { - let s = format!( - "Collected DAG starting at {spend_addr:?} is invalid, this is probably a bug: {e}" - ); - error!("{s}"); - return Err(WalletError::Dag(s)); - } - let elapsed = start.elapsed(); - info!("Finished verifying SpendDAG from {spend_addr:?} in {elapsed:?}"); - } - Ok(dag) + info!("Finished crawling SpendDAG from {spend_addr:?} in {elapsed:?}"); + Ok(utxos) } /// Extends an existing SpendDag with a new SignedSpend, diff --git a/sn_transfers/src/wallet/error.rs b/sn_transfers/src/wallet/error.rs index 63f28ca6fd..1570f6242b 100644 --- a/sn_transfers/src/wallet/error.rs +++ b/sn_transfers/src/wallet/error.rs @@ -43,6 +43,9 @@ pub enum Error { /// Failed to fetch spend from network #[error("Failed to fetch spend from network: {0}")] FailedToGetSpend(String), + /// Failed to send spend for processing + #[error("Failed to send spend for processing: {0}")] + SpendProcessing(String), /// Failed to parse bytes into a bls key #[error("Unconfirmed transactions still persist even after retries")] UnconfirmedTxAfterRetries,