From 6f4744e8d6ed9fd5c8e6cbccfca9a2b4405552ae Mon Sep 17 00:00:00 2001 From: qima Date: Fri, 30 Aug 2024 04:38:08 +0800 Subject: [PATCH 1/2] chore(auditor): more logging on big-UTXO --- sn_auditor/src/dag_db.rs | 1 + sn_client/src/audit/dag_crawling.rs | 17 ++++++++++++++--- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/sn_auditor/src/dag_db.rs b/sn_auditor/src/dag_db.rs index ff9d7bd3ee..e2ecb6db48 100644 --- a/sn_auditor/src/dag_db.rs +++ b/sn_auditor/src/dag_db.rs @@ -230,6 +230,7 @@ impl SpendDagDb { { let mut utxo_addresses = self.utxo_addresses.write().await; for addr in start_dag.get_utxos().iter() { + info!("Tracking genesis UTXO {addr:?}"); let _ = utxo_addresses.insert(*addr, (Instant::now(), NanoTokens::zero())); } } diff --git a/sn_client/src/audit/dag_crawling.rs b/sn_client/src/audit/dag_crawling.rs index 12e0188630..b46afe01ef 100644 --- a/sn_client/src/audit/dag_crawling.rs +++ b/sn_client/src/audit/dag_crawling.rs @@ -39,7 +39,11 @@ impl Client { } Err(Error::Network(NetworkError::DoubleSpendAttempt(spends))) => { println!("Double spend detected at Genesis: {genesis_addr:?}"); - for spend in spends.into_iter() { + warn!("Double spend detected at Genesis: {genesis_addr:?}"); + for (i, spend) in spends.into_iter().enumerate() { + warn!("double spend entry {i} reason {:?}, amount {}, inputs: {}, outputs: {}, royties: {}, {:?} - {:?}", + spend.spend.reason, spend.spend.amount, spend.spend.spent_tx.inputs.len(), spend.spend.spent_tx.outputs.len(), + spend.spend.network_royalties.len(), spend.spend.spent_tx.inputs, spend.spend.spent_tx.outputs); dag.insert(genesis_addr, spend); } } @@ -551,6 +555,7 @@ fn beta_track_analyze_spend(spend: &SignedSpend) -> BTreeSet<(SpendAddress, Nano .map(|(_, _, der)| DEFAULT_NETWORK_ROYALTIES_PK.new_unique_pubkey(der)) .collect(); + let spend_addr = spend.address(); let new_utxos: BTreeSet<_> = spend .spend .descendants @@ -561,7 +566,13 @@ fn beta_track_analyze_spend(spend: &SignedSpend) -> BTreeSet<(SpendAddress, Nano { None } else { - Some((SpendAddress::from_unique_pubkey(unique_pubkey), *amount)) + let addr = SpendAddress::from_unique_pubkey(unique_pubkey); + + if *amount > 100000 { + info!("Spend {spend_addr:?} has a big-UTXO {addr:?} with {amount}"); + } + + Some((addr, *amount)) } }) .collect(); @@ -571,7 +582,7 @@ fn beta_track_analyze_spend(spend: &SignedSpend) -> BTreeSet<(SpendAddress, Nano Default::default() } else { trace!( - "Spend original has {} outputs, tracking {} of them.", + "Spend {spend_addr:?} original has {} outputs, tracking {} of them.", spend.spend.descendants.len(), new_utxos.len() ); From a5bcf4070ae07eaf2394d590f17cb026981a4aba Mon Sep 17 00:00:00 2001 From: qima Date: Sat, 31 Aug 2024 06:06:00 +0800 Subject: [PATCH 2/2] chore(auditor): wait stepped longer for trival UTXOs --- sn_auditor/src/dag_db.rs | 74 +++++++++++++------------- sn_client/src/audit/dag_crawling.rs | 81 +++++++++++++++++++++-------- 2 files changed, 97 insertions(+), 58 deletions(-) diff --git a/sn_auditor/src/dag_db.rs b/sn_auditor/src/dag_db.rs index e2ecb6db48..a1bb786010 100644 --- a/sn_auditor/src/dag_db.rs +++ b/sn_auditor/src/dag_db.rs @@ -35,12 +35,13 @@ pub const BETA_PARTICIPANTS_FILENAME: &str = "beta_participants.txt"; lazy_static! { /// time in seconds UTXOs are refetched in DAG crawl - static ref UTXO_REATTEMPT_INTERVAL: Duration = Duration::from_secs( - std::env::var("UTXO_REATTEMPT_INTERVAL") + static ref UTXO_REATTEMPT_SECONDS: u64 = std::env::var("UTXO_REATTEMPT_INTERVAL") .unwrap_or("7200".to_string()) .parse::() - .unwrap_or(7200) - ); + .unwrap_or(7200); + + /// time in seconds UTXOs are refetched in DAG crawl + static ref UTXO_REATTEMPT_INTERVAL: Duration = Duration::from_secs(*UTXO_REATTEMPT_SECONDS); /// time in seconds to rest between DAG crawls static ref DAG_CRAWL_REST_INTERVAL: Duration = Duration::from_secs( @@ -62,7 +63,7 @@ pub struct SpendDagDb { dag: Arc>, beta_tracking: Arc>, beta_participants: Arc>>, - utxo_addresses: Arc>>, + utxo_addresses: Arc>>, encryption_sk: Option, } @@ -78,9 +79,11 @@ struct BetaTracking { /// Map of Discord usernames to their tracked forwarded payments type ForwardedPayments = BTreeMap>; -type UtxoStatus = ( - BTreeMap, - BTreeMap, +type UtxoStatus = (u64, Instant, NanoTokens); + +type PartitionedUtxoStatus = ( + BTreeMap, + BTreeMap, ); #[derive(Clone, Serialize, Deserialize)] @@ -231,7 +234,8 @@ impl SpendDagDb { let mut utxo_addresses = self.utxo_addresses.write().await; for addr in start_dag.get_utxos().iter() { info!("Tracking genesis UTXO {addr:?}"); - let _ = utxo_addresses.insert(*addr, (Instant::now(), NanoTokens::zero())); + // The UTXO holding 30% will never be used, hence be counted as 0 + let _ = utxo_addresses.insert(*addr, (0, Instant::now(), NanoTokens::zero())); } } @@ -298,25 +302,21 @@ impl SpendDagDb { None }; - let mut addrs_to_get = BTreeSet::new(); + let mut addrs_to_get = BTreeMap::new(); loop { // get expired utxos for re-attempt fetch { let now = Instant::now(); - let mut utxo_addresses = self.utxo_addresses.write().await; - let mut utxos_to_fetch = BTreeSet::new(); - utxo_addresses.retain(|address, (time_stamp, amount)| { - let not_expired = *time_stamp > now; - if !not_expired { + let utxo_addresses = self.utxo_addresses.read().await; + for (address, (failure_times, time_stamp, amount)) in utxo_addresses.iter() { + if now > *time_stamp { if amount.as_nano() > 100000 { info!("re-attempt fetching big-UTXO {address:?} with {amount}"); } - let _ = utxos_to_fetch.insert((*address, *amount)); + let _ = addrs_to_get.insert(*address, (*failure_times, *amount)); } - not_expired - }); - addrs_to_get.extend(utxos_to_fetch); + } } if addrs_to_get.is_empty() { @@ -331,7 +331,7 @@ impl SpendDagDb { if cfg!(feature = "dag-collection") { let new_utxos = self .crawl_and_generate_local_dag( - addrs_to_get.iter().map(|(addr, _amount)| *addr).collect(), + addrs_to_get.keys().copied().collect(), spend_processing.clone(), client.clone(), ) @@ -343,22 +343,23 @@ impl SpendDagDb { ( a, ( + 0, Instant::now() + *UTXO_REATTEMPT_INTERVAL, NanoTokens::zero(), ), ) })); } else if let Some(sender) = spend_processing.clone() { - let reattempt_addrs = client - .crawl_to_next_utxos( - &mut addrs_to_get, - sender.clone(), - *UTXO_REATTEMPT_INTERVAL, - ) + let (reattempt_addrs, fetched_addrs) = client + .crawl_to_next_utxos(&mut addrs_to_get, sender.clone(), *UTXO_REATTEMPT_SECONDS) .await; - let mut utxo_addresses = self.utxo_addresses.write().await; - utxo_addresses.extend(reattempt_addrs); + for addr in fetched_addrs.iter() { + let _ = utxo_addresses.remove(addr); + } + for (addr, tuple) in reattempt_addrs { + let _ = utxo_addresses.insert(addr, tuple); + } } else { panic!("There is no point in running the auditor if we are not collecting the DAG or collecting data through crawling. Please enable the `dag-collection` feature or provide beta program related arguments."); }; @@ -613,29 +614,32 @@ impl SpendDagDb { // UTXO amount that greater than 100000 nanos shall be considered as `change` // which indicates the `wallet balance` - let (big_utxos, small_utxos): UtxoStatus = utxo_addresses - .iter() - .partition(|(_address, (_time_stamp, amount))| amount.as_nano() > 100000); + let (big_utxos, small_utxos): PartitionedUtxoStatus = + utxo_addresses + .iter() + .partition(|(_address, (_failure_times, _time_stamp, amount))| { + amount.as_nano() > 100000 + }); let total_big_utxo_amount = big_utxos .iter() - .map(|(_addr, (_time, amount))| amount.as_nano()) + .map(|(_addr, (_failure_times, _time, amount))| amount.as_nano()) .sum::(); tracking_performance = format!("{tracking_performance}\ntotal_big_utxo_amount: {total_big_utxo_amount}"); let total_small_utxo_amount = small_utxos .iter() - .map(|(_addr, (_time, amount))| amount.as_nano()) + .map(|(_addr, (_failure_times, _time, amount))| amount.as_nano()) .sum::(); tracking_performance = format!("{tracking_performance}\ntotal_small_utxo_amount: {total_small_utxo_amount}"); - for (addr, (_time, amount)) in big_utxos.iter() { + for (addr, (_failure_times, _time, amount)) in big_utxos.iter() { tracking_performance = format!("{tracking_performance}\n{addr:?}, {}", amount.as_nano()); } - for (addr, (_time, amount)) in small_utxos.iter() { + for (addr, (_failure_times, _time, amount)) in small_utxos.iter() { tracking_performance = format!("{tracking_performance}\n{addr:?}, {}", amount.as_nano()); } diff --git a/sn_client/src/audit/dag_crawling.rs b/sn_client/src/audit/dag_crawling.rs index b46afe01ef..e29760858e 100644 --- a/sn_client/src/audit/dag_crawling.rs +++ b/sn_client/src/audit/dag_crawling.rs @@ -38,12 +38,18 @@ impl Client { dag.insert(genesis_addr, spend); } Err(Error::Network(NetworkError::DoubleSpendAttempt(spends))) => { - println!("Double spend detected at Genesis: {genesis_addr:?}"); - warn!("Double spend detected at Genesis: {genesis_addr:?}"); + println!("Burnt spend detected at Genesis: {genesis_addr:?}"); + warn!("Burnt spend detected at Genesis: {genesis_addr:?}"); for (i, spend) in spends.into_iter().enumerate() { - warn!("double spend entry {i} reason {:?}, amount {}, inputs: {}, outputs: {}, royties: {}, {:?} - {:?}", - spend.spend.reason, spend.spend.amount, spend.spend.spent_tx.inputs.len(), spend.spend.spent_tx.outputs.len(), - spend.spend.network_royalties.len(), spend.spend.spent_tx.inputs, spend.spend.spent_tx.outputs); + let reason = spend.reason(); + let amount = spend.spend.amount(); + let ancestors_len = spend.spend.ancestors.len(); + let descendants_len = spend.spend.descendants.len(); + let roy_len = spend.spend.network_royalties().len(); + warn!( + "burnt spend entry {i} reason {reason:?}, amount {amount}, ancestors: {ancestors_len}, descendants: {descendants_len}, royalties: {roy_len}, {:?} - {:?}", + spend.spend.ancestors, spend.spend.descendants + ); dag.insert(genesis_addr, spend); } } @@ -143,24 +149,34 @@ impl Client { /// 2, addrs_to_get to hold the addresses for further track pub async fn crawl_to_next_utxos( &self, - addrs_to_get: &mut BTreeSet<(SpendAddress, NanoTokens)>, + addrs_to_get: &mut BTreeMap, sender: Sender<(SignedSpend, u64, bool)>, - reattempt_interval: Duration, - ) -> BTreeMap { + reattempt_seconds: u64, + ) -> ( + BTreeMap, + Vec, + ) { let mut failed_utxos = BTreeMap::new(); let mut tasks = JoinSet::new(); let mut addrs_for_further_track = BTreeSet::new(); + let mut fetched_addrs = Vec::new(); while !addrs_to_get.is_empty() || !tasks.is_empty() { while tasks.len() < 32 && !addrs_to_get.is_empty() { - if let Some((addr, amount)) = addrs_to_get.pop_first() { + if let Some((addr, (failed_times, amount))) = addrs_to_get.pop_first() { let client_clone = self.clone(); - let _ = tasks - .spawn(async move { (client_clone.crawl_spend(addr).await, addr, amount) }); + let _ = tasks.spawn(async move { + ( + client_clone.crawl_spend(addr).await, + failed_times, + addr, + amount, + ) + }); } } - if let Some(Ok((result, address, amount))) = tasks.join_next().await { + if let Some(Ok((result, failed_times, address, amount))) = tasks.join_next().await { match result { InternalGetNetworkSpend::Spend(spend) => { let for_further_track = beta_track_analyze_spend(&spend); @@ -169,10 +185,11 @@ impl Client { .await .map_err(|e| WalletError::SpendProcessing(e.to_string())); addrs_for_further_track.extend(for_further_track); + fetched_addrs.push(address); } InternalGetNetworkSpend::DoubleSpend(spends) => { warn!( - "Detected double spend regarding {address:?} - {:?}", + "Detected burnt spend regarding {address:?} - {:?}", spends.len() ); for (i, spend) in spends.iter().enumerate() { @@ -182,7 +199,7 @@ impl Client { let descendants_len = spend.spend.descendants.len(); let roy_len = spend.spend.network_royalties().len(); warn!( - "double spend entry {i} reason {reason:?}, amount {amount}, ancestors: {ancestors_len}, descendants: {descendants_len}, royalties: {roy_len}, {:?} - {:?}", + "burnt spend entry {i} reason {reason:?}, amount {amount}, ancestors: {ancestors_len}, descendants: {descendants_len}, royalties: {roy_len}, {:?} - {:?}", spend.spend.ancestors, spend.spend.descendants ); @@ -194,27 +211,45 @@ impl Client { .await .map_err(|e| WalletError::SpendProcessing(e.to_string())); } + fetched_addrs.push(address); } InternalGetNetworkSpend::NotFound => { - if amount.as_nano() > 100000 { + let reattempt_interval = if amount.as_nano() > 100000 { info!("Not find spend of big-UTXO {address:?} with {amount}"); - } - let _ = failed_utxos - .insert(address, (Instant::now() + reattempt_interval, amount)); + reattempt_seconds + } else { + reattempt_seconds * (failed_times * 8 + 1) + }; + let _ = failed_utxos.insert( + address, + ( + failed_times + 1, + Instant::now() + Duration::from_secs(reattempt_interval), + amount, + ), + ); } InternalGetNetworkSpend::Error(e) => { warn!("Fetching spend {address:?} with {amount:?} result in error {e:?}"); // Error of `NotEnoughCopies` could be re-attempted and succeed eventually. - let _ = failed_utxos - .insert(address, (Instant::now() + reattempt_interval, amount)); + let _ = failed_utxos.insert( + address, + ( + failed_times + 1, + Instant::now() + Duration::from_secs(reattempt_seconds), + amount, + ), + ); } } } } - addrs_to_get.extend(addrs_for_further_track); + for (addr, amount) in addrs_for_further_track { + let _ = addrs_to_get.entry(addr).or_insert((0, amount)); + } - failed_utxos + (failed_utxos, fetched_addrs) } /// Crawls the Spend Dag from a given SpendAddress recursively @@ -568,7 +603,7 @@ fn beta_track_analyze_spend(spend: &SignedSpend) -> BTreeSet<(SpendAddress, Nano } else { let addr = SpendAddress::from_unique_pubkey(unique_pubkey); - if *amount > 100000 { + if amount.as_nano() > 100000 { info!("Spend {spend_addr:?} has a big-UTXO {addr:?} with {amount}"); }