Skip to content

Commit

Permalink
Merge branch 'main' into patch-1
Browse files Browse the repository at this point in the history
  • Loading branch information
joshuef authored Jun 6, 2024
2 parents 28d077b + e06d7e1 commit c90178c
Show file tree
Hide file tree
Showing 31 changed files with 858 additions and 191 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion node-launchpad/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pub fn initialize_panic_handler() -> Result<()> {
Ok(())
}

// todo: use sn_logging
pub fn initialize_logging() -> Result<()> {
let timestamp = chrono::Local::now().format("%Y-%m-%d_%H-%M-%S").to_string();
let log_path = get_launchpad_data_dir_path()?.join("logs");
Expand All @@ -84,7 +85,7 @@ pub fn initialize_logging() -> Result<()> {
std::env::set_var(
"RUST_LOG",
std::env::var("RUST_LOG")
.unwrap_or_else(|_| format!("{}=trace,debug", env!("CARGO_CRATE_NAME"))),
.unwrap_or_else(|_| format!("{}=trace,sn_node_manager=trace,sn_service_management=trace,sn_peers_acquisition=trace", env!("CARGO_CRATE_NAME"))),
);
let file_subscriber = tracing_subscriber::fmt::layer()
.with_file(true)
Expand Down
1 change: 1 addition & 0 deletions sn_auditor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ bls = { package = "blsttc", version = "8.0.1" }
clap = { version = "4.2.1", features = ["derive"] }
color-eyre = "~0.6"
dirs-next = "~2.0.0"
futures = "0.3.28"
graphviz-rust = { version = "0.9.0", optional = true }
lazy_static = "1.4.0"
serde = { version = "1.0.133", features = ["derive", "rc"] }
Expand Down
76 changes: 49 additions & 27 deletions sn_auditor/src/dag_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::fmt::Write;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio::sync::mpsc::Sender;
use tokio::sync::RwLock;

pub const SPEND_DAG_FILENAME: &str = "spend_dag";
Expand All @@ -35,7 +36,7 @@ lazy_static! {
std::env::var("UTXO_REATTEMPT_INTERVAL")
.unwrap_or("3600".to_string())
.parse::<u64>()
.unwrap_or(3600)
.unwrap_or(300)
);

/// time in seconds to rest between DAG crawls
Expand Down Expand Up @@ -249,41 +250,62 @@ impl SpendDagDb {
continue;
}

// get a copy of the current DAG
let mut dag = { self.dag.clone().read().await.clone() };
let new_utxos = if cfg!(feature = "dag-collection") {
self.crawl_and_generate_local_dag(
addrs_to_get,
spend_processing.clone(),
client.clone(),
)
.await
} else if let Some(sender) = spend_processing.clone() {
client.crawl_to_next_utxos(addrs_to_get, sender).await?
} else {
panic!("There is no point in running the auditor if we are not collecting the DAG or collecting data through crawling. Please enable the `dag-collection` feature or provide beta program related arguments.");
};

// update it
client
.spend_dag_continue_from(&mut dag, addrs_to_get, spend_processing.clone(), true)
.await;

// update utxos
let new_utxos = dag.get_utxos();
utxo_addresses.extend(
new_utxos
.into_iter()
.map(|a| (a, Instant::now() + *UTXO_REATTEMPT_INTERVAL)),
);
}
}

// write updates to local DAG and save to disk
let mut dag_w_handle = self.dag.write().await;
*dag_w_handle = dag;
std::mem::drop(dag_w_handle);
if let Err(e) = self.dump().await {
error!("Failed to dump DAG: {e}");
}
async fn crawl_and_generate_local_dag(
&self,
from: BTreeSet<SpendAddress>,
spend_processing: Option<Sender<SignedSpend>>,
client: Client,
) -> BTreeSet<SpendAddress> {
// get a copy of the current DAG
let mut dag = { self.dag.clone().read().await.clone() };

// update it
client
.spend_dag_continue_from(&mut dag, from, spend_processing.clone(), true)
.await;
let new_utxos = dag.get_utxos();

// write updates to local DAG and save to disk
let mut dag_w_handle = self.dag.write().await;
*dag_w_handle = dag;
std::mem::drop(dag_w_handle);
if let Err(e) = self.dump().await {
error!("Failed to dump DAG: {e}");
}

// update and save svg to file in a background thread so we don't block
#[cfg(feature = "svg-dag")]
{
let self_clone = self.clone();
tokio::spawn(async move {
if let Err(e) = self_clone.dump_dag_svg().await {
error!("Failed to dump DAG svg: {e}");
}
});
}
// update and save svg to file in a background thread so we don't block
#[cfg(feature = "svg-dag")]
{
let self_clone = self.clone();
tokio::spawn(async move {
if let Err(e) = self_clone.dump_dag_svg().await {
error!("Failed to dump DAG svg: {e}");
}
});
}

new_utxos
}

