fix(discovery): insert newly seen candidates and return random candidates
RolandSherwin committed Nov 26, 2023
1 parent aec3ede commit cddcb7f
Showing 2 changed files with 97 additions and 76 deletions.
169 changes: 96 additions & 73 deletions sn_networking/src/network_discovery.rs
@@ -7,10 +7,11 @@
// permissions and limitations relating to use of the SAFE Network Software.

use libp2p::{kad::KBucketKey, PeerId};
use rand::{thread_rng, Rng};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use sn_protocol::NetworkAddress;
use std::{
collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
collections::{hash_map::Entry, HashMap, HashSet},
time::Instant,
};

@@ -26,32 +27,15 @@ const MAX_PEERS_PER_BUCKET: usize = 5;
#[derive(Debug, Clone)]
pub(crate) struct NetworkDiscovery {
self_key: KBucketKey<PeerId>,
candidates: HashMap<u32, VecDeque<NetworkAddress>>,
candidates: HashMap<u32, Vec<NetworkAddress>>,
}

impl NetworkDiscovery {
/// Creates a new instance of NetworkDiscovery and tries to populate each bucket with random peers.
pub(crate) fn new(self_peer_id: &PeerId) -> Self {
let start = Instant::now();
let self_key = KBucketKey::from(*self_peer_id);
let candidates_vec = Self::generate_candidates(&self_key, INITIAL_GENERATION_ATTEMPTS);

let mut candidates: HashMap<u32, VecDeque<NetworkAddress>> = HashMap::new();
for (ilog2, candidate) in candidates_vec {
match candidates.entry(ilog2) {
Entry::Occupied(mut entry) => {
let entry = entry.get_mut();
if entry.len() >= MAX_PEERS_PER_BUCKET {
continue;
} else {
entry.push_back(candidate);
}
}
Entry::Vacant(entry) => {
let _ = entry.insert(VecDeque::from([candidate]));
}
}
}
let candidates = Self::generate_candidates(&self_key, INITIAL_GENERATION_ATTEMPTS);

info!(
"Time to generate NetworkDiscoveryCandidates: {:?}",
@@ -70,82 +54,121 @@ impl NetworkDiscovery {
}
}

/// Tries to refresh our current candidate list. The candidates at the front of the list are used when querying the
/// network, so if a new peer for that bucket is generated, the first candidate is removed and the new candidate
/// is inserted at the last
/// Tries to refresh our current candidate list. We replace the old candidates with new ones if we find any.
pub(crate) fn try_refresh_candidates(&mut self) {
let candidates_vec = Self::generate_candidates(&self.self_key, GENERATION_ATTEMPTS);
for (ilog2, candidate) in candidates_vec {
match self.candidates.entry(ilog2) {
Entry::Occupied(mut entry) => {
let entry = entry.get_mut();
if entry.len() >= MAX_PEERS_PER_BUCKET {
// pop the front (as it might have been already used for querying and insert the new one at the back
let _ = entry.pop_front();
entry.push_back(candidate);
} else {
entry.push_back(candidate);
}
}
Entry::Vacant(entry) => {
let _ = entry.insert(VecDeque::from([candidate]));
}
}
for (ilog2, candidates) in candidates_vec {
self.insert_candidates(ilog2, candidates);
}
}

/// Returns one candidate per bucket
/// Todo: Limit the candidates to return. Favor the closest buckets.
pub(crate) fn candidates(&self) -> impl Iterator<Item = &NetworkAddress> {
self.candidates
.values()
.filter_map(|candidates| candidates.front())
}

/// The result from the kad::GetClosestPeers are again used to update our kbuckets if they're not full.
/// The results from the kad::GetClosestPeers query are again used to update our kbuckets.
pub(crate) fn handle_get_closest_query(&mut self, closest_peers: HashSet<PeerId>) {
let now = Instant::now();
for peer in closest_peers {
let peer = NetworkAddress::from_peer(peer);
let peer_key = peer.as_kbucket_key();
if let Some(ilog2_distance) = peer_key.distance(&self.self_key).ilog2() {
match self.candidates.entry(ilog2_distance) {
Entry::Occupied(mut entry) => {
let entry = entry.get_mut();
// extra check to make sure we don't insert the same peer again
if entry.len() >= MAX_PEERS_PER_BUCKET && !entry.contains(&peer) {
// pop the front (as it might have been already used for querying and insert the new one at the back
let _ = entry.pop_front();
entry.push_back(peer);
} else {
entry.push_back(peer);
}
}
Entry::Vacant(entry) => {
let _ = entry.insert(VecDeque::from([peer]));
}
}
}

let candidates_map: HashMap<u32, Vec<NetworkAddress>> = closest_peers
.into_iter()
.filter_map(|peer| {
let peer = NetworkAddress::from_peer(peer);
let peer_key = peer.as_kbucket_key();
peer_key
.distance(&self.self_key)
.ilog2()
.map(|ilog2| (ilog2, peer))
})
// Group the NetworkAddresses into a Vec per bucket.
.fold(HashMap::new(), |mut acc, (ilog2, peer)| {
acc.entry(ilog2).or_default().push(peer);
acc
});

for (ilog2, candidates) in candidates_map {
self.insert_candidates(ilog2, candidates);
}

trace!(
"It took {:?} to NetworkDiscovery::handle get closest query",
now.elapsed()
);
}

/// Returns one random candidate per bucket
/// Todo: Limit the candidates to return. Favor the closest buckets.
pub(crate) fn candidates(&self) -> Vec<&NetworkAddress> {
let mut rng = thread_rng();
let mut op = Vec::with_capacity(self.candidates.len());

let candidates = self.candidates.values().filter_map(|candidates| {
// get a random index each time
let random_index = rng.gen::<usize>() % candidates.len();
candidates.get(random_index)
});
op.extend(candidates);
op
}

// Insert the new candidates and remove the old ones to maintain MAX_PEERS_PER_BUCKET.
fn insert_candidates(&mut self, ilog2: u32, new_candidates: Vec<NetworkAddress>) {
match self.candidates.entry(ilog2) {
Entry::Occupied(mut entry) => {
let existing_candidates = entry.get_mut();
// insert only the newly seen candidates
let new_candidates: Vec<_> = new_candidates
.into_iter()
.filter(|candidate| !existing_candidates.contains(candidate))
.collect();
existing_candidates.extend(new_candidates);
// Keep only the last MAX_PEERS_PER_BUCKET elements, i.e. the newest ones
let excess = existing_candidates
.len()
.saturating_sub(MAX_PEERS_PER_BUCKET);
if excess > 0 {
existing_candidates.drain(..excess);
}
}
Entry::Vacant(entry) => {
entry.insert(new_candidates);
}
}
}

/// Uses rayon to parallelize the generation
fn generate_candidates(
self_key: &KBucketKey<PeerId>,
num_to_generate: usize,
) -> Vec<(u32, NetworkAddress)> {
) -> HashMap<u32, Vec<NetworkAddress>> {
(0..num_to_generate)
.into_par_iter()
.filter_map(|_| {
let candidate = NetworkAddress::from_peer(PeerId::random());
let candidate_key = candidate.as_kbucket_key();
let ilog2_distance = candidate_key.distance(&self_key).ilog2()?;
Some((ilog2_distance, candidate))
let ilog2 = candidate_key.distance(&self_key).ilog2()?;
Some((ilog2, candidate))
})
.collect::<Vec<_>>()
// Since this is a parallel iterator, fold batches the items and produces multiple partial
// maps, so we use reduce to combine those partials into a single map.
.fold(
HashMap::new,
|mut acc: HashMap<u32, Vec<NetworkAddress>>, (ilog2, candidate)| {
acc.entry(ilog2).or_default().push(candidate);
acc
},
)
.reduce(
HashMap::new,
|mut acc: HashMap<u32, Vec<NetworkAddress>>, map| {
for (ilog2, candidates) in map {
let entry = acc.entry(ilog2).or_default();
for candidate in candidates {
if entry.len() < MAX_PEERS_PER_BUCKET {
entry.push(candidate);
} else {
break;
}
}
}
acc
},
)
}
}
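
To make the new behaviour easier to follow in isolation, here is a minimal standalone sketch of the policy introduced above: append only unseen candidates to a bucket, keep just the newest MAX_PEERS_PER_BUCKET of them, and hand back one random candidate per bucket when queried. The u32 bucket key, the String stand-ins for NetworkAddress, and the free-function names are illustrative assumptions, not part of the commit.

use rand::{thread_rng, Rng};
use std::collections::HashMap;

const MAX_PEERS_PER_BUCKET: usize = 5;

// Simplified stand-in for NetworkDiscovery::insert_candidates: append only
// unseen candidates, then keep just the newest MAX_PEERS_PER_BUCKET of them.
fn insert_candidates(buckets: &mut HashMap<u32, Vec<String>>, ilog2: u32, new: Vec<String>) {
    let entry = buckets.entry(ilog2).or_default();
    for candidate in new {
        if !entry.contains(&candidate) {
            entry.push(candidate);
        }
    }
    let excess = entry.len().saturating_sub(MAX_PEERS_PER_BUCKET);
    if excess > 0 {
        // Drop the oldest entries from the front; the newest stay at the back.
        entry.drain(..excess);
    }
}

// Simplified stand-in for NetworkDiscovery::candidates: one random pick per bucket.
fn random_candidates(buckets: &HashMap<u32, Vec<String>>) -> Vec<&String> {
    let mut rng = thread_rng();
    buckets
        .values()
        .filter_map(|candidates| {
            if candidates.is_empty() {
                return None;
            }
            candidates.get(rng.gen::<usize>() % candidates.len())
        })
        .collect()
}

fn main() {
    let mut buckets = HashMap::new();
    insert_candidates(&mut buckets, 250, (0..7).map(|i| format!("peer-{i}")).collect());
    // Only the newest 5 of the 7 inserted candidates remain in bucket 250.
    assert_eq!(buckets[&250], vec!["peer-2", "peer-3", "peer-4", "peer-5", "peer-6"]);
    println!("random pick per bucket: {:?}", random_candidates(&buckets));
}

Draining from the front preserves insertion order, so the candidates that get discarded are always the oldest ones, mirroring the insert_candidates change above.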
4 changes: 1 addition & 3 deletions sn_node/tests/verify_routing_table.rs
@@ -70,9 +70,7 @@ async fn verify_routing_table() -> Result<()> {
})
.collect::<HashMap<_, _>>();

let current_peer = *all_peers
.get(node_index as usize - 1)
.unwrap_or_else(|| panic!("Node should be present at index {}", node_index - 1));
let current_peer = all_peers[node_index as usize - 1];
let current_peer_key = KBucketKey::from(current_peer);

let mut failed_list = Vec::new();
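
Returning to generate_candidates in network_discovery.rs above: the in-code comment hints at a rayon detail worth spelling out. A parallel fold yields one partial accumulator per worker batch, so a reduce step is needed to merge those partials into a single map. Below is a small self-contained sketch of that fold-then-reduce pattern, grouping plain integers by key purely for illustration (nothing here is taken from the commit).

use rayon::iter::{IntoParallelIterator, ParallelIterator};
use std::collections::HashMap;

fn main() {
    // Group 0..1000 by their remainder mod 10, in parallel.
    let grouped: HashMap<u32, Vec<u32>> = (0u32..1000)
        .into_par_iter()
        // fold runs per worker batch, so it yields several partial maps...
        .fold(HashMap::new, |mut acc: HashMap<u32, Vec<u32>>, n| {
            acc.entry(n % 10).or_default().push(n);
            acc
        })
        // ...and reduce merges those partial maps into the final one.
        .reduce(HashMap::new, |mut acc, partial| {
            for (key, mut values) in partial {
                acc.entry(key).or_default().append(&mut values);
            }
            acc
        });

    assert_eq!(grouped.len(), 10);
    assert_eq!(grouped.values().map(Vec::len).sum::<usize>(), 1000);
}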
