Skip to content

Commit

Permalink
feat(discovery): try to use random candidates from a bucket when avai…
Browse files Browse the repository at this point in the history
…lable
  • Loading branch information
RolandSherwin committed Nov 24, 2023
1 parent 12b75c2 commit 7c50c35
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 50 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions sn_networking/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ custom_debug = "~0.5.0"
libp2p = { version="0.53" , features = ["tokio", "dns", "kad", "macros", "request-response", "cbor","identify", "autonat", "noise", "tcp", "yamux", "gossipsub"] }
prometheus-client = { version = "0.22", optional = true }
rand = { version = "~0.8.5", features = ["small_rng"] }
rayon = "1.8.0"
rmp-serde = "1.1.1"
serde = { version = "1.0.133", features = [ "derive", "rc" ]}
sn_protocol = { path = "../sn_protocol", version = "0.8.29" }
Expand Down
6 changes: 5 additions & 1 deletion sn_networking/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,20 @@ impl SwarmDriver {
}

pub(crate) fn trigger_network_discovery(&mut self) {
let now = Instant::now();
// The query is just to trigger the network discovery,
// hence no need to wait for a result.
for addr in &self.network_discovery_candidates {
for addr in self.network_discovery_candidates.candidates() {
let _ = self
.swarm
.behaviour_mut()
.kademlia
.get_closest_peers(addr.as_bytes());
}
self.network_discovery_candidates
.try_generate_new_candidates();
self.bootstrap.initiated();
debug!("Trigger network discovery took {:?}", now.elapsed());
}
}

