Skip to content

Commit

Permalink
chore(auditor): cherry-pick from PR 1777 for beta-tracking performanc…
Browse files Browse the repository at this point in the history
…e improve
  • Loading branch information
maqi committed Jun 6, 2024
1 parent e06d7e1 commit 72ec0e3
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 62 deletions.
90 changes: 51 additions & 39 deletions sn_auditor/src/dag_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,6 @@ lazy_static! {
.parse::<u64>()
.unwrap_or(300)
);

/// time in seconds to rest between DAG crawls
static ref DAG_CRAWL_REST_INTERVAL: Duration = Duration::from_secs(
std::env::var("DAG_CRAWL_REST_INTERVAL")
.unwrap_or("60".to_string())
.parse::<u64>()
.unwrap_or(60)
);
}

const SPENDS_PROCESSING_BUFFER_SIZE: usize = 4096;
Expand Down Expand Up @@ -222,8 +214,10 @@ impl SpendDagDb {
let spend_processing = if let Some(sk) = self.encryption_sk.clone() {
let (tx, mut rx) = tokio::sync::mpsc::channel(SPENDS_PROCESSING_BUFFER_SIZE);
tokio::spawn(async move {
while let Some(spend) = rx.recv().await {
self_clone.beta_background_process_spend(spend, &sk).await;
while let Some((spend, utxos_for_further_track)) = rx.recv().await {
self_clone
.beta_background_process_spend(spend, &sk, utxos_for_further_track)
.await;
}
});
Some(tx)
Expand All @@ -232,49 +226,64 @@ impl SpendDagDb {
None
};

let mut addrs_to_get = BTreeSet::new();

loop {
// get current utxos to fetch
let now = Instant::now();
let utxos_to_fetch;
(utxo_addresses, utxos_to_fetch) = utxo_addresses
.into_iter()
.partition(|(_address, time_stamp)| *time_stamp > now);
let addrs_to_get = utxos_to_fetch.keys().cloned().collect::<BTreeSet<_>>();

// Always track new outputs first
if addrs_to_get.is_empty() {
let utxos_to_fetch;
(utxo_addresses, utxos_to_fetch) = utxo_addresses
.into_iter()
.partition(|(_address, time_stamp)| *time_stamp > now);
addrs_to_get.extend(utxos_to_fetch.keys().cloned().collect::<BTreeSet<_>>());
}

if addrs_to_get.is_empty() {
debug!(
"Sleeping for {:?} until next re-attempt...",
*DAG_CRAWL_REST_INTERVAL
*UTXO_REATTEMPT_INTERVAL
);
tokio::time::sleep(*DAG_CRAWL_REST_INTERVAL).await;
tokio::time::sleep(*UTXO_REATTEMPT_INTERVAL).await;
continue;
}

let new_utxos = if cfg!(feature = "dag-collection") {
self.crawl_and_generate_local_dag(
addrs_to_get,
spend_processing.clone(),
client.clone(),
)
.await
if cfg!(feature = "dag-collection") {
let new_utxos = self
.crawl_and_generate_local_dag(
addrs_to_get.clone(),
spend_processing.clone(),
client.clone(),
)
.await;
addrs_to_get.clear();
utxo_addresses.extend(
new_utxos
.into_iter()
.map(|a| (a, Instant::now() + *UTXO_REATTEMPT_INTERVAL)),
);
} else if let Some(sender) = spend_processing.clone() {
client.crawl_to_next_utxos(addrs_to_get, sender).await?
let (reattempt_addrs, new_utxos) =
client.crawl_to_next_utxos(&addrs_to_get, sender).await?;
utxo_addresses.extend(
reattempt_addrs
.into_iter()
.map(|a| (a, Instant::now() + *UTXO_REATTEMPT_INTERVAL)),
);
addrs_to_get.clear();
addrs_to_get.extend(new_utxos);
} else {
panic!("There is no point in running the auditor if we are not collecting the DAG or collecting data through crawling. Please enable the `dag-collection` feature or provide beta program related arguments.");
};

utxo_addresses.extend(
new_utxos
.into_iter()
.map(|a| (a, Instant::now() + *UTXO_REATTEMPT_INTERVAL)),
);
}
}

async fn crawl_and_generate_local_dag(
&self,
from: BTreeSet<SpendAddress>,
spend_processing: Option<Sender<SignedSpend>>,
spend_processing: Option<Sender<(SignedSpend, u64)>>,
client: Client,
) -> BTreeSet<SpendAddress> {
// get a copy of the current DAG
Expand Down Expand Up @@ -309,13 +318,16 @@ impl SpendDagDb {
}

/// Process each spend and update beta rewards data
pub async fn beta_background_process_spend(&self, spend: SignedSpend, sk: &SecretKey) {
pub async fn beta_background_process_spend(
&self,
spend: SignedSpend,
sk: &SecretKey,
utxos_for_further_track: u64,
) {
let mut beta_tracking = self.beta_tracking.write().await;
beta_tracking.processed_spends += 1;
beta_tracking.total_accumulated_utxo += spend.spend.spent_tx.outputs.len() as u64;
// TODO: currently all royalty and payment_forward output will be tracked
// correct this metrics once this got optimized
beta_tracking.total_on_track_utxo += spend.spend.spent_tx.outputs.len() as u64;
beta_tracking.total_on_track_utxo += utxos_for_further_track;

// check for beta rewards reason
let user_name_hash = match spend.reason().get_sender_hash(sk) {
Expand All @@ -331,15 +343,15 @@ impl SpendDagDb {
let beta_participants_read = self.beta_participants.read().await;

if let Some(user_name) = beta_participants_read.get(&user_name_hash) {
trace!("Got forwarded reward from {user_name} of {amount} at {addr:?}");
trace!("Got forwarded reward {amount} from {user_name} of {amount} at {addr:?}");
beta_tracking
.forwarded_payments
.entry(user_name.to_owned())
.or_default()
.insert((addr, amount));
} else {
warn!("Found a forwarded reward for an unknown participant at {addr:?}: {user_name_hash:?}");
eprintln!("Found a forwarded reward for an unknown participant at {addr:?}: {user_name_hash:?}");
warn!("Found a forwarded reward {amount} for an unknown participant at {addr:?}: {user_name_hash:?}");
eprintln!("Found a forwarded reward {amount} for an unknown participant at {addr:?}: {user_name_hash:?}");
beta_tracking
.forwarded_payments
.entry(format!("unknown participant: {user_name_hash:?}"))
Expand Down
4 changes: 3 additions & 1 deletion sn_auditor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,10 @@ async fn initialize_background_spend_dag_collection(
.map_err(|e| eyre!("Could not create SpendDag Db: {e}"))?;

// optional force restart from genesis and merge into our current DAG
if force_from_genesis {
// feature guard to prevent a mis-use of opt
if force_from_genesis && cfg!(feature = "dag-collection") {
println!("Forcing DAG to be updated from genesis...");
warn!("Forcing DAG to be updated from genesis...");
let mut d = dag.clone();
let mut genesis_dag = client
.new_dag_with_genesis_only()
Expand Down
107 changes: 85 additions & 22 deletions sn_client/src/audit/dag_crawling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ use crate::{Client, Error, SpendDag};
use futures::{future::join_all, StreamExt};
use sn_networking::{GetRecordError, NetworkError};
use sn_transfers::{
SignedSpend, SpendAddress, WalletError, WalletResult, GENESIS_SPEND_UNIQUE_KEY,
SignedSpend, SpendAddress, SpendReason, WalletError, WalletResult, GENESIS_SPEND_UNIQUE_KEY,
NETWORK_ROYALTIES_PK,
};
use std::collections::BTreeSet;
use tokio::sync::mpsc::Sender;
Expand Down Expand Up @@ -61,7 +62,7 @@ impl Client {
pub async fn spend_dag_build_from(
&self,
spend_addr: SpendAddress,
spend_processing: Option<Sender<SignedSpend>>,
spend_processing: Option<Sender<(SignedSpend, u64)>>,
verify: bool,
) -> WalletResult<SpendDag> {
let (tx, mut rx) = tokio::sync::mpsc::channel(SPENDS_PROCESSING_BUFFER_SIZE);
Expand All @@ -86,8 +87,9 @@ impl Client {
);
dag.insert(addr, spend.clone());
if let Some(sender) = &spend_processing {
let outputs = spend.spend.spent_tx.outputs.len() as u64;
sender
.send(spend)
.send((spend, outputs))
.await
.map_err(|e| WalletError::SpendProcessing(e.to_string()))?;
}
Expand Down Expand Up @@ -127,28 +129,51 @@ impl Client {
Ok(dag)
}

/// Crawls the Spend Dag from a set of given SpendAddresses recursively
/// following descendants all the way to UTXOs
/// Returns all the UTXOs reached
/// Get spends for a set of given SpendAddresses
/// Notifies the UTXOs that need to be further tracked down.
/// returns: (addresses_for_reattempt, new_utxos_for_furthertracking)
pub async fn crawl_to_next_utxos(
&self,
from: BTreeSet<SpendAddress>,
spend_processing: Sender<SignedSpend>,
) -> WalletResult<BTreeSet<SpendAddress>> {
let tasks: Vec<_> = from
.iter()
.map(|a| self.spend_dag_crawl_from(*a, spend_processing.clone()))
.collect();
let res = futures::future::join_all(tasks).await;
from: &BTreeSet<SpendAddress>,
spend_processing: Sender<(SignedSpend, u64)>,
) -> WalletResult<(BTreeSet<SpendAddress>, BTreeSet<SpendAddress>)> {
let spends = join_all(from.iter().map(|&address| {
let client_clone = self.clone();
async move { (client_clone.crawl_spend(address).await, address) }
}))
.await;

let mut failed_utxos = BTreeSet::new();
let mut new_utxos = BTreeSet::new();
for r in res.into_iter() {
match r {
Ok(utxos) => new_utxos.extend(utxos),
Err(e) => return Err(e),
}

for (result, address) in spends {
let spend = match result {
InternalGetNetworkSpend::Spend(s) => *s,
InternalGetNetworkSpend::DoubleSpend(_s1, _s2) => {
warn!("Detected double spend regarding {address:?}");
continue;
}
InternalGetNetworkSpend::NotFound => {
let _ = failed_utxos.insert(address);
continue;
}
InternalGetNetworkSpend::Error(e) => {
warn!("Got a fetching error {e:?}");
continue;
}
};

let for_further_track = beta_track_analyze_spend(&spend);

spend_processing
.send((spend, for_further_track.len() as u64))
.await
.map_err(|e| WalletError::SpendProcessing(e.to_string()))?;

new_utxos.extend(for_further_track);
}

Ok(new_utxos)
Ok((failed_utxos, new_utxos))
}

/// Crawls the Spend Dag from a given SpendAddress recursively
Expand Down Expand Up @@ -410,7 +435,7 @@ impl Client {
&self,
dag: &mut SpendDag,
utxos: BTreeSet<SpendAddress>,
spend_processing: Option<Sender<SignedSpend>>,
spend_processing: Option<Sender<(SignedSpend, u64)>>,
verify: bool,
) {
let main_dag_src = dag.source();
Expand Down Expand Up @@ -446,7 +471,7 @@ impl Client {
pub async fn spend_dag_continue_from_utxos(
&self,
dag: &mut SpendDag,
spend_processing: Option<Sender<SignedSpend>>,
spend_processing: Option<Sender<(SignedSpend, u64)>>,
verify: bool,
) {
let utxos = dag.get_utxos();
Expand Down Expand Up @@ -481,3 +506,41 @@ impl Client {
}
}
}

/// Helper function to analyze spend for beta_tracking optimization.
/// returns the new_utxos that needs to be further tracked.
fn beta_track_analyze_spend(spend: &SignedSpend) -> BTreeSet<SpendAddress> {
// Filter out royalty outputs
let royalty_pubkeys: BTreeSet<_> = spend
.spend
.network_royalties
.iter()
.map(|derivation_idx| NETWORK_ROYALTIES_PK.new_unique_pubkey(derivation_idx))
.collect();

let new_utxos: BTreeSet<_> = spend
.spend
.spent_tx
.outputs
.iter()
.filter_map(|output| {
if !royalty_pubkeys.contains(&output.unique_pubkey) {
Some(SpendAddress::from_unique_pubkey(&output.unique_pubkey))
} else {
None
}
})
.collect();

if let SpendReason::BetaRewardTracking(_) = spend.reason() {
// Do not track down forwarded payment further
Default::default()
} else {
trace!(
"Spend original has {} outputs, tracking {} of them.",
spend.spend.spent_tx.outputs.len(),
new_utxos.len()
);
new_utxos
}
}

0 comments on commit 72ec0e3

Please sign in to comment.