Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(test): impl routing table test #1005

Merged
merged 12 commits into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions sn_networking/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ custom_debug = "~0.5.0"
libp2p = { version="0.53" , features = ["tokio", "dns", "kad", "macros", "request-response", "cbor","identify", "autonat", "noise", "tcp", "yamux", "gossipsub"] }
prometheus-client = { version = "0.22", optional = true }
rand = { version = "~0.8.5", features = ["small_rng"] }
rayon = "1.8.0"
rmp-serde = "1.1.1"
serde = { version = "1.0.133", features = [ "derive", "rc" ]}
sn_protocol = { path = "../sn_protocol", version = "0.8.29" }
Expand Down
6 changes: 5 additions & 1 deletion sn_networking/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,20 @@ impl SwarmDriver {
}

pub(crate) fn trigger_network_discovery(&mut self) {
let now = Instant::now();
// The query is just to trigger the network discovery,
// hence no need to wait for a result.
for addr in &self.network_discovery_candidates {
for addr in self.network_discovery_candidates.candidates() {
let _ = self
.swarm
.behaviour_mut()
.kademlia
.get_closest_peers(addr.as_bytes());
}
self.network_discovery_candidates
.try_generate_new_candidates();
self.bootstrap.initiated();
debug!("Trigger network discovery took {:?}", now.elapsed());
}
}