/// Process each spend and update beta rewards data
Expand Down
4 changes: 2 additions & 2 deletions sn_cli/src/bin/subcommands/wallet/audit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ async fn gather_spend_dag(client: &Client, root_dir: &Path, fast_mode: bool) ->
println!("Found a local spend dag on disk, continuing from it...");
if fast_mode {
client
.spend_dag_continue_from_utxos(&mut dag, Default::default(), false)
.spend_dag_continue_from_utxos(&mut dag, None, false)
.await;
}
dag
Expand All @@ -75,7 +75,7 @@ async fn gather_spend_dag(client: &Client, root_dir: &Path, fast_mode: bool) ->
let genesis_addr = SpendAddress::from_unique_pubkey(&GENESIS_SPEND_UNIQUE_KEY);
if fast_mode {
client
.spend_dag_build_from(genesis_addr, Default::default(), true)
.spend_dag_build_from(genesis_addr, None, true)
.await?
} else {
client.new_dag_with_genesis_only().await?
Expand Down
166 changes: 127 additions & 39 deletions sn_client/src/audit/dag_crawling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use sn_transfers::{
use std::collections::BTreeSet;
use tokio::sync::mpsc::Sender;

const SPENDS_PROCESSING_BUFFER_SIZE: usize = 4096;

enum InternalGetNetworkSpend {
Spend(Box<SignedSpend>),
DoubleSpend(Box<SignedSpend>, Box<SignedSpend>),
Expand Down Expand Up @@ -62,26 +64,128 @@ impl Client {
spend_processing: Option<Sender<SignedSpend>>,
verify: bool,
) -> WalletResult<SpendDag> {
info!("Building spend DAG from {spend_addr:?}");
let mut dag = SpendDag::new(spend_addr);
let (tx, mut rx) = tokio::sync::mpsc::channel(SPENDS_PROCESSING_BUFFER_SIZE);

// start crawling from the given spend address
let self_clone = self.clone();
let crawl_handle =
tokio::spawn(async move { self_clone.spend_dag_crawl_from(spend_addr, tx).await });

// start DAG building from the spends gathered while crawling
// forward spends to processing if provided
let build_handle: tokio::task::JoinHandle<Result<SpendDag, WalletError>> =
tokio::spawn(async move {
debug!("Starting building DAG from {spend_addr:?}...");
let now = std::time::Instant::now();
let mut dag = SpendDag::new(spend_addr);
while let Some(spend) = rx.recv().await {
let addr = spend.address();
debug!(
"Inserting spend at {addr:?} size: {}",
dag.all_spends().len()
);
dag.insert(addr, spend.clone());
if let Some(sender) = &spend_processing {
sender
.send(spend)
.await
.map_err(|e| WalletError::SpendProcessing(e.to_string()))?;
}
}
info!(
"Done gathering DAG of size: {} in {:?}",
dag.all_spends().len(),
now.elapsed()
);
Ok(dag)
});

// wait for both to finish
let (crawl_res, build_res) = tokio::join!(crawl_handle, build_handle);
crawl_res.map_err(|e| {
WalletError::SpendProcessing(format!("Failed to Join crawling results {e}"))
})??;
let mut dag = build_res.map_err(|e| {
WalletError::SpendProcessing(format!("Failed to Join DAG building results {e}"))
})??;

// verify the DAG
if verify {
info!("Now verifying SpendDAG from {spend_addr:?} and recording errors...");
let start = std::time::Instant::now();
if let Err(e) = dag.record_faults(&dag.source()) {
let s = format!(
"Collected DAG starting at {spend_addr:?} is invalid, this is probably a bug: {e}"
);
error!("{s}");
return Err(WalletError::Dag(s));
}
let elapsed = start.elapsed();
info!("Finished verifying SpendDAG from {spend_addr:?} in {elapsed:?}");
}

Ok(dag)
}

/// Crawls the Spend Dag from a set of given SpendAddresses recursively
/// following descendants all the way to UTXOs
/// Returns all the UTXOs reached
pub async fn crawl_to_next_utxos(
&self,
from: BTreeSet<SpendAddress>,
spend_processing: Sender<SignedSpend>,
) -> WalletResult<BTreeSet<SpendAddress>> {
let tasks: Vec<_> = from
.iter()
.map(|a| self.spend_dag_crawl_from(*a, spend_processing.clone()))
.collect();
let res = futures::future::join_all(tasks).await;
let mut new_utxos = BTreeSet::new();
for r in res.into_iter() {
match r {
Ok(utxos) => new_utxos.extend(utxos),
Err(e) => return Err(e),
}
}

Ok(new_utxos)
}

/// Crawls the Spend Dag from a given SpendAddress recursively
/// following descendants all the way to UTXOs
/// Returns the UTXOs reached
pub async fn spend_dag_crawl_from(
&self,
spend_addr: SpendAddress,
spend_processing: Sender<SignedSpend>,
) -> WalletResult<BTreeSet<SpendAddress>> {
info!("Crawling spend DAG from {spend_addr:?}");
let mut utxos = BTreeSet::new();

// get first spend
let first_spend = match self.crawl_spend(spend_addr).await {
InternalGetNetworkSpend::Spend(s) => *s,
InternalGetNetworkSpend::DoubleSpend(s1, s2) => {
dag.insert(spend_addr, *s2);
spend_processing
.send(*s2)
.await
.map_err(|e| WalletError::SpendProcessing(e.to_string()))?;
*s1
}
InternalGetNetworkSpend::NotFound => {
// the cashnote was not spent yet, so it's an UTXO
info!("UTXO at {spend_addr:?}");
return Ok(dag);
utxos.insert(spend_addr);
return Ok(utxos);
}
InternalGetNetworkSpend::Error(e) => {
return Err(WalletError::FailedToGetSpend(e.to_string()));
}
};
dag.insert(spend_addr, first_spend.clone());
spend_processing
.send(first_spend.clone())
.await
.map_err(|e| WalletError::SpendProcessing(e.to_string()))?;

// use iteration instead of recursion to avoid stack overflow
let mut txs_to_follow = BTreeSet::from_iter([first_spend.spend.spent_tx]);
Expand Down Expand Up @@ -121,29 +225,28 @@ impl Client {
match get_spend {
InternalGetNetworkSpend::Spend(spend) => {
next_gen_tx.insert(spend.spend.spent_tx.clone());
if let Some(sender) = &spend_processing {
let _ = sender.send(*spend.clone()).await.map_err(|e| {
error!("Failed to send spend {addr:?} to processing: {e}")
});
}
dag.insert(addr, *spend);
spend_processing
.send(*spend.clone())
.await
.map_err(|e| WalletError::SpendProcessing(e.to_string()))?;
}
InternalGetNetworkSpend::DoubleSpend(s1, s2) => {
info!("Fetched double spend at {addr:?} from network, following both...");
next_gen_tx.insert(s1.spend.spent_tx.clone());
next_gen_tx.insert(s2.spend.spent_tx.clone());
if let Some(sender) = &spend_processing {
let _ = sender.send(*s1.clone()).await.map_err(|e| {
error!("Failed to send spend {addr:?} to processing: {e}")
});
let _ = sender.send(*s2.clone()).await.map_err(|e| {
error!("Failed to send spend {addr:?} to processing: {e}")
});
}
dag.insert(addr, *s1);
dag.insert(addr, *s2);
spend_processing
.send(*s1.clone())
.await
.map_err(|e| WalletError::SpendProcessing(e.to_string()))?;
spend_processing
.send(*s2.clone())
.await
.map_err(|e| WalletError::SpendProcessing(e.to_string()))?;
}
InternalGetNetworkSpend::NotFound => {
info!("Reached UTXO at {addr:?}");
utxos.insert(addr);
}
InternalGetNetworkSpend::NotFound => info!("Reached UTXO at {addr:?}"),
InternalGetNetworkSpend::Error(err) => {
error!("Failed to get spend at {addr:?} during DAG collection: {err:?}")
}
Expand All @@ -162,23 +265,8 @@ impl Client {
}

let elapsed = start.elapsed();
info!("Finished building SpendDAG from {spend_addr:?} in {elapsed:?}");

// verify the DAG
if verify {
info!("Now verifying SpendDAG from {spend_addr:?} and recording errors...");
let start = std::time::Instant::now();
if let Err(e) = dag.record_faults(&dag.source()) {
let s = format!(
"Collected DAG starting at {spend_addr:?} is invalid, this is probably a bug: {e}"
);
error!("{s}");
return Err(WalletError::Dag(s));
}
let elapsed = start.elapsed();
info!("Finished verifying SpendDAG from {spend_addr:?} in {elapsed:?}");
}
Ok(dag)
info!("Finished crawling SpendDAG from {spend_addr:?} in {elapsed:?}");
Ok(utxos)
}

/// Extends an existing SpendDag with a new SignedSpend,
Expand Down
Loading

0 comments on commit c90178c

Please sign in to comment.