diff --git a/Cargo.lock b/Cargo.lock index 84d7878deb..e029752b9e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4653,6 +4653,7 @@ dependencies = [ "prometheus-client 0.22.0", "quickcheck", "rand", + "rayon", "rmp-serde", "serde", "sn_protocol", diff --git a/sn_networking/Cargo.toml b/sn_networking/Cargo.toml index 3119acd6d8..b5ddbd81e7 100644 --- a/sn_networking/Cargo.toml +++ b/sn_networking/Cargo.toml @@ -26,6 +26,7 @@ custom_debug = "~0.5.0" libp2p = { version="0.53" , features = ["tokio", "dns", "kad", "macros", "request-response", "cbor","identify", "autonat", "noise", "tcp", "yamux", "gossipsub"] } prometheus-client = { version = "0.22", optional = true } rand = { version = "~0.8.5", features = ["small_rng"] } +rayon = "1.8.0" rmp-serde = "1.1.1" serde = { version = "1.0.133", features = [ "derive", "rc" ]} sn_protocol = { path = "../sn_protocol", version = "0.8.29" } diff --git a/sn_networking/src/bootstrap.rs b/sn_networking/src/bootstrap.rs index 430919ddb2..5643eb9bc6 100644 --- a/sn_networking/src/bootstrap.rs +++ b/sn_networking/src/bootstrap.rs @@ -49,16 +49,20 @@ impl SwarmDriver { } pub(crate) fn trigger_network_discovery(&mut self) { + let now = Instant::now(); // The query is just to trigger the network discovery, // hence no need to wait for a result. 
- for addr in &self.network_discovery_candidates { + for addr in self.network_discovery_candidates.candidates() { let _ = self .swarm .behaviour_mut() .kademlia .get_closest_peers(addr.as_bytes()); } + self.network_discovery_candidates + .try_generate_new_candidates(); self.bootstrap.initiated(); + debug!("Trigger network discovery took {:?}", now.elapsed()); } } diff --git a/sn_networking/src/driver.rs b/sn_networking/src/driver.rs index 63dc850998..d23d6743f3 100644 --- a/sn_networking/src/driver.rs +++ b/sn_networking/src/driver.rs @@ -18,6 +18,7 @@ use crate::{ event::NetworkEvent, event::{GetRecordResultMap, NodeEvent}, multiaddr_pop_p2p, + network_discovery::NetworkDiscoveryCandidates, record_store::{ClientRecordStore, NodeRecordStore, NodeRecordStoreConfig}, record_store_api::UnifiedRecordStore, replication_fetcher::ReplicationFetcher, @@ -50,7 +51,7 @@ use sn_protocol::{ NetworkAddress, PrettyPrintKBucketKey, }; use std::{ - collections::{BTreeMap, BTreeSet, HashMap, HashSet}, + collections::{HashMap, HashSet}, net::SocketAddr, num::NonZeroUsize, path::PathBuf, @@ -495,7 +496,7 @@ impl NetworkBuilder { // `identify` protocol to kick in and get them in the routing table. dialed_peers: CircularVec::new(63), is_gossip_handler: false, - network_discovery_candidates: generate_kbucket_specific_candidates(&peer_id), + network_discovery_candidates: NetworkDiscoveryCandidates::new(&peer_id), }; Ok(( @@ -511,51 +512,6 @@ impl NetworkBuilder { } } -fn generate_kbucket_specific_candidates(self_peer_id: &PeerId) -> Vec { - let mut candidates: BTreeMap = BTreeMap::new(); - // To avoid deadlock or taking too much time, currently set a fixed generation attempts - let mut attempts = 0; - // Also an early return when got the first 20 furthest kBuckets covered. 
- let mut buckets_covered: BTreeSet<_> = (0..21).map(|index| index as usize).collect(); - - let local_key = NetworkAddress::from_peer(*self_peer_id).as_kbucket_key(); - let local_key_bytes_len = local_key.hashed_bytes().len(); - while attempts < 10000 && !buckets_covered.is_empty() { - let candiate = NetworkAddress::from_peer(PeerId::random()); - let candidate_key = candiate.as_kbucket_key(); - let candidate_key_bytes_len = candidate_key.hashed_bytes().len(); - - if local_key_bytes_len != candidate_key_bytes_len { - panic!("kBucketKey has different key length, {candiate:?} has {candidate_key_bytes_len:?}, {self_peer_id:?} has {local_key_bytes_len:?}"); - } - - let common_leading_bits = - common_leading_bits(local_key.hashed_bytes(), candidate_key.hashed_bytes()); - - let _ = candidates.insert(common_leading_bits, candiate); - let _ = buckets_covered.remove(&common_leading_bits); - - attempts += 1; - } - - let generated_buckets: Vec<_> = candidates.keys().copied().collect(); - let generated_candidates: Vec<_> = candidates.values().cloned().collect(); - trace!("Generated targets covering kbuckets {generated_buckets:?}"); - generated_candidates -} - -/// Returns the length of the common leading bits. -/// e.g. when `11110000` and `11111111`, return as 4. -/// Note: the length of two shall be the same -fn common_leading_bits(one: &[u8], two: &[u8]) -> usize { - for byte_index in 0..one.len() { - if one[byte_index] != two[byte_index] { - return (byte_index * 8) + (one[byte_index] ^ two[byte_index]).leading_zeros() as usize; - } - } - 8 * one.len() -} - pub struct SwarmDriver { pub(crate) swarm: Swarm, pub(crate) self_peer_id: PeerId, @@ -584,9 +540,8 @@ pub struct SwarmDriver { // they are not supposed to process the gossip msg that received from libp2p. pub(crate) is_gossip_handler: bool, // A list of random `PeerId` candidates that falls into kbuckets, - // one for each furthest 30 kbuckets. // This is to ensure a more accurate network discovery. 
- pub(crate) network_discovery_candidates: Vec, + pub(crate) network_discovery_candidates: NetworkDiscoveryCandidates, } impl SwarmDriver { diff --git a/sn_networking/src/lib.rs b/sn_networking/src/lib.rs index 612d281a40..06cede0229 100644 --- a/sn_networking/src/lib.rs +++ b/sn_networking/src/lib.rs @@ -19,6 +19,7 @@ mod event; mod metrics; #[cfg(feature = "open-metrics")] mod metrics_service; +mod network_discovery; mod quorum; mod record_store; mod record_store_api; diff --git a/sn_networking/src/network_discovery.rs b/sn_networking/src/network_discovery.rs new file mode 100644 index 0000000000..1bf7369238 --- /dev/null +++ b/sn_networking/src/network_discovery.rs @@ -0,0 +1,108 @@ +// Copyright 2023 MaidSafe.net limited. +// +// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. +// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed +// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. Please review the Licences for the specific language governing +// permissions and limitations relating to use of the SAFE Network Software. 
+ +use libp2p::{kad::KBucketKey, PeerId}; +use rayon::iter::{IntoParallelIterator, ParallelIterator}; +use sn_protocol::NetworkAddress; +use std::{ + collections::{hash_map::Entry, HashMap, VecDeque}, + time::Instant, +}; + +const INITIAL_GENERATION_ATTEMPTS: usize = 10_000; +const GENERATION_ATTEMPTS: usize = 1_000; +const MAX_PEERS_PER_BUCKET: usize = 5; + +#[derive(Debug, Clone)] +pub(crate) struct NetworkDiscoveryCandidates { + self_key: KBucketKey, + candidates: HashMap>, +} + +impl NetworkDiscoveryCandidates { + pub(crate) fn new(self_peer_id: &PeerId) -> Self { + let start = Instant::now(); + let self_key = KBucketKey::from(*self_peer_id); + let candidates_vec = Self::generate_candidates(&self_key, INITIAL_GENERATION_ATTEMPTS); + + let mut candidates: HashMap> = HashMap::new(); + for (ilog2, candidate) in candidates_vec { + match candidates.entry(ilog2) { + Entry::Occupied(mut entry) => { + let entry = entry.get_mut(); + if entry.len() >= MAX_PEERS_PER_BUCKET { + continue; + } else { + entry.push_back(candidate); + } + } + Entry::Vacant(entry) => { + let _ = entry.insert(VecDeque::from([candidate])); + } + } + } + + info!( + "Time to generate NetworkDiscoveryCandidates: {:?}", + start.elapsed() + ); + let mut buckets_covered = candidates + .iter() + .map(|(ilog2, candidates)| (*ilog2, candidates.len())) + .collect::>(); + buckets_covered.sort_by_key(|(ilog2, _)| *ilog2); + info!("The generated network discovery candidates currently cover these ilog2 buckets: {buckets_covered:?}"); + + Self { + self_key, + candidates, + } + } + + pub(crate) fn try_generate_new_candidates(&mut self) { + let candidates_vec = Self::generate_candidates(&self.self_key, GENERATION_ATTEMPTS); + for (ilog2, candidate) in candidates_vec { + match self.candidates.entry(ilog2) { + Entry::Occupied(mut entry) => { + let entry = entry.get_mut(); + if entry.len() >= MAX_PEERS_PER_BUCKET { + // pop the front (as it might have been already used for querying) and insert the new one at the back
+ let _ = entry.pop_front(); + entry.push_back(candidate); + } else { + entry.push_back(candidate); + } + } + Entry::Vacant(entry) => { + let _ = entry.insert(VecDeque::from([candidate])); + } + } + } + } + + pub(crate) fn candidates(&self) -> impl Iterator { + self.candidates + .values() + .filter_map(|candidates| candidates.front()) + } + + fn generate_candidates( + self_key: &KBucketKey, + num_to_generate: usize, + ) -> Vec<(u32, NetworkAddress)> { + (0..num_to_generate) + .into_par_iter() + .filter_map(|_| { + let candidate = NetworkAddress::from_peer(PeerId::random()); + let candidate_key = candidate.as_kbucket_key(); + let ilog2_distance = candidate_key.distance(&self_key).ilog2()?; + Some((ilog2_distance, candidate)) + }) + .collect::>() + } +}