Expand Down
53 changes: 4 additions & 49 deletions sn_networking/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::{
event::NetworkEvent,
event::{GetRecordResultMap, NodeEvent},
multiaddr_pop_p2p,
network_discovery::NetworkDiscoveryCandidates,
record_store::{ClientRecordStore, NodeRecordStore, NodeRecordStoreConfig},
record_store_api::UnifiedRecordStore,
replication_fetcher::ReplicationFetcher,
Expand Down Expand Up @@ -50,7 +51,7 @@ use sn_protocol::{
NetworkAddress, PrettyPrintKBucketKey,
};
use std::{
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
collections::{HashMap, HashSet},
net::SocketAddr,
num::NonZeroUsize,
path::PathBuf,
Expand Down Expand Up @@ -495,7 +496,7 @@ impl NetworkBuilder {
// `identify` protocol to kick in and get them in the routing table.
dialed_peers: CircularVec::new(63),
is_gossip_handler: false,
network_discovery_candidates: generate_kbucket_specific_candidates(&peer_id),
network_discovery_candidates: NetworkDiscoveryCandidates::new(&peer_id),
};

Ok((
Expand All @@ -511,51 +512,6 @@ impl NetworkBuilder {
}
}

fn generate_kbucket_specific_candidates(self_peer_id: &PeerId) -> Vec<NetworkAddress> {
let mut candidates: BTreeMap<usize, NetworkAddress> = BTreeMap::new();
// To avoid deadlock or taking too much time, currently set a fixed generation attempts
let mut attempts = 0;
// Also an early return when got the first 20 furthest kBuckets covered.
let mut buckets_covered: BTreeSet<_> = (0..21).map(|index| index as usize).collect();

let local_key = NetworkAddress::from_peer(*self_peer_id).as_kbucket_key();
let local_key_bytes_len = local_key.hashed_bytes().len();
while attempts < 10000 && !buckets_covered.is_empty() {
let candiate = NetworkAddress::from_peer(PeerId::random());
let candidate_key = candiate.as_kbucket_key();
let candidate_key_bytes_len = candidate_key.hashed_bytes().len();

if local_key_bytes_len != candidate_key_bytes_len {
panic!("kBucketKey has different key length, {candiate:?} has {candidate_key_bytes_len:?}, {self_peer_id:?} has {local_key_bytes_len:?}");
}

let common_leading_bits =
common_leading_bits(local_key.hashed_bytes(), candidate_key.hashed_bytes());

let _ = candidates.insert(common_leading_bits, candiate);
let _ = buckets_covered.remove(&common_leading_bits);

attempts += 1;
}

let generated_buckets: Vec<_> = candidates.keys().copied().collect();
let generated_candidates: Vec<_> = candidates.values().cloned().collect();
trace!("Generated targets covering kbuckets {generated_buckets:?}");
generated_candidates
}

/// Returns the length of the common leading bits.
/// e.g. when `11110000` and `11111111`, return as 4.
/// Note: the length of two shall be the same
fn common_leading_bits(one: &[u8], two: &[u8]) -> usize {
for byte_index in 0..one.len() {
if one[byte_index] != two[byte_index] {
return (byte_index * 8) + (one[byte_index] ^ two[byte_index]).leading_zeros() as usize;
}
}
8 * one.len()
}

pub struct SwarmDriver {
pub(crate) swarm: Swarm<NodeBehaviour>,
pub(crate) self_peer_id: PeerId,
Expand Down Expand Up @@ -584,9 +540,8 @@ pub struct SwarmDriver {
// they are not supposed to process the gossip msg that received from libp2p.
pub(crate) is_gossip_handler: bool,
// A list of random `PeerId` candidates that falls into kbuckets,
// one for each furthest 30 kbuckets.
// This is to ensure a more accurate network discovery.
pub(crate) network_discovery_candidates: Vec<NetworkAddress>,
pub(crate) network_discovery_candidates: NetworkDiscoveryCandidates,
}

impl SwarmDriver {
Expand Down
1 change: 1 addition & 0 deletions sn_networking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod event;
mod metrics;
#[cfg(feature = "open-metrics")]
mod metrics_service;
mod network_discovery;
mod quorum;
mod record_store;
mod record_store_api;
Expand Down
108 changes: 108 additions & 0 deletions sn_networking/src/network_discovery.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright 2023 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use libp2p::{kad::KBucketKey, PeerId};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use sn_protocol::NetworkAddress;
use std::{
collections::{hash_map::Entry, HashMap, VecDeque},
time::Instant,
};

const INITIAL_GENERATION_ATTEMPTS: usize = 10_000;
const GENERATION_ATTEMPTS: usize = 1_000;
const MAX_PEERS_PER_BUCKET: usize = 5;

#[derive(Debug, Clone)]
pub(crate) struct NetworkDiscoveryCandidates {
self_key: KBucketKey<PeerId>,
candidates: HashMap<u32, VecDeque<NetworkAddress>>,
}

impl NetworkDiscoveryCandidates {
pub(crate) fn new(self_peer_id: &PeerId) -> Self {
let start = Instant::now();
let self_key = KBucketKey::from(*self_peer_id);
let candidates_vec = Self::generate_candidates(&self_key, INITIAL_GENERATION_ATTEMPTS);

let mut candidates: HashMap<u32, VecDeque<NetworkAddress>> = HashMap::new();
for (ilog2, candidate) in candidates_vec {
match candidates.entry(ilog2) {
Entry::Occupied(mut entry) => {
let entry = entry.get_mut();
if entry.len() >= MAX_PEERS_PER_BUCKET {
continue;
} else {
entry.push_back(candidate);
}
}
Entry::Vacant(entry) => {
let _ = entry.insert(VecDeque::from([candidate]));
}
}
}

info!(
"Time to generate NetworkDiscoveryCandidates: {:?}",
start.elapsed()
);
let mut buckets_covered = candidates
.iter()
.map(|(ilog2, candidates)| (*ilog2, candidates.len()))
.collect::<Vec<_>>();
buckets_covered.sort_by_key(|(ilog2, _)| *ilog2);
info!("The generated network discovery candidates currently cover these ilog2 buckets: {buckets_covered:?}");

Self {
self_key,
candidates,
}
}

pub(crate) fn try_generate_new_candidates(&mut self) {
let candidates_vec = Self::generate_candidates(&self.self_key, GENERATION_ATTEMPTS);
for (ilog2, candidate) in candidates_vec {
match self.candidates.entry(ilog2) {
Entry::Occupied(mut entry) => {
let entry = entry.get_mut();
if entry.len() >= MAX_PEERS_PER_BUCKET {
// pop the front (as it might have been already used for querying and insert the new one at the back
let _ = entry.pop_front();
entry.push_back(candidate);
} else {
entry.push_back(candidate);
}
}
Entry::Vacant(entry) => {
let _ = entry.insert(VecDeque::from([candidate]));
}
}
}
}

pub(crate) fn candidates(&self) -> impl Iterator<Item = &NetworkAddress> {
self.candidates
.values()
.filter_map(|candidates| candidates.front())
}

fn generate_candidates(
self_key: &KBucketKey<PeerId>,
num_to_generate: usize,
) -> Vec<(u32, NetworkAddress)> {
(0..num_to_generate)
.into_par_iter()
.filter_map(|_| {
let candidate = NetworkAddress::from_peer(PeerId::random());
let candidate_key = candidate.as_kbucket_key();
let ilog2_distance = candidate_key.distance(&self_key).ilog2()?;
Some((ilog2_distance, candidate))
})
.collect::<Vec<_>>()
}
}

0 comments on commit 7c50c35

Please sign in to comment.