Skip to content

Commit

Permalink
fix(discovery): return a random candidate each time
Browse files Browse the repository at this point in the history
  • Loading branch information
RolandSherwin committed Nov 26, 2023
1 parent aec3ede commit 30fbd37
Showing 1 changed file with 43 additions and 48 deletions.
91 changes: 43 additions & 48 deletions sn_networking/src/network_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@
// permissions and limitations relating to use of the SAFE Network Software.

use libp2p::{kad::KBucketKey, PeerId};
use rand::{thread_rng, Rng};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use sn_protocol::NetworkAddress;
use std::{
collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
collections::{hash_map::Entry, HashMap, HashSet},
time::Instant,
};

Expand All @@ -26,7 +27,7 @@ const MAX_PEERS_PER_BUCKET: usize = 5;
#[derive(Debug, Clone)]
pub(crate) struct NetworkDiscovery {
self_key: KBucketKey<PeerId>,
candidates: HashMap<u32, VecDeque<NetworkAddress>>,
candidates: HashMap<u32, Vec<NetworkAddress>>,
}

impl NetworkDiscovery {
Expand All @@ -36,19 +37,19 @@ impl NetworkDiscovery {
let self_key = KBucketKey::from(*self_peer_id);
let candidates_vec = Self::generate_candidates(&self_key, INITIAL_GENERATION_ATTEMPTS);

let mut candidates: HashMap<u32, VecDeque<NetworkAddress>> = HashMap::new();
let mut candidates: HashMap<u32, Vec<NetworkAddress>> = HashMap::new();
for (ilog2, candidate) in candidates_vec {
match candidates.entry(ilog2) {
Entry::Occupied(mut entry) => {
let entry = entry.get_mut();
if entry.len() >= MAX_PEERS_PER_BUCKET {
continue;
} else {
entry.push_back(candidate);
entry.push(candidate);
}
}
Entry::Vacant(entry) => {
let _ = entry.insert(VecDeque::from([candidate]));
let _ = entry.insert(Vec::from([candidate]));
}
}
}
Expand All @@ -70,61 +71,37 @@ impl NetworkDiscovery {
}
}

/// Tries to refresh our current candidate list. The candidates at the front of the list are used when querying the
/// network, so if a new peer for that bucket is generated, the first candidate is removed and the new candidate
/// is inserted at the last
/// Tries to refresh our current candidate list. If the list is full, we pop the last candidate and use the new one.
pub(crate) fn try_refresh_candidates(&mut self) {
let candidates_vec = Self::generate_candidates(&self.self_key, GENERATION_ATTEMPTS);
for (ilog2, candidate) in candidates_vec {
match self.candidates.entry(ilog2) {
Entry::Occupied(mut entry) => {
let entry = entry.get_mut();
if entry.len() >= MAX_PEERS_PER_BUCKET {
// pop the front (as it might have been already used for querying and insert the new one at the back
let _ = entry.pop_front();
entry.push_back(candidate);
} else {
entry.push_back(candidate);
}
}
Entry::Vacant(entry) => {
let _ = entry.insert(VecDeque::from([candidate]));
}
}
self.insert_candidate(ilog2, candidate);
}
}

/// Returns one candidate per bucket
/// Returns one random candidate per bucket
/// Todo: Limit the candidates to return. Favor the closest buckets.
pub(crate) fn candidates(&self) -> impl Iterator<Item = &NetworkAddress> {
self.candidates
.values()
.filter_map(|candidates| candidates.front())
pub(crate) fn candidates(&self) -> Vec<&NetworkAddress> {
let mut rng = thread_rng();
let mut op = Vec::with_capacity(self.candidates.len());

let candidates = self.candidates.values().filter_map(|candidates| {
// get a random index each time
let random_index = rng.gen::<usize>() % candidates.len();
candidates.get(random_index)
});
op.extend(candidates);
op
}

/// The result from the kad::GetClosestPeers are again used to update our kbuckets if they're not full.
/// The result from the kad::GetClosestPeers are again used to update our kbucket.
pub(crate) fn handle_get_closest_query(&mut self, closest_peers: HashSet<PeerId>) {
let now = Instant::now();
for peer in closest_peers {
let peer = NetworkAddress::from_peer(peer);
let peer_key = peer.as_kbucket_key();
if let Some(ilog2_distance) = peer_key.distance(&self.self_key).ilog2() {
match self.candidates.entry(ilog2_distance) {
Entry::Occupied(mut entry) => {
let entry = entry.get_mut();
// extra check to make sure we don't insert the same peer again
if entry.len() >= MAX_PEERS_PER_BUCKET && !entry.contains(&peer) {
// pop the front (as it might have been already used for querying and insert the new one at the back
let _ = entry.pop_front();
entry.push_back(peer);
} else {
entry.push_back(peer);
}
}
Entry::Vacant(entry) => {
let _ = entry.insert(VecDeque::from([peer]));
}
}
if let Some(ilog2) = peer_key.distance(&self.self_key).ilog2() {
self.insert_candidate(ilog2, peer);
}
}
trace!(
Expand All @@ -133,6 +110,24 @@ impl NetworkDiscovery {
);
}

fn insert_candidate(&mut self, ilog2: u32, candidate: NetworkAddress) {
match self.candidates.entry(ilog2) {
Entry::Occupied(mut entry) => {
let entry = entry.get_mut();
// extra check to make sure we don't insert the same peer again
if entry.len() >= MAX_PEERS_PER_BUCKET && !entry.contains(&candidate) {
let _ = entry.pop();
entry.push(candidate);
} else {
entry.push(candidate);
}
}
Entry::Vacant(entry) => {
let _ = entry.insert(Vec::from([candidate]));
}
}
}

/// Uses rayon to parallelize the generation
fn generate_candidates(
self_key: &KBucketKey<PeerId>,
Expand All @@ -143,8 +138,8 @@ impl NetworkDiscovery {
.filter_map(|_| {
let candidate = NetworkAddress::from_peer(PeerId::random());
let candidate_key = candidate.as_kbucket_key();
let ilog2_distance = candidate_key.distance(&self_key).ilog2()?;
Some((ilog2_distance, candidate))
let ilog2 = candidate_key.distance(&self_key).ilog2()?;
Some((ilog2, candidate))
})
.collect::<Vec<_>>()
}
Expand Down

0 comments on commit 30fbd37

Please sign in to comment.