diff --git a/Cargo.lock b/Cargo.lock index 4bdc0944a6..28edd35876 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6988,6 +6988,7 @@ dependencies = [ "clap", "color-eyre", "dirs-next", + "futures", "graphviz-rust", "serde", "serde_json", diff --git a/sn_auditor/Cargo.toml b/sn_auditor/Cargo.toml index 4cce23c440..4e943af532 100644 --- a/sn_auditor/Cargo.toml +++ b/sn_auditor/Cargo.toml @@ -26,6 +26,7 @@ bls = { package = "blsttc", version = "8.0.1" } clap = { version = "4.2.1", features = ["derive"] } color-eyre = "~0.6" dirs-next = "~2.0.0" +futures = "0.3.28" graphviz-rust = { version = "0.9.0", optional = true } serde = { version = "1.0.133", features = ["derive", "rc"] } serde_json = "1.0.108" diff --git a/sn_auditor/src/dag_db.rs b/sn_auditor/src/dag_db.rs index b16c4f4a2f..c00770dd73 100644 --- a/sn_auditor/src/dag_db.rs +++ b/sn_auditor/src/dag_db.rs @@ -150,6 +150,7 @@ impl SpendDagDb { } /// Dump DAG to disk + #[cfg(feature = "dag-collection")] pub async fn dump(&self) -> Result<()> { std::fs::create_dir_all(&self.path)?; let dag_path = self.path.join(SPEND_DAG_FILENAME); @@ -228,39 +229,70 @@ impl SpendDagDb { continue; } - // get a copy of the current DAG - let mut dag = { self.dag.clone().read().await.clone() }; - - // update it - client - .spend_dag_continue_from(&mut dag, addrs_to_get, spend_processing.clone(), true) - .await; - - // update utxos - let new_utxos = dag.get_utxos(); - utxo_addresses.extend( - new_utxos - .into_iter() - .map(|a| (a, Instant::now() + DAG_RECRAWL_INTERVAL)), - ); + #[cfg(not(feature = "dag-collection"))] + { + if let Some(sender) = spend_processing.clone() { + // crawl DAG + let tasks: Vec<_> = addrs_to_get + .iter() + .map(|a| client.spend_dag_crawl_from(*a, sender.clone())) + .collect(); + let res = futures::future::join_all(tasks).await; + let mut new_utxos = BTreeSet::new(); + for (r, a) in res.into_iter().zip(addrs_to_get) { + match r { + Ok(utxos) => new_utxos.extend(utxos), + Err(e) => error!("Failed to crawl DAG from {a:?} : {e}"), + } + } - // write updates to local DAG and save to disk - let mut dag_w_handle = self.dag.write().await; - *dag_w_handle = dag; - std::mem::drop(dag_w_handle); - if let Err(e) = self.dump().await { - error!("Failed to dump DAG: {e}"); + // update utxos + utxo_addresses.extend( + new_utxos + .into_iter() + .map(|a| (a, Instant::now() + REATTEMPT_INTERVAL)), + ); + } else { + panic!("There is no point in running the auditor if we are not collecting the DAG or collecting data through crawling. Please enable the `dag-collection` feature or provide beta program related arguments."); + } } - // update and save svg to file in a background thread so we don't block - #[cfg(feature = "svg-dag")] + #[cfg(feature = "dag-collection")] { - let self_clone = self.clone(); - tokio::spawn(async move { - if let Err(e) = self_clone.dump_dag_svg().await { - error!("Failed to dump DAG svg: {e}"); - } - }); + // get a copy of the current DAG + let mut dag = { self.dag.clone().read().await.clone() }; + + // update it + client + .spend_dag_continue_from(&mut dag, addrs_to_get, spend_processing.clone(), true) + .await; + + // update utxos + let new_utxos = dag.get_utxos(); + utxo_addresses.extend( + new_utxos + .into_iter() + .map(|a| (a, Instant::now() + REATTEMPT_INTERVAL)), + ); + + // write updates to local DAG and save to disk + let mut dag_w_handle = self.dag.write().await; + *dag_w_handle = dag; + std::mem::drop(dag_w_handle); + if let Err(e) = self.dump().await { + error!("Failed to dump DAG: {e}"); + } + + // update and save svg to file in a background thread so we don't block + #[cfg(feature = "svg-dag")] + { + let self_clone = self.clone(); + tokio::spawn(async move { + if let Err(e) = self_clone.dump_dag_svg().await { + error!("Failed to dump DAG svg: {e}"); + } + }); + } } } }