Skip to content

Commit

Permalink
feat: make dag collection optional through auditor feature flag
Browse files Browse the repository at this point in the history
  • Loading branch information
grumbach committed Jun 4, 2024
1 parent f74d660 commit 98c1234
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 29 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions sn_auditor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ bls = { package = "blsttc", version = "8.0.1" }
clap = { version = "4.2.1", features = ["derive"] }
color-eyre = "~0.6"
dirs-next = "~2.0.0"
futures = "0.3.28"
graphviz-rust = { version = "0.9.0", optional = true }
lazy_static = "1.4.0"
serde = { version = "1.0.133", features = ["derive", "rc"] }
Expand Down
90 changes: 61 additions & 29 deletions sn_auditor/src/dag_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ impl SpendDagDb {
}

/// Dump DAG to disk
#[cfg(feature = "dag-collection")]
pub async fn dump(&self) -> Result<()> {
std::fs::create_dir_all(&self.path)?;
let dag_path = self.path.join(SPEND_DAG_FILENAME);
Expand Down Expand Up @@ -241,39 +242,70 @@ impl SpendDagDb {
continue;
}

// get a copy of the current DAG
let mut dag = { self.dag.clone().read().await.clone() };

// update it
client
.spend_dag_continue_from(&mut dag, addrs_to_get, spend_processing.clone(), true)
.await;

// update utxos
let new_utxos = dag.get_utxos();
utxo_addresses.extend(
new_utxos
.into_iter()
.map(|a| (a, Instant::now() + *UTXO_REATTEMPT_INTERVAL)),
);
#[cfg(not(feature = "dag-collection"))]
{
if let Some(sender) = spend_processing.clone() {
// crawl DAG
let tasks: Vec<_> = addrs_to_get
.iter()
.map(|a| client.spend_dag_crawl_from(*a, sender.clone()))
.collect();
let res = futures::future::join_all(tasks).await;
let mut new_utxos = BTreeSet::new();
for (r, a) in res.into_iter().zip(addrs_to_get) {
match r {
Ok(utxos) => new_utxos.extend(utxos),
Err(e) => error!("Failed to crawl DAG from {a:?} : {e}"),
}
}

// write updates to local DAG and save to disk
let mut dag_w_handle = self.dag.write().await;
*dag_w_handle = dag;
std::mem::drop(dag_w_handle);
if let Err(e) = self.dump().await {
error!("Failed to dump DAG: {e}");
// update utxos
utxo_addresses.extend(
new_utxos
.into_iter()
.map(|a| (a, Instant::now() + REATTEMPT_INTERVAL)),
);
} else {
panic!("There is no point in running the auditor if we are not collecting the DAG or collecting data through crawling. Please enable the `dag-collection` feature or provide beta program related arguments.");
}
}

// update and save svg to file in a background thread so we don't block
#[cfg(feature = "svg-dag")]
#[cfg(feature = "dag-collection")]
{
let self_clone = self.clone();
tokio::spawn(async move {
if let Err(e) = self_clone.dump_dag_svg().await {
error!("Failed to dump DAG svg: {e}");
}
});
// get a copy of the current DAG
let mut dag = { self.dag.clone().read().await.clone() };

// update it
client
.spend_dag_continue_from(&mut dag, addrs_to_get, spend_processing.clone(), true)
.await;

// update utxos
let new_utxos = dag.get_utxos();
utxo_addresses.extend(
new_utxos
.into_iter()
.map(|a| (a, Instant::now() + REATTEMPT_INTERVAL)),
);

// write updates to local DAG and save to disk
let mut dag_w_handle = self.dag.write().await;
*dag_w_handle = dag;
std::mem::drop(dag_w_handle);
if let Err(e) = self.dump().await {
error!("Failed to dump DAG: {e}");
}

// update and save svg to file in a background thread so we don't block
#[cfg(feature = "svg-dag")]
{
let self_clone = self.clone();
tokio::spawn(async move {
if let Err(e) = self_clone.dump_dag_svg().await {
error!("Failed to dump DAG svg: {e}");
}
});
}
}
}
}
Expand Down

0 comments on commit 98c1234

Please sign in to comment.