diff --git a/sn_networking/src/network_discovery.rs b/sn_networking/src/network_discovery.rs index 1ee3bc62d6..f2dea4a80d 100644 --- a/sn_networking/src/network_discovery.rs +++ b/sn_networking/src/network_discovery.rs @@ -7,10 +7,11 @@ // permissions and limitations relating to use of the SAFE Network Software. use libp2p::{kad::KBucketKey, PeerId}; +use rand::{thread_rng, Rng}; use rayon::iter::{IntoParallelIterator, ParallelIterator}; use sn_protocol::NetworkAddress; use std::{ - collections::{hash_map::Entry, HashMap, HashSet, VecDeque}, + collections::{hash_map::Entry, HashMap, HashSet}, time::Instant, }; @@ -26,7 +27,7 @@ const MAX_PEERS_PER_BUCKET: usize = 5; #[derive(Debug, Clone)] pub(crate) struct NetworkDiscovery { self_key: KBucketKey, - candidates: HashMap>, + candidates: HashMap>, } impl NetworkDiscovery { @@ -34,24 +35,7 @@ impl NetworkDiscovery { pub(crate) fn new(self_peer_id: &PeerId) -> Self { let start = Instant::now(); let self_key = KBucketKey::from(*self_peer_id); - let candidates_vec = Self::generate_candidates(&self_key, INITIAL_GENERATION_ATTEMPTS); - - let mut candidates: HashMap> = HashMap::new(); - for (ilog2, candidate) in candidates_vec { - match candidates.entry(ilog2) { - Entry::Occupied(mut entry) => { - let entry = entry.get_mut(); - if entry.len() >= MAX_PEERS_PER_BUCKET { - continue; - } else { - entry.push_back(candidate); - } - } - Entry::Vacant(entry) => { - let _ = entry.insert(VecDeque::from([candidate])); - } - } - } + let candidates = Self::generate_candidates(&self_key, INITIAL_GENERATION_ATTEMPTS); info!( "Time to generate NetworkDiscoveryCandidates: {:?}", @@ -70,82 +54,121 @@ impl NetworkDiscovery { } } - /// Tries to refresh our current candidate list. The candidates at the front of the list are used when querying the - /// network, so if a new peer for that bucket is generated, the first candidate is removed and the new candidate - /// is inserted at the last + /// Tries to refresh our current candidate list. We replace the old ones with new if we find any. pub(crate) fn try_refresh_candidates(&mut self) { let candidates_vec = Self::generate_candidates(&self.self_key, GENERATION_ATTEMPTS); - for (ilog2, candidate) in candidates_vec { - match self.candidates.entry(ilog2) { - Entry::Occupied(mut entry) => { - let entry = entry.get_mut(); - if entry.len() >= MAX_PEERS_PER_BUCKET { - // pop the front (as it might have been already used for querying and insert the new one at the back - let _ = entry.pop_front(); - entry.push_back(candidate); - } else { - entry.push_back(candidate); - } - } - Entry::Vacant(entry) => { - let _ = entry.insert(VecDeque::from([candidate])); - } - } + for (ilog2, candidates) in candidates_vec { + self.insert_candidates(ilog2, candidates); } } - /// Returns one candidate per bucket - /// Todo: Limit the candidates to return. Favor the closest buckets. - pub(crate) fn candidates(&self) -> impl Iterator { - self.candidates - .values() - .filter_map(|candidates| candidates.front()) - } - - /// The result from the kad::GetClosestPeers are again used to update our kbuckets if they're not full. + /// The result from the kad::GetClosestPeers are again used to update our kbucket. pub(crate) fn handle_get_closest_query(&mut self, closest_peers: HashSet) { let now = Instant::now(); - for peer in closest_peers { - let peer = NetworkAddress::from_peer(peer); - let peer_key = peer.as_kbucket_key(); - if let Some(ilog2_distance) = peer_key.distance(&self.self_key).ilog2() { - match self.candidates.entry(ilog2_distance) { - Entry::Occupied(mut entry) => { - let entry = entry.get_mut(); - // extra check to make sure we don't insert the same peer again - if entry.len() >= MAX_PEERS_PER_BUCKET && !entry.contains(&peer) { - // pop the front (as it might have been already used for querying and insert the new one at the back - let _ = entry.pop_front(); - entry.push_back(peer); - } else { - entry.push_back(peer); - } - } - Entry::Vacant(entry) => { - let _ = entry.insert(VecDeque::from([peer])); - } - } - } + + let candidates_map: HashMap> = closest_peers + .into_iter() + .filter_map(|peer| { + let peer = NetworkAddress::from_peer(peer); + let peer_key = peer.as_kbucket_key(); + peer_key + .distance(&self.self_key) + .ilog2() + .map(|ilog2| (ilog2, peer)) + }) + // To collect the NetworkAddresses into a vector. + .fold(HashMap::new(), |mut acc, (ilog2, peer)| { + acc.entry(ilog2).or_default().push(peer); + acc + }); + + for (ilog2, candidates) in candidates_map { + self.insert_candidates(ilog2, candidates); } + trace!( "It took {:?} to NetworkDiscovery::handle get closest query", now.elapsed() ); } + /// Returns one random candidate per bucket + /// Todo: Limit the candidates to return. Favor the closest buckets. + pub(crate) fn candidates(&self) -> Vec<&NetworkAddress> { + let mut rng = thread_rng(); + let mut op = Vec::with_capacity(self.candidates.len()); + + let candidates = self.candidates.values().filter_map(|candidates| { + // get a random index each time + let random_index = rng.gen::() % candidates.len(); + candidates.get(random_index) + }); + op.extend(candidates); + op + } + + // Insert the new candidates and remove the old ones to maintain MAX_PEERS_PER_BUCKET. + fn insert_candidates(&mut self, ilog2: u32, new_candidates: Vec) { + match self.candidates.entry(ilog2) { + Entry::Occupied(mut entry) => { + let existing_candidates = entry.get_mut(); + // insert only newly seen new_candidates + let new_candidates: Vec<_> = new_candidates + .into_iter() + .filter(|candidate| !existing_candidates.contains(candidate)) + .collect(); + existing_candidates.extend(new_candidates); + // Keep only the last MAX_PEERS_PER_BUCKET elements i.e., the newest ones + let excess = existing_candidates + .len() + .saturating_sub(MAX_PEERS_PER_BUCKET); + if excess > 0 { + existing_candidates.drain(..excess); + } + } + Entry::Vacant(entry) => { + entry.insert(new_candidates); + } + } + } + /// Uses rayon to parallelize the generation fn generate_candidates( self_key: &KBucketKey, num_to_generate: usize, - ) -> Vec<(u32, NetworkAddress)> { + ) -> HashMap> { (0..num_to_generate) .into_par_iter() .filter_map(|_| { let candidate = NetworkAddress::from_peer(PeerId::random()); let candidate_key = candidate.as_kbucket_key(); - let ilog2_distance = candidate_key.distance(&self_key).ilog2()?; - Some((ilog2_distance, candidate)) + let ilog2 = candidate_key.distance(&self_key).ilog2()?; + Some((ilog2, candidate)) }) - .collect::>() + // Since it is parallel iterator, the fold fn batches the items and will produce multiple outputs. So we + // should use reduce fn to combine multiple outputs. + .fold( + HashMap::new, + |mut acc: HashMap>, (ilog2, candidate)| { + acc.entry(ilog2).or_default().push(candidate); + acc + }, + ) + .reduce( + HashMap::new, + |mut acc: HashMap>, map| { + for (ilog2, candidates) in map { + let entry = acc.entry(ilog2).or_default(); + for candidate in candidates { + if entry.len() < MAX_PEERS_PER_BUCKET { + entry.push(candidate); + } else { + break; + } + } + } + acc + }, + ) } } diff --git a/sn_node/tests/verify_routing_table.rs b/sn_node/tests/verify_routing_table.rs index a1eba12046..bb5f83d33d 100644 --- a/sn_node/tests/verify_routing_table.rs +++ b/sn_node/tests/verify_routing_table.rs @@ -70,9 +70,7 @@ async fn verify_routing_table() -> Result<()> { }) .collect::>(); - let current_peer = *all_peers - .get(node_index as usize - 1) - .unwrap_or_else(|| panic!("Node should be present at index {}", node_index - 1)); + let current_peer = all_peers[node_index as usize - 1]; let current_peer_key = KBucketKey::from(current_peer); let mut failed_list = Vec::new();