Expand Down
53 changes: 4 additions & 49 deletions sn_networking/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::{
event::NetworkEvent,
event::{GetRecordResultMap, NodeEvent},
multiaddr_pop_p2p,
network_discovery::NetworkDiscoveryCandidates,
record_store::{ClientRecordStore, NodeRecordStore, NodeRecordStoreConfig},
record_store_api::UnifiedRecordStore,
replication_fetcher::ReplicationFetcher,
Expand Down Expand Up @@ -50,7 +51,7 @@ use sn_protocol::{
NetworkAddress, PrettyPrintKBucketKey,
};
use std::{
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
collections::{HashMap, HashSet},
net::SocketAddr,
num::NonZeroUsize,
path::PathBuf,
Expand Down Expand Up @@ -495,7 +496,7 @@ impl NetworkBuilder {
// `identify` protocol to kick in and get them in the routing table.
dialed_peers: CircularVec::new(63),
is_gossip_handler: false,
network_discovery_candidates: generate_kbucket_specific_candidates(&peer_id),
network_discovery_candidates: NetworkDiscoveryCandidates::new(&peer_id),
};

Ok((
Expand All @@ -511,51 +512,6 @@ impl NetworkBuilder {
}
}

fn generate_kbucket_specific_candidates(self_peer_id: &PeerId) -> Vec<NetworkAddress> {
let mut candidates: BTreeMap<usize, NetworkAddress> = BTreeMap::new();
// To avoid deadlock or taking too much time, currently set a fixed generation attempts
let mut attempts = 0;
// Also an early return when got the first 20 furthest kBuckets covered.
let mut buckets_covered: BTreeSet<_> = (0..21).map(|index| index as usize).collect();

let local_key = NetworkAddress::from_peer(*self_peer_id).as_kbucket_key();
let local_key_bytes_len = local_key.hashed_bytes().len();
while attempts < 10000 && !buckets_covered.is_empty() {
let candiate = NetworkAddress::from_peer(PeerId::random());
let candidate_key = candiate.as_kbucket_key();
let candidate_key_bytes_len = candidate_key.hashed_bytes().len();

if local_key_bytes_len != candidate_key_bytes_len {
panic!("kBucketKey has different key length, {candiate:?} has {candidate_key_bytes_len:?}, {self_peer_id:?} has {local_key_bytes_len:?}");
}

let common_leading_bits =
common_leading_bits(local_key.hashed_bytes(), candidate_key.hashed_bytes());

let _ = candidates.insert(common_leading_bits, candiate);
let _ = buckets_covered.remove(&common_leading_bits);

attempts += 1;
}

let generated_buckets: Vec<_> = candidates.keys().copied().collect();
let generated_candidates: Vec<_> = candidates.values().cloned().collect();
trace!("Generated targets covering kbuckets {generated_buckets:?}");
generated_candidates
}

/// Returns the length of the common leading bits.
/// e.g. when `11110000` and `11111111`, return as 4.
/// Note: the length of two shall be the same
fn common_leading_bits(one: &[u8], two: &[u8]) -> usize {
for byte_index in 0..one.len() {
if one[byte_index] != two[byte_index] {
return (byte_index * 8) + (one[byte_index] ^ two[byte_index]).leading_zeros() as usize;
}
}
8 * one.len()
}

pub struct SwarmDriver {
pub(crate) swarm: Swarm<NodeBehaviour>,
pub(crate) self_peer_id: PeerId,
Expand Down Expand Up @@ -584,9 +540,8 @@ pub struct SwarmDriver {
// they are not supposed to process the gossip msg that received from libp2p.
pub(crate) is_gossip_handler: bool,
// A list of random `PeerId` candidates that falls into kbuckets,
// one for each furthest 30 kbuckets.
// This is to ensure a more accurate network discovery.
pub(crate) network_discovery_candidates: Vec<NetworkAddress>,
pub(crate) network_discovery_candidates: NetworkDiscoveryCandidates,
}

impl SwarmDriver {
Expand Down
1 change: 1 addition & 0 deletions sn_networking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod event;
mod metrics;
#[cfg(feature = "open-metrics")]
mod metrics_service;
mod network_discovery;
mod quorum;
mod record_store;
mod record_store_api;
Expand Down
108 changes: 108 additions & 0 deletions sn_networking/src/network_discovery.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright 2023 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use libp2p::{kad::KBucketKey, PeerId};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use sn_protocol::NetworkAddress;
use std::{
collections::{hash_map::Entry, HashMap, VecDeque},
time::Instant,
};

const INITIAL_GENERATION_ATTEMPTS: usize = 10_000;
const GENERATION_ATTEMPTS: usize = 1_000;
const MAX_PEERS_PER_BUCKET: usize = 5;

#[derive(Debug, Clone)]
pub(crate) struct NetworkDiscoveryCandidates {
self_key: KBucketKey<PeerId>,
candidates: HashMap<u32, VecDeque<NetworkAddress>>,
}

impl NetworkDiscoveryCandidates {
pub(crate) fn new(self_peer_id: &PeerId) -> Self {
let start = Instant::now();
let self_key = KBucketKey::from(*self_peer_id);
let candidates_vec = Self::generate_candidates(&self_key, INITIAL_GENERATION_ATTEMPTS);

let mut candidates: HashMap<u32, VecDeque<NetworkAddress>> = HashMap::new();
for (ilog2, candidate) in candidates_vec {
match candidates.entry(ilog2) {
Entry::Occupied(mut entry) => {
let entry = entry.get_mut();
if entry.len() >= MAX_PEERS_PER_BUCKET {
continue;
} else {
entry.push_back(candidate);
}
}
Entry::Vacant(entry) => {
let _ = entry.insert(VecDeque::from([candidate]));
}
}
}

info!(
"Time to generate NetworkDiscoveryCandidates: {:?}",
start.elapsed()
);
let mut buckets_covered = candidates
.iter()
.map(|(ilog2, candidates)| (*ilog2, candidates.len()))
.collect::<Vec<_>>();
buckets_covered.sort_by_key(|(ilog2, _)| *ilog2);
info!("The generated network discovery candidates currently cover these ilog2 buckets: {buckets_covered:?}");

Self {
self_key,
candidates,
}
}

pub(crate) fn try_generate_new_candidates(&mut self) {
let candidates_vec = Self::generate_candidates(&self.self_key, GENERATION_ATTEMPTS);
for (ilog2, candidate) in candidates_vec {
match self.candidates.entry(ilog2) {
Entry::Occupied(mut entry) => {
let entry = entry.get_mut();
if entry.len() >= MAX_PEERS_PER_BUCKET {
// pop the front (as it might have been already used for querying and insert the new one at the back
let _ = entry.pop_front();
entry.push_back(candidate);
} else {
entry.push_back(candidate);
}
}
Entry::Vacant(entry) => {
let _ = entry.insert(VecDeque::from([candidate]));
}
}
}
}

pub(crate) fn candidates(&self) -> impl Iterator<Item = &NetworkAddress> {
self.candidates
.values()
.filter_map(|candidates| candidates.front())
maqi marked this conversation as resolved.
Show resolved Hide resolved
}

fn generate_candidates(
self_key: &KBucketKey<PeerId>,
num_to_generate: usize,
) -> Vec<(u32, NetworkAddress)> {
(0..num_to_generate)
.into_par_iter()
.filter_map(|_| {
let candidate = NetworkAddress::from_peer(PeerId::random());
let candidate_key = candidate.as_kbucket_key();
let ilog2_distance = candidate_key.distance(&self_key).ilog2()?;
Some((ilog2_distance, candidate))
})
.collect::<Vec<_>>()
}
}