Skip to content

Commit

Permalink
Merge pull request #2081 from maqi/avoid_UTXOs_slow_down_width_first_…
Browse files Browse the repository at this point in the history
…tracking

Avoid utxos slow down width first tracking
  • Loading branch information
maqi authored Sep 3, 2024
2 parents 8e11156 + a5bcf40 commit 37b8bb6
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 56 deletions.
75 changes: 40 additions & 35 deletions sn_auditor/src/dag_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@ pub const BETA_PARTICIPANTS_FILENAME: &str = "beta_participants.txt";

lazy_static! {
/// time in seconds UTXOs are refetched in DAG crawl
static ref UTXO_REATTEMPT_INTERVAL: Duration = Duration::from_secs(
std::env::var("UTXO_REATTEMPT_INTERVAL")
static ref UTXO_REATTEMPT_SECONDS: u64 = std::env::var("UTXO_REATTEMPT_INTERVAL")
.unwrap_or("7200".to_string())
.parse::<u64>()
.unwrap_or(7200)
);
.unwrap_or(7200);

/// time in seconds UTXOs are refetched in DAG crawl
static ref UTXO_REATTEMPT_INTERVAL: Duration = Duration::from_secs(*UTXO_REATTEMPT_SECONDS);

/// time in seconds to rest between DAG crawls
static ref DAG_CRAWL_REST_INTERVAL: Duration = Duration::from_secs(
Expand All @@ -62,7 +63,7 @@ pub struct SpendDagDb {
dag: Arc<RwLock<SpendDag>>,
beta_tracking: Arc<RwLock<BetaTracking>>,
beta_participants: Arc<RwLock<BTreeMap<Hash, String>>>,
utxo_addresses: Arc<RwLock<BTreeMap<SpendAddress, (Instant, NanoTokens)>>>,
utxo_addresses: Arc<RwLock<BTreeMap<SpendAddress, UtxoStatus>>>,
encryption_sk: Option<SecretKey>,
}

Expand All @@ -78,9 +79,11 @@ struct BetaTracking {
/// Map of Discord usernames to their tracked forwarded payments
type ForwardedPayments = BTreeMap<String, BTreeSet<(SpendAddress, NanoTokens)>>;

type UtxoStatus = (
BTreeMap<SpendAddress, (Instant, NanoTokens)>,
BTreeMap<SpendAddress, (Instant, NanoTokens)>,
type UtxoStatus = (u64, Instant, NanoTokens);

type PartitionedUtxoStatus = (
BTreeMap<SpendAddress, UtxoStatus>,
BTreeMap<SpendAddress, UtxoStatus>,
);

#[derive(Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -230,7 +233,9 @@ impl SpendDagDb {
{
let mut utxo_addresses = self.utxo_addresses.write().await;
for addr in start_dag.get_utxos().iter() {
let _ = utxo_addresses.insert(*addr, (Instant::now(), NanoTokens::zero()));
info!("Tracking genesis UTXO {addr:?}");
// The UTXO holding 30% will never be used, hence be counted as 0
let _ = utxo_addresses.insert(*addr, (0, Instant::now(), NanoTokens::zero()));
}
}

Expand Down Expand Up @@ -297,25 +302,21 @@ impl SpendDagDb {
None
};

let mut addrs_to_get = BTreeSet::new();
let mut addrs_to_get = BTreeMap::new();

loop {
// get expired utxos for re-attempt fetch
{
let now = Instant::now();
let mut utxo_addresses = self.utxo_addresses.write().await;
let mut utxos_to_fetch = BTreeSet::new();
utxo_addresses.retain(|address, (time_stamp, amount)| {
let not_expired = *time_stamp > now;
if !not_expired {
let utxo_addresses = self.utxo_addresses.read().await;
for (address, (failure_times, time_stamp, amount)) in utxo_addresses.iter() {
if now > *time_stamp {
if amount.as_nano() > 100000 {
info!("re-attempt fetching big-UTXO {address:?} with {amount}");
}
let _ = utxos_to_fetch.insert((*address, *amount));
let _ = addrs_to_get.insert(*address, (*failure_times, *amount));
}
not_expired
});
addrs_to_get.extend(utxos_to_fetch);
}
}

if addrs_to_get.is_empty() {
Expand All @@ -330,7 +331,7 @@ impl SpendDagDb {
if cfg!(feature = "dag-collection") {
let new_utxos = self
.crawl_and_generate_local_dag(
addrs_to_get.iter().map(|(addr, _amount)| *addr).collect(),
addrs_to_get.keys().copied().collect(),
spend_processing.clone(),
client.clone(),
)
Expand All @@ -342,22 +343,23 @@ impl SpendDagDb {
(
a,
(
0,
Instant::now() + *UTXO_REATTEMPT_INTERVAL,
NanoTokens::zero(),
),
)
}));
} else if let Some(sender) = spend_processing.clone() {
let reattempt_addrs = client
.crawl_to_next_utxos(
&mut addrs_to_get,
sender.clone(),
*UTXO_REATTEMPT_INTERVAL,
)
let (reattempt_addrs, fetched_addrs) = client
.crawl_to_next_utxos(&mut addrs_to_get, sender.clone(), *UTXO_REATTEMPT_SECONDS)
.await;

let mut utxo_addresses = self.utxo_addresses.write().await;
utxo_addresses.extend(reattempt_addrs);
for addr in fetched_addrs.iter() {
let _ = utxo_addresses.remove(addr);
}
for (addr, tuple) in reattempt_addrs {
let _ = utxo_addresses.insert(addr, tuple);
}
} else {
panic!("There is no point in running the auditor if we are not collecting the DAG or collecting data through crawling. Please enable the `dag-collection` feature or provide beta program related arguments.");
};
Expand Down Expand Up @@ -612,29 +614,32 @@ impl SpendDagDb {

// UTXO amount that greater than 100000 nanos shall be considered as `change`
// which indicates the `wallet balance`
let (big_utxos, small_utxos): UtxoStatus = utxo_addresses
.iter()
.partition(|(_address, (_time_stamp, amount))| amount.as_nano() > 100000);
let (big_utxos, small_utxos): PartitionedUtxoStatus =
utxo_addresses
.iter()
.partition(|(_address, (_failure_times, _time_stamp, amount))| {
amount.as_nano() > 100000
});

let total_big_utxo_amount = big_utxos
.iter()
.map(|(_addr, (_time, amount))| amount.as_nano())
.map(|(_addr, (_failure_times, _time, amount))| amount.as_nano())
.sum::<u64>();
tracking_performance =
format!("{tracking_performance}\ntotal_big_utxo_amount: {total_big_utxo_amount}");

let total_small_utxo_amount = small_utxos
.iter()
.map(|(_addr, (_time, amount))| amount.as_nano())
.map(|(_addr, (_failure_times, _time, amount))| amount.as_nano())
.sum::<u64>();
tracking_performance =
format!("{tracking_performance}\ntotal_small_utxo_amount: {total_small_utxo_amount}");

for (addr, (_time, amount)) in big_utxos.iter() {
for (addr, (_failure_times, _time, amount)) in big_utxos.iter() {
tracking_performance =
format!("{tracking_performance}\n{addr:?}, {}", amount.as_nano());
}
for (addr, (_time, amount)) in small_utxos.iter() {
for (addr, (_failure_times, _time, amount)) in small_utxos.iter() {
tracking_performance =
format!("{tracking_performance}\n{addr:?}, {}", amount.as_nano());
}
Expand Down
88 changes: 67 additions & 21 deletions sn_client/src/audit/dag_crawling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,18 @@ impl Client {
dag.insert(genesis_addr, spend);
}
Err(Error::Network(NetworkError::DoubleSpendAttempt(spends))) => {
println!("Double spend detected at Genesis: {genesis_addr:?}");
for spend in spends.into_iter() {
println!("Burnt spend detected at Genesis: {genesis_addr:?}");
warn!("Burnt spend detected at Genesis: {genesis_addr:?}");
for (i, spend) in spends.into_iter().enumerate() {
let reason = spend.reason();
let amount = spend.spend.amount();
let ancestors_len = spend.spend.ancestors.len();
let descendants_len = spend.spend.descendants.len();
let roy_len = spend.spend.network_royalties().len();
warn!(
"burnt spend entry {i} reason {reason:?}, amount {amount}, ancestors: {ancestors_len}, descendants: {descendants_len}, royalties: {roy_len}, {:?} - {:?}",
spend.spend.ancestors, spend.spend.descendants
);
dag.insert(genesis_addr, spend);
}
}
Expand Down Expand Up @@ -139,24 +149,34 @@ impl Client {
/// 2, addrs_to_get to hold the addresses for further track
pub async fn crawl_to_next_utxos(
&self,
addrs_to_get: &mut BTreeSet<(SpendAddress, NanoTokens)>,
addrs_to_get: &mut BTreeMap<SpendAddress, (u64, NanoTokens)>,
sender: Sender<(SignedSpend, u64, bool)>,
reattempt_interval: Duration,
) -> BTreeMap<SpendAddress, (Instant, NanoTokens)> {
reattempt_seconds: u64,
) -> (
BTreeMap<SpendAddress, (u64, Instant, NanoTokens)>,
Vec<SpendAddress>,
) {
let mut failed_utxos = BTreeMap::new();
let mut tasks = JoinSet::new();
let mut addrs_for_further_track = BTreeSet::new();
let mut fetched_addrs = Vec::new();

while !addrs_to_get.is_empty() || !tasks.is_empty() {
while tasks.len() < 32 && !addrs_to_get.is_empty() {
if let Some((addr, amount)) = addrs_to_get.pop_first() {
if let Some((addr, (failed_times, amount))) = addrs_to_get.pop_first() {
let client_clone = self.clone();
let _ = tasks
.spawn(async move { (client_clone.crawl_spend(addr).await, addr, amount) });
let _ = tasks.spawn(async move {
(
client_clone.crawl_spend(addr).await,
failed_times,
addr,
amount,
)
});
}
}

if let Some(Ok((result, address, amount))) = tasks.join_next().await {
if let Some(Ok((result, failed_times, address, amount))) = tasks.join_next().await {
match result {
InternalGetNetworkSpend::Spend(spend) => {
let for_further_track = beta_track_analyze_spend(&spend);
Expand All @@ -165,10 +185,11 @@ impl Client {
.await
.map_err(|e| WalletError::SpendProcessing(e.to_string()));
addrs_for_further_track.extend(for_further_track);
fetched_addrs.push(address);
}
InternalGetNetworkSpend::DoubleSpend(spends) => {
warn!(
"Detected double spend regarding {address:?} - {:?}",
"Detected burnt spend regarding {address:?} - {:?}",
spends.len()
);
for (i, spend) in spends.iter().enumerate() {
Expand All @@ -178,7 +199,7 @@ impl Client {
let descendants_len = spend.spend.descendants.len();
let roy_len = spend.spend.network_royalties().len();
warn!(
"double spend entry {i} reason {reason:?}, amount {amount}, ancestors: {ancestors_len}, descendants: {descendants_len}, royalties: {roy_len}, {:?} - {:?}",
"burnt spend entry {i} reason {reason:?}, amount {amount}, ancestors: {ancestors_len}, descendants: {descendants_len}, royalties: {roy_len}, {:?} - {:?}",
spend.spend.ancestors, spend.spend.descendants
);

Expand All @@ -190,27 +211,45 @@ impl Client {
.await
.map_err(|e| WalletError::SpendProcessing(e.to_string()));
}
fetched_addrs.push(address);
}
InternalGetNetworkSpend::NotFound => {
if amount.as_nano() > 100000 {
let reattempt_interval = if amount.as_nano() > 100000 {
info!("Not find spend of big-UTXO {address:?} with {amount}");
}
let _ = failed_utxos
.insert(address, (Instant::now() + reattempt_interval, amount));
reattempt_seconds
} else {
reattempt_seconds * (failed_times * 8 + 1)
};
let _ = failed_utxos.insert(
address,
(
failed_times + 1,
Instant::now() + Duration::from_secs(reattempt_interval),
amount,
),
);
}
InternalGetNetworkSpend::Error(e) => {
warn!("Fetching spend {address:?} with {amount:?} result in error {e:?}");
// Error of `NotEnoughCopies` could be re-attempted and succeed eventually.
let _ = failed_utxos
.insert(address, (Instant::now() + reattempt_interval, amount));
let _ = failed_utxos.insert(
address,
(
failed_times + 1,
Instant::now() + Duration::from_secs(reattempt_seconds),
amount,
),
);
}
}
}
}

addrs_to_get.extend(addrs_for_further_track);
for (addr, amount) in addrs_for_further_track {
let _ = addrs_to_get.entry(addr).or_insert((0, amount));
}

failed_utxos
(failed_utxos, fetched_addrs)
}

/// Crawls the Spend Dag from a given SpendAddress recursively
Expand Down Expand Up @@ -551,6 +590,7 @@ fn beta_track_analyze_spend(spend: &SignedSpend) -> BTreeSet<(SpendAddress, Nano
.map(|(_, _, der)| DEFAULT_NETWORK_ROYALTIES_PK.new_unique_pubkey(der))
.collect();

let spend_addr = spend.address();
let new_utxos: BTreeSet<_> = spend
.spend
.descendants
Expand All @@ -561,7 +601,13 @@ fn beta_track_analyze_spend(spend: &SignedSpend) -> BTreeSet<(SpendAddress, Nano
{
None
} else {
Some((SpendAddress::from_unique_pubkey(unique_pubkey), *amount))
let addr = SpendAddress::from_unique_pubkey(unique_pubkey);

if amount.as_nano() > 100000 {
info!("Spend {spend_addr:?} has a big-UTXO {addr:?} with {amount}");
}

Some((addr, *amount))
}
})
.collect();
Expand All @@ -571,7 +617,7 @@ fn beta_track_analyze_spend(spend: &SignedSpend) -> BTreeSet<(SpendAddress, Nano
Default::default()
} else {
trace!(
"Spend original has {} outputs, tracking {} of them.",
"Spend {spend_addr:?} original has {} outputs, tracking {} of them.",
spend.spend.descendants.len(),
new_utxos.len()
);
Expand Down

0 comments on commit 37b8bb6

Please sign in to comment.