From 30fbd3714004c9468298b728a2a65abbc2965f94 Mon Sep 17 00:00:00 2001 From: Roland Sherwin Date: Mon, 27 Nov 2023 00:06:59 +0530 Subject: [PATCH] fix(discovery): return a random candidate each time --- sn_networking/src/network_discovery.rs | 91 ++++++++++++-------------- 1 file changed, 43 insertions(+), 48 deletions(-) diff --git a/sn_networking/src/network_discovery.rs b/sn_networking/src/network_discovery.rs index 1ee3bc62d6..4af06d3ea2 100644 --- a/sn_networking/src/network_discovery.rs +++ b/sn_networking/src/network_discovery.rs @@ -7,10 +7,11 @@ // permissions and limitations relating to use of the SAFE Network Software. use libp2p::{kad::KBucketKey, PeerId}; +use rand::{thread_rng, Rng}; use rayon::iter::{IntoParallelIterator, ParallelIterator}; use sn_protocol::NetworkAddress; use std::{ - collections::{hash_map::Entry, HashMap, HashSet, VecDeque}, + collections::{hash_map::Entry, HashMap, HashSet}, time::Instant, }; @@ -26,7 +27,7 @@ const MAX_PEERS_PER_BUCKET: usize = 5; #[derive(Debug, Clone)] pub(crate) struct NetworkDiscovery { self_key: KBucketKey, - candidates: HashMap>, + candidates: HashMap>, } impl NetworkDiscovery { @@ -36,7 +37,7 @@ impl NetworkDiscovery { let self_key = KBucketKey::from(*self_peer_id); let candidates_vec = Self::generate_candidates(&self_key, INITIAL_GENERATION_ATTEMPTS); - let mut candidates: HashMap> = HashMap::new(); + let mut candidates: HashMap> = HashMap::new(); for (ilog2, candidate) in candidates_vec { match candidates.entry(ilog2) { Entry::Occupied(mut entry) => { @@ -44,11 +45,11 @@ impl NetworkDiscovery { if entry.len() >= MAX_PEERS_PER_BUCKET { continue; } else { - entry.push_back(candidate); + entry.push(candidate); } } Entry::Vacant(entry) => { - let _ = entry.insert(VecDeque::from([candidate])); + let _ = entry.insert(Vec::from([candidate])); } } } @@ -70,61 +71,37 @@ impl NetworkDiscovery { } } - /// Tries to refresh our current candidate list. The candidates at the front of the list are used when querying the - /// network, so if a new peer for that bucket is generated, the first candidate is removed and the new candidate - /// is inserted at the last + /// Tries to refresh our current candidate list. If the list is full, we pop the last candidate and use the new one. pub(crate) fn try_refresh_candidates(&mut self) { let candidates_vec = Self::generate_candidates(&self.self_key, GENERATION_ATTEMPTS); for (ilog2, candidate) in candidates_vec { - match self.candidates.entry(ilog2) { - Entry::Occupied(mut entry) => { - let entry = entry.get_mut(); - if entry.len() >= MAX_PEERS_PER_BUCKET { - // pop the front (as it might have been already used for querying and insert the new one at the back - let _ = entry.pop_front(); - entry.push_back(candidate); - } else { - entry.push_back(candidate); - } - } - Entry::Vacant(entry) => { - let _ = entry.insert(VecDeque::from([candidate])); - } - } + self.insert_candidate(ilog2, candidate); } } - /// Returns one candidate per bucket + /// Returns one random candidate per bucket /// Todo: Limit the candidates to return. Favor the closest buckets. - pub(crate) fn candidates(&self) -> impl Iterator { - self.candidates - .values() - .filter_map(|candidates| candidates.front()) + pub(crate) fn candidates(&self) -> Vec<&NetworkAddress> { + let mut rng = thread_rng(); + let mut op = Vec::with_capacity(self.candidates.len()); + + let candidates = self.candidates.values().filter_map(|candidates| { + // get a random index each time + let random_index = rng.gen::() % candidates.len(); + candidates.get(random_index) + }); + op.extend(candidates); + op } - /// The result from the kad::GetClosestPeers are again used to update our kbuckets if they're not full. + /// The result from the kad::GetClosestPeers are again used to update our kbucket. pub(crate) fn handle_get_closest_query(&mut self, closest_peers: HashSet) { let now = Instant::now(); for peer in closest_peers { let peer = NetworkAddress::from_peer(peer); let peer_key = peer.as_kbucket_key(); - if let Some(ilog2_distance) = peer_key.distance(&self.self_key).ilog2() { - match self.candidates.entry(ilog2_distance) { - Entry::Occupied(mut entry) => { - let entry = entry.get_mut(); - // extra check to make sure we don't insert the same peer again - if entry.len() >= MAX_PEERS_PER_BUCKET && !entry.contains(&peer) { - // pop the front (as it might have been already used for querying and insert the new one at the back - let _ = entry.pop_front(); - entry.push_back(peer); - } else { - entry.push_back(peer); - } - } - Entry::Vacant(entry) => { - let _ = entry.insert(VecDeque::from([peer])); - } - } + if let Some(ilog2) = peer_key.distance(&self.self_key).ilog2() { + self.insert_candidate(ilog2, peer); } } trace!( @@ -133,6 +110,24 @@ impl NetworkDiscovery { ); } + fn insert_candidate(&mut self, ilog2: u32, candidate: NetworkAddress) { + match self.candidates.entry(ilog2) { + Entry::Occupied(mut entry) => { + let entry = entry.get_mut(); + // extra check to make sure we don't insert the same peer again + if entry.len() >= MAX_PEERS_PER_BUCKET && !entry.contains(&candidate) { + let _ = entry.pop(); + entry.push(candidate); + } else { + entry.push(candidate); + } + } + Entry::Vacant(entry) => { + let _ = entry.insert(Vec::from([candidate])); + } + } + } + /// Uses rayon to parallelize the generation fn generate_candidates( self_key: &KBucketKey, @@ -143,8 +138,8 @@ impl NetworkDiscovery { .filter_map(|_| { let candidate = NetworkAddress::from_peer(PeerId::random()); let candidate_key = candidate.as_kbucket_key(); - let ilog2_distance = candidate_key.distance(&self_key).ilog2()?; - Some((ilog2_distance, candidate)) + let ilog2 = candidate_key.distance(&self_key).ilog2()?; + Some((ilog2, candidate)) }) .collect::>() }