Skip to content

Commit

Permalink
feat: dag crawling through generic crawling
Browse files Browse the repository at this point in the history
  • Loading branch information
grumbach committed Jun 4, 2024
1 parent 30e351f commit f74d660
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 41 deletions.
4 changes: 2 additions & 2 deletions sn_cli/src/bin/subcommands/wallet/audit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ async fn gather_spend_dag(client: &Client, root_dir: &Path, fast_mode: bool) ->
println!("Found a local spend dag on disk, continuing from it...");
if fast_mode {
client
.spend_dag_continue_from_utxos(&mut dag, Default::default(), false)
.spend_dag_continue_from_utxos(&mut dag, None, false)
.await;
}
dag
Expand All @@ -75,7 +75,7 @@ async fn gather_spend_dag(client: &Client, root_dir: &Path, fast_mode: bool) ->
let genesis_addr = SpendAddress::from_unique_pubkey(&GENESIS_SPEND_UNIQUE_KEY);
if fast_mode {
client
.spend_dag_build_from(genesis_addr, Default::default(), true)
.spend_dag_build_from(genesis_addr, None, true)
.await?
} else {
client.new_dag_with_genesis_only().await?
Expand Down
131 changes: 92 additions & 39 deletions sn_client/src/audit/dag_crawling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use sn_transfers::{
use std::collections::BTreeSet;
use tokio::sync::mpsc::Sender;

const SPENDS_PROCESSING_BUFFER_SIZE: usize = 4096;

enum InternalGetNetworkSpend {
Spend(Box<SignedSpend>),
DoubleSpend(Box<SignedSpend>, Box<SignedSpend>),
Expand Down Expand Up @@ -62,26 +64,93 @@ impl Client {
spend_processing: Option<Sender<SignedSpend>>,
verify: bool,
) -> WalletResult<SpendDag> {
info!("Building spend DAG from {spend_addr:?}");
let mut dag = SpendDag::new(spend_addr);
let (tx, mut rx) = tokio::sync::mpsc::channel(SPENDS_PROCESSING_BUFFER_SIZE);

// start crawling from the given spend address
let self_clone = self.clone();
let crawl_handle =
tokio::spawn(async move { self_clone.spend_dag_crawl_from(spend_addr, tx).await });

// start DAG building from the spends gathered while crawling
// forward spends to processing if provided
let build_handle: tokio::task::JoinHandle<Result<SpendDag, WalletError>> =
tokio::spawn(async move {
let mut dag = SpendDag::new(spend_addr);
while let Some(spend) = rx.recv().await {
let addr = spend.address();
dag.insert(addr, spend.clone());
if let Some(sender) = &spend_processing {
sender
.send(spend)
.await
.map_err(|e| WalletError::SpendProcessing(e.to_string()))?;
}
}
Ok(dag)
});

// wait for both to finish
let (crawl_res, build_res) = tokio::join!(crawl_handle, build_handle);
crawl_res.map_err(|e| {
WalletError::SpendProcessing(format!("Failed to Join crawling results {e}"))
})??;
let mut dag = build_res.map_err(|e| {
WalletError::SpendProcessing(format!("Failed to Join DAG building results {e}"))
})??;

// verify the DAG
if verify {
info!("Now verifying SpendDAG from {spend_addr:?} and recording errors...");
let start = std::time::Instant::now();
if let Err(e) = dag.record_faults(&dag.source()) {
let s = format!(
"Collected DAG starting at {spend_addr:?} is invalid, this is probably a bug: {e}"
);
error!("{s}");
return Err(WalletError::Dag(s));
}
let elapsed = start.elapsed();
info!("Finished verifying SpendDAG from {spend_addr:?} in {elapsed:?}");
}

Ok(dag)
}

/// Crawls the Spend Dag from a given SpendAddress recursively
/// following descendants all the way to UTXOs
/// Returns the UTXOs reached
pub async fn spend_dag_crawl_from(
&self,
spend_addr: SpendAddress,
spend_processing: Sender<SignedSpend>,
) -> WalletResult<BTreeSet<SpendAddress>> {
info!("Crawling spend DAG from {spend_addr:?}");
let mut utxos = BTreeSet::new();

// get first spend
let first_spend = match self.crawl_spend(spend_addr).await {
InternalGetNetworkSpend::Spend(s) => *s,
InternalGetNetworkSpend::DoubleSpend(s1, s2) => {
dag.insert(spend_addr, *s2);
spend_processing
.send(*s2)
.await
.map_err(|e| WalletError::SpendProcessing(e.to_string()))?;
*s1
}
InternalGetNetworkSpend::NotFound => {
// the cashnote was not spent yet, so it's an UTXO
info!("UTXO at {spend_addr:?}");
return Ok(dag);
utxos.insert(spend_addr);
return Ok(utxos);
}
InternalGetNetworkSpend::Error(e) => {
return Err(WalletError::FailedToGetSpend(e.to_string()));
}
};
dag.insert(spend_addr, first_spend.clone());
spend_processing
.send(first_spend.clone())
.await
.map_err(|e| WalletError::SpendProcessing(e.to_string()))?;

// use iteration instead of recursion to avoid stack overflow
let mut txs_to_follow = BTreeSet::from_iter([first_spend.spend.spent_tx]);
Expand Down Expand Up @@ -121,29 +190,28 @@ impl Client {
match get_spend {
InternalGetNetworkSpend::Spend(spend) => {
next_gen_tx.insert(spend.spend.spent_tx.clone());
if let Some(sender) = &spend_processing {
let _ = sender.send(*spend.clone()).await.map_err(|e| {
error!("Failed to send spend {addr:?} to processing: {e}")
});
}
dag.insert(addr, *spend);
spend_processing
.send(*spend.clone())
.await
.map_err(|e| WalletError::SpendProcessing(e.to_string()))?;
}
InternalGetNetworkSpend::DoubleSpend(s1, s2) => {
info!("Fetched double spend at {addr:?} from network, following both...");
next_gen_tx.insert(s1.spend.spent_tx.clone());
next_gen_tx.insert(s2.spend.spent_tx.clone());
if let Some(sender) = &spend_processing {
let _ = sender.send(*s1.clone()).await.map_err(|e| {
error!("Failed to send spend {addr:?} to processing: {e}")
});
let _ = sender.send(*s2.clone()).await.map_err(|e| {
error!("Failed to send spend {addr:?} to processing: {e}")
});
}
dag.insert(addr, *s1);
dag.insert(addr, *s2);
spend_processing
.send(*s1.clone())
.await
.map_err(|e| WalletError::SpendProcessing(e.to_string()))?;
spend_processing
.send(*s2.clone())
.await
.map_err(|e| WalletError::SpendProcessing(e.to_string()))?;
}
InternalGetNetworkSpend::NotFound => {
info!("Reached UTXO at {addr:?}");
utxos.insert(addr);
}
InternalGetNetworkSpend::NotFound => info!("Reached UTXO at {addr:?}"),
InternalGetNetworkSpend::Error(err) => {
error!("Failed to get spend at {addr:?} during DAG collection: {err:?}")
}
Expand All @@ -162,23 +230,8 @@ impl Client {
}

let elapsed = start.elapsed();
info!("Finished building SpendDAG from {spend_addr:?} in {elapsed:?}");

// verify the DAG
if verify {
info!("Now verifying SpendDAG from {spend_addr:?} and recording errors...");
let start = std::time::Instant::now();
if let Err(e) = dag.record_faults(&dag.source()) {
let s = format!(
"Collected DAG starting at {spend_addr:?} is invalid, this is probably a bug: {e}"
);
error!("{s}");
return Err(WalletError::Dag(s));
}
let elapsed = start.elapsed();
info!("Finished verifying SpendDAG from {spend_addr:?} in {elapsed:?}");
}
Ok(dag)
info!("Finished crawling SpendDAG from {spend_addr:?} in {elapsed:?}");
Ok(utxos)
}

/// Extends an existing SpendDag with a new SignedSpend,
Expand Down
3 changes: 3 additions & 0 deletions sn_transfers/src/wallet/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ pub enum Error {
/// Failed to fetch spend from network
#[error("Failed to fetch spend from network: {0}")]
FailedToGetSpend(String),
/// Failed to send spend for processing
#[error("Failed to send spend for processing: {0}")]
SpendProcessing(String),
/// Failed to parse bytes into a bls key
#[error("Unconfirmed transactions still persist even after retries")]
UnconfirmedTxAfterRetries,
Expand Down

0 comments on commit f74d660

Please sign in to comment.