diff --git a/.github/workflows/memcheck.yml b/.github/workflows/memcheck.yml index 130dd61a69..a7a8026273 100644 --- a/.github/workflows/memcheck.yml +++ b/.github/workflows/memcheck.yml @@ -39,8 +39,8 @@ jobs: run: cargo build --release --bins timeout-minutes: 30 - - name: Build churn tests - run: cargo test --release -p sn_node --test data_with_churn --no-run + - name: Build tests + run: cargo test --release -p sn_node --test data_with_churn --test verify_routing_table --no-run timeout-minutes: 30 - name: Start a node instance that does not undergo churn @@ -112,6 +112,12 @@ jobs: SN_LOG: "all" timeout-minutes: 30 + - name: Verify the routing tables of the nodes + run: cargo test --release -p sn_node --test verify_routing_table -- --nocapture + env: + SLEEP_BEFORE_VERIFICATION: 300 + timeout-minutes: 10 + - name: Verify restart of nodes using rg shell: bash timeout-minutes: 1 diff --git a/.github/workflows/merge.yml b/.github/workflows/merge.yml index 19d9fc8329..104e94c477 100644 --- a/.github/workflows/merge.yml +++ b/.github/workflows/merge.yml @@ -438,9 +438,9 @@ jobs: log_file_prefix: safe_test_logs_churn platform: ${{ matrix.os }} - verify_data_location: + verify_data_location_routing_table: if: "!startsWith(github.event.head_commit.message, 'chore(release):')" - name: Verify data location + name: Verify data location and Routing Table runs-on: ${{ matrix.os }} strategy: matrix: @@ -464,8 +464,8 @@ jobs: run: cargo build --release --features local-discovery --bin safenode --bin faucet timeout-minutes: 30 - - name: Build data location test - run: cargo test --release -p sn_node --features=local-discovery --test verify_data_location --no-run + - name: Build data location and routing table tests + run: cargo test --release -p sn_node --features=local-discovery --test verify_data_location --test verify_routing_table --no-run timeout-minutes: 30 - name: Start a local network @@ -487,6 +487,10 @@ jobs: echo "SAFE_PEERS has been set to $SAFE_PEERS" fi + - name: Verify the routing tables of the nodes + run: cargo test --release -p sn_node --features="local-discovery" --test verify_routing_table -- --nocapture + timeout-minutes: 5 + - name: Verify the location of the data on the network (4 * 5 mins) run: cargo test --release -p sn_node --features="local-discovery" --test verify_data_location -- --nocapture env: @@ -494,6 +498,10 @@ jobs: SN_LOG: "all" timeout-minutes: 30 + - name: Verify the routing tables of the nodes + run: cargo test --release -p sn_node --features="local-discovery" --test verify_routing_table -- --nocapture + timeout-minutes: 5 + - name: Verify restart of nodes using rg shell: bash timeout-minutes: 1 diff --git a/.github/workflows/nightly.yml b/.github/workflows/nightly.yml index b8457bb10c..6ded81ef27 100644 --- a/.github/workflows/nightly.yml +++ b/.github/workflows/nightly.yml @@ -383,8 +383,8 @@ jobs: SLACK_MESSAGE: "Please check the logs for the run at ${{ env.WORKFLOW_URL }}/${{ github.run_id }}" SLACK_TITLE: "Nightly Churn Test Run Failed" - verify_data_location: - name: Verify data location + verify_data_location_routing_table: + name: Verify data location and Routing Table runs-on: ${{ matrix.os }} strategy: matrix: @@ -410,8 +410,8 @@ jobs: run: cargo build --release --features local-discovery --bin safenode --bin faucet timeout-minutes: 30 - - name: Build data location test - run: cargo test --release -p sn_node --features=local-discovery --test verify_data_location --no-run + - name: Build data location and routing table tests + run: cargo test --release -p 
sn_node --features=local-discovery --test verify_data_location --test verify_routing_table --no-run timeout-minutes: 30 - name: Start a local network @@ -423,12 +423,20 @@ jobs: faucet-path: target/release/faucet platform: ${{ matrix.os }} + - name: Verify the Routing table of the nodes + run: cargo test --release -p sn_node --features="local-discovery" --test verify_routing_table -- --nocapture + timeout-minutes: 5 + - name: Verify the location of the data on the network (approx 12 * 5 mins) run: cargo test --release -p sn_node --features="local-discovery" --test verify_data_location -- --nocapture env: CHURN_COUNT: 12 SN_LOG: "all" timeout-minutes: 90 + + - name: Verify the routing tables of the nodes + run: cargo test --release -p sn_node --features="local-discovery" --test verify_routing_table -- --nocapture + timeout-minutes: 5 - name: Verify restart of nodes using rg shell: bash diff --git a/Cargo.lock b/Cargo.lock index 84d7878deb..e029752b9e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4653,6 +4653,7 @@ dependencies = [ "prometheus-client 0.22.0", "quickcheck", "rand", + "rayon", "rmp-serde", "serde", "sn_protocol", diff --git a/sn_networking/Cargo.toml b/sn_networking/Cargo.toml index 3119acd6d8..b5ddbd81e7 100644 --- a/sn_networking/Cargo.toml +++ b/sn_networking/Cargo.toml @@ -26,6 +26,7 @@ custom_debug = "~0.5.0" libp2p = { version="0.53" , features = ["tokio", "dns", "kad", "macros", "request-response", "cbor","identify", "autonat", "noise", "tcp", "yamux", "gossipsub"] } prometheus-client = { version = "0.22", optional = true } rand = { version = "~0.8.5", features = ["small_rng"] } +rayon = "1.8.0" rmp-serde = "1.1.1" serde = { version = "1.0.133", features = [ "derive", "rc" ]} sn_protocol = { path = "../sn_protocol", version = "0.8.29" } diff --git a/sn_networking/src/bootstrap.rs b/sn_networking/src/bootstrap.rs index 430919ddb2..7bb376012c 100644 --- a/sn_networking/src/bootstrap.rs +++ b/sn_networking/src/bootstrap.rs @@ -6,7 +6,7 @@ // KIND, either express or implied. Please review the Licences for the specific language governing // permissions and limitations relating to use of the SAFE Network Software. -use crate::SwarmDriver; +use crate::{driver::PendingGetClosestType, SwarmDriver}; use std::time::{Duration, Instant}; use tokio::time::Interval; @@ -49,16 +49,24 @@ impl SwarmDriver { } pub(crate) fn trigger_network_discovery(&mut self) { - // The query is just to trigger the network discovery, - // hence no need to wait for a result. - for addr in &self.network_discovery_candidates { - let _ = self + let now = Instant::now(); + // Fetches the candidates and also generates new candidates + for addr in self.network_discovery.candidates() { + // The query_id is tracked here. This is to update the candidate list of network_discovery with the newly + // found closest peers. It may fill up the candidate list of closer buckets which are harder to generate. + let query_id = self .swarm .behaviour_mut() .kademlia .get_closest_peers(addr.as_bytes()); + let _ = self.pending_get_closest_peers.insert( + query_id, + (PendingGetClosestType::NetworkDiscovery, Default::default()), + ); } + self.bootstrap.initiated(); + debug!("Trigger network discovery took {:?}", now.elapsed()); } } diff --git a/sn_networking/src/cmd.rs b/sn_networking/src/cmd.rs index 09ea9572f9..b870758bb6 100644 --- a/sn_networking/src/cmd.rs +++ b/sn_networking/src/cmd.rs @@ -7,7 +7,7 @@ // permissions and limitations relating to use of the SAFE Network Software. 
 use crate::{
-    driver::SwarmDriver,
+    driver::{PendingGetClosestType, SwarmDriver},
     error::{Error, Result},
     sort_peers_by_address, GetQuorum, MsgResponder, NetworkEvent, CLOSE_GROUP_SIZE,
     REPLICATE_RANGE,
@@ -25,7 +25,7 @@ use sn_protocol::{
 };
 use sn_transfers::NanoTokens;
 use std::{
-    collections::{HashMap, HashSet},
+    collections::{BTreeMap, HashMap, HashSet},
     fmt::Debug,
 };
 use tokio::sync::oneshot;
@@ -53,6 +53,11 @@ pub enum SwarmCmd {
     GetAllLocalPeers {
         sender: oneshot::Sender<Vec<PeerId>>,
     },
+    /// Get a map where each key is the ilog2 distance of that Kbucket and each value is a vector of peers in that
+    /// bucket.
+    GetKBuckets {
+        sender: oneshot::Sender<BTreeMap<u32, Vec<PeerId>>>,
+    },
     // Returns up to K_VALUE peers from all the k-buckets from the local Routing Table.
     // And our PeerId as well.
     GetClosestKLocalPeers {
@@ -240,9 +245,9 @@ impl Debug for SwarmCmd {
             SwarmCmd::GetAllLocalPeers { .. } => {
                 write!(f, "SwarmCmd::GetAllLocalPeers")
             }
-            // SwarmCmd::GetOurCloseGroup { .. } => {
-            //     write!(f, "SwarmCmd::GetOurCloseGroup")
-            // }
+            SwarmCmd::GetKBuckets { .. } => {
+                write!(f, "SwarmCmd::GetKBuckets")
+            }
             SwarmCmd::GetSwarmLocalState { .. } => {
                 write!(f, "SwarmCmd::GetSwarmLocalState")
             }
@@ -508,13 +513,34 @@ impl SwarmDriver {
                     .behaviour_mut()
                     .kademlia
                     .get_closest_peers(key.as_bytes());
-                let _ = self
-                    .pending_get_closest_peers
-                    .insert(query_id, (sender, Default::default()));
+                let _ = self.pending_get_closest_peers.insert(
+                    query_id,
+                    (
+                        PendingGetClosestType::FunctionCall(sender),
+                        Default::default(),
+                    ),
+                );
             }
             SwarmCmd::GetAllLocalPeers { sender } => {
                 let _ = sender.send(self.get_all_local_peers());
             }
+            SwarmCmd::GetKBuckets { sender } => {
+                let mut ilog2_kbuckets = BTreeMap::new();
+                for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
+                    let range = kbucket.range();
+                    if let Some(distance) = range.0.ilog2() {
+                        let peers_in_kbucket = kbucket
+                            .iter()
+                            .map(|peer_entry| peer_entry.node.key.clone().into_preimage())
+                            .collect::<Vec<PeerId>>();
+                        let _ = ilog2_kbuckets.insert(distance, peers_in_kbucket);
+                    } else {
+                        // This shall never happen.
+                        error!("bucket is ourself ???!!!");
+                    }
+                }
+                let _ = sender.send(ilog2_kbuckets);
+            }
             SwarmCmd::GetCloseGroupLocalPeers { key, sender } => {
                 let key = key.as_kbucket_key();
                 // calls `kbuckets.closest_keys(key)` internally, which orders the peers by
diff --git a/sn_networking/src/driver.rs b/sn_networking/src/driver.rs
index 63dc850998..e640fbfb7c 100644
--- a/sn_networking/src/driver.rs
+++ b/sn_networking/src/driver.rs
@@ -18,6 +18,7 @@ use crate::{
     event::NetworkEvent,
     event::{GetRecordResultMap, NodeEvent},
     multiaddr_pop_p2p,
+    network_discovery::NetworkDiscovery,
     record_store::{ClientRecordStore, NodeRecordStore, NodeRecordStoreConfig},
     record_store_api::UnifiedRecordStore,
     replication_fetcher::ReplicationFetcher,
@@ -50,7 +51,7 @@ use sn_protocol::{
     NetworkAddress, PrettyPrintKBucketKey,
 };
 use std::{
-    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
+    collections::{HashMap, HashSet},
     net::SocketAddr,
     num::NonZeroUsize,
     path::PathBuf,
@@ -63,7 +64,15 @@ use tracing::warn;
 
 /// List of expected record holders to be verified.
 pub(super) type ExpectedHoldersList = HashSet<PeerId>;
-type PendingGetClosest = HashMap<QueryId, (oneshot::Sender<HashSet<PeerId>>, HashSet<PeerId>)>;
+/// The ways in which the Get Closest queries are used.
+pub(crate) enum PendingGetClosestType { + /// The network discovery method is present at the networking layer + /// Thus we can just process the queries made by NetworkDiscovery without using any channels + NetworkDiscovery, + /// These are queries made by a function at the upper layers and contains a channel to send the result back. + FunctionCall(oneshot::Sender>), +} +type PendingGetClosest = HashMap)>; type PendingGetRecord = HashMap< QueryId, ( @@ -495,7 +504,7 @@ impl NetworkBuilder { // `identify` protocol to kick in and get them in the routing table. dialed_peers: CircularVec::new(63), is_gossip_handler: false, - network_discovery_candidates: generate_kbucket_specific_candidates(&peer_id), + network_discovery: NetworkDiscovery::new(&peer_id), }; Ok(( @@ -511,51 +520,6 @@ impl NetworkBuilder { } } -fn generate_kbucket_specific_candidates(self_peer_id: &PeerId) -> Vec { - let mut candidates: BTreeMap = BTreeMap::new(); - // To avoid deadlock or taking too much time, currently set a fixed generation attempts - let mut attempts = 0; - // Also an early return when got the first 20 furthest kBuckets covered. - let mut buckets_covered: BTreeSet<_> = (0..21).map(|index| index as usize).collect(); - - let local_key = NetworkAddress::from_peer(*self_peer_id).as_kbucket_key(); - let local_key_bytes_len = local_key.hashed_bytes().len(); - while attempts < 10000 && !buckets_covered.is_empty() { - let candiate = NetworkAddress::from_peer(PeerId::random()); - let candidate_key = candiate.as_kbucket_key(); - let candidate_key_bytes_len = candidate_key.hashed_bytes().len(); - - if local_key_bytes_len != candidate_key_bytes_len { - panic!("kBucketKey has different key length, {candiate:?} has {candidate_key_bytes_len:?}, {self_peer_id:?} has {local_key_bytes_len:?}"); - } - - let common_leading_bits = - common_leading_bits(local_key.hashed_bytes(), candidate_key.hashed_bytes()); - - let _ = candidates.insert(common_leading_bits, candiate); - let _ = buckets_covered.remove(&common_leading_bits); - - attempts += 1; - } - - let generated_buckets: Vec<_> = candidates.keys().copied().collect(); - let generated_candidates: Vec<_> = candidates.values().cloned().collect(); - trace!("Generated targets covering kbuckets {generated_buckets:?}"); - generated_candidates -} - -/// Returns the length of the common leading bits. -/// e.g. when `11110000` and `11111111`, return as 4. -/// Note: the length of two shall be the same -fn common_leading_bits(one: &[u8], two: &[u8]) -> usize { - for byte_index in 0..one.len() { - if one[byte_index] != two[byte_index] { - return (byte_index * 8) + (one[byte_index] ^ two[byte_index]).leading_zeros() as usize; - } - } - 8 * one.len() -} - pub struct SwarmDriver { pub(crate) swarm: Swarm, pub(crate) self_peer_id: PeerId, @@ -584,9 +548,8 @@ pub struct SwarmDriver { // they are not supposed to process the gossip msg that received from libp2p. pub(crate) is_gossip_handler: bool, // A list of random `PeerId` candidates that falls into kbuckets, - // one for each furthest 30 kbuckets. // This is to ensure a more accurate network discovery. 
- pub(crate) network_discovery_candidates: Vec, + pub(crate) network_discovery: NetworkDiscovery, } impl SwarmDriver { diff --git a/sn_networking/src/event.rs b/sn_networking/src/event.rs index ce921f8570..ceee999a27 100644 --- a/sn_networking/src/event.rs +++ b/sn_networking/src/event.rs @@ -8,7 +8,7 @@ use crate::{ close_group_majority, - driver::{truncate_patch_version, SwarmDriver}, + driver::{truncate_patch_version, PendingGetClosestType, SwarmDriver}, error::{Error, Result}, multiaddr_is_global, multiaddr_strip_p2p, sort_peers_by_address, GetQuorum, CLOSE_GROUP_SIZE, }; @@ -606,7 +606,7 @@ impl SwarmDriver { "Query task {id:?} returned with peers {closest_peers:?}, {stats:?} - {step:?}" ); - let (sender, mut current_closest) = + let (get_closest_type, mut current_closest) = self.pending_get_closest_peers.remove(&id).ok_or_else(|| { trace!( "Can't locate query task {id:?}, it has likely been completed already." @@ -621,13 +621,20 @@ impl SwarmDriver { let new_peers: HashSet = closest_peers.peers.clone().into_iter().collect(); current_closest.extend(new_peers); if current_closest.len() >= usize::from(K_VALUE) || step.last { - sender - .send(current_closest) - .map_err(|_| Error::InternalMsgChannelDropped)?; + match get_closest_type { + PendingGetClosestType::NetworkDiscovery => self + .network_discovery + .handle_get_closest_query(current_closest), + PendingGetClosestType::FunctionCall(sender) => { + sender + .send(current_closest) + .map_err(|_| Error::InternalMsgChannelDropped)?; + } + } } else { let _ = self .pending_get_closest_peers - .insert(id, (sender, current_closest)); + .insert(id, (get_closest_type, current_closest)); } } // Handle GetClosestPeers timeouts @@ -640,7 +647,7 @@ impl SwarmDriver { event_string = "kad_event::get_closest_peers_err"; error!("GetClosest Query task {id:?} errored with {err:?}, {stats:?} - {step:?}"); - let (sender, mut current_closest) = + let (get_closest_type, mut current_closest) = self.pending_get_closest_peers.remove(&id).ok_or_else(|| { trace!( "Can't locate query task {id:?}, it has likely been completed already." @@ -657,9 +664,16 @@ impl SwarmDriver { } } - sender - .send(current_closest) - .map_err(|_| Error::InternalMsgChannelDropped)?; + match get_closest_type { + PendingGetClosestType::NetworkDiscovery => self + .network_discovery + .handle_get_closest_query(current_closest), + PendingGetClosestType::FunctionCall(sender) => { + sender + .send(current_closest) + .map_err(|_| Error::InternalMsgChannelDropped)?; + } + } } // For `get_record` returning behaviour: diff --git a/sn_networking/src/lib.rs b/sn_networking/src/lib.rs index fff8a894b2..2209e78bca 100644 --- a/sn_networking/src/lib.rs +++ b/sn_networking/src/lib.rs @@ -19,6 +19,7 @@ mod event; mod metrics; #[cfg(feature = "open-metrics")] mod metrics_service; +mod network_discovery; mod quorum; mod record_store; mod record_store_api; @@ -52,7 +53,7 @@ use sn_protocol::{ }; use sn_transfers::{MainPubkey, NanoTokens, PaymentQuote}; use std::{ - collections::{HashMap, HashSet}, + collections::{BTreeMap, HashMap, HashSet}, path::PathBuf, }; use tokio::sync::{mpsc, oneshot}; @@ -183,6 +184,17 @@ impl Network { self.get_closest_peers(key, false).await } + /// Returns a map where each key is the ilog2 distance of that Kbucket and each value is a vector of peers in that + /// bucket. 
+ /// Does not include self + pub async fn get_kbuckets(&self) -> Result>> { + let (sender, receiver) = oneshot::channel(); + self.send_swarm_cmd(SwarmCmd::GetKBuckets { sender })?; + receiver + .await + .map_err(|_e| Error::InternalMsgChannelDropped) + } + /// Returns the closest peers to the given `NetworkAddress` that is fetched from the local /// Routing Table. It is ordered by increasing distance of the peers /// Note self peer_id is not included in the result. diff --git a/sn_networking/src/network_discovery.rs b/sn_networking/src/network_discovery.rs new file mode 100644 index 0000000000..b582da99b5 --- /dev/null +++ b/sn_networking/src/network_discovery.rs @@ -0,0 +1,175 @@ +// Copyright 2023 MaidSafe.net limited. +// +// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. +// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed +// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. Please review the Licences for the specific language governing +// permissions and limitations relating to use of the SAFE Network Software. + +use libp2p::{kad::KBucketKey, PeerId}; +use rand::{thread_rng, Rng}; +use rayon::iter::{IntoParallelIterator, ParallelIterator}; +use sn_protocol::NetworkAddress; +use std::{ + collections::{btree_map::Entry, BTreeMap, HashSet}, + time::Instant, +}; + +// The number of PeerId to generate when starting an instance of NetworkDiscovery +const INITIAL_GENERATION_ATTEMPTS: usize = 10_000; +// The number of PeerId to generate during each invocation to refresh our candidates +const GENERATION_ATTEMPTS: usize = 1_000; +// The max number of PeerId to keep per bucket +const MAX_PEERS_PER_BUCKET: usize = 5; + +/// Keep track of NetworkAddresses belonging to every bucket (if we can generate them with reasonable effort) +/// which we can then query using Kad::GetClosestPeers to effectively fill our RT. +#[derive(Debug, Clone)] +pub(crate) struct NetworkDiscovery { + self_key: KBucketKey, + candidates: BTreeMap>, +} + +impl NetworkDiscovery { + /// Create a new instance of NetworkDiscovery and tries to populate each bucket with random peers. + pub(crate) fn new(self_peer_id: &PeerId) -> Self { + let start = Instant::now(); + let self_key = KBucketKey::from(*self_peer_id); + let candidates = Self::generate_candidates(&self_key, INITIAL_GENERATION_ATTEMPTS); + + info!( + "Time to generate NetworkDiscoveryCandidates: {:?}", + start.elapsed() + ); + let buckets_covered = candidates + .iter() + .map(|(ilog2, candidates)| (*ilog2, candidates.len())) + .collect::>(); + info!("The generated network discovery candidates currently cover these ilog2 buckets: {buckets_covered:?}"); + + Self { + self_key, + candidates, + } + } + + /// The result from the kad::GetClosestPeers are again used to update our kbucket. + pub(crate) fn handle_get_closest_query(&mut self, closest_peers: HashSet) { + let now = Instant::now(); + + let candidates_map: BTreeMap> = closest_peers + .into_iter() + .filter_map(|peer| { + let peer = NetworkAddress::from_peer(peer); + let peer_key = peer.as_kbucket_key(); + peer_key + .distance(&self.self_key) + .ilog2() + .map(|ilog2| (ilog2, peer)) + }) + // To collect the NetworkAddresses into a vector. 
+ .fold(BTreeMap::new(), |mut acc, (ilog2, peer)| { + acc.entry(ilog2).or_default().push(peer); + acc + }); + + for (ilog2, candidates) in candidates_map { + self.insert_candidates(ilog2, candidates); + } + + trace!( + "It took {:?} to NetworkDiscovery::handle get closest query", + now.elapsed() + ); + } + + /// Returns one random candidate per bucket. Also tries to refresh the candidate list. + /// Todo: Limit the candidates to return. Favor the closest buckets. + pub(crate) fn candidates(&mut self) -> Vec<&NetworkAddress> { + self.try_refresh_candidates(); + + let mut rng = thread_rng(); + let mut op = Vec::with_capacity(self.candidates.len()); + + let candidates = self.candidates.values().filter_map(|candidates| { + // get a random index each time + let random_index = rng.gen::() % candidates.len(); + candidates.get(random_index) + }); + op.extend(candidates); + op + } + + /// Tries to refresh our current candidate list. We replace the old ones with new if we find any. + fn try_refresh_candidates(&mut self) { + let candidates_vec = Self::generate_candidates(&self.self_key, GENERATION_ATTEMPTS); + for (ilog2, candidates) in candidates_vec { + self.insert_candidates(ilog2, candidates); + } + } + + // Insert the new candidates and remove the old ones to maintain MAX_PEERS_PER_BUCKET. + fn insert_candidates(&mut self, ilog2: u32, new_candidates: Vec) { + match self.candidates.entry(ilog2) { + Entry::Occupied(mut entry) => { + let existing_candidates = entry.get_mut(); + // insert only newly seen new_candidates + let new_candidates: Vec<_> = new_candidates + .into_iter() + .filter(|candidate| !existing_candidates.contains(candidate)) + .collect(); + existing_candidates.extend(new_candidates); + // Keep only the last MAX_PEERS_PER_BUCKET elements i.e., the newest ones + let excess = existing_candidates + .len() + .saturating_sub(MAX_PEERS_PER_BUCKET); + if excess > 0 { + existing_candidates.drain(..excess); + } + } + Entry::Vacant(entry) => { + entry.insert(new_candidates); + } + } + } + + /// Uses rayon to parallelize the generation + fn generate_candidates( + self_key: &KBucketKey, + num_to_generate: usize, + ) -> BTreeMap> { + (0..num_to_generate) + .into_par_iter() + .filter_map(|_| { + let candidate = NetworkAddress::from_peer(PeerId::random()); + let candidate_key = candidate.as_kbucket_key(); + let ilog2 = candidate_key.distance(&self_key).ilog2()?; + Some((ilog2, candidate)) + }) + // Since it is parallel iterator, the fold fn batches the items and will produce multiple outputs. So we + // should use reduce fn to combine multiple outputs. 
+ .fold( + BTreeMap::new, + |mut acc: BTreeMap>, (ilog2, candidate)| { + acc.entry(ilog2).or_default().push(candidate); + acc + }, + ) + .reduce( + BTreeMap::new, + |mut acc: BTreeMap>, map| { + for (ilog2, candidates) in map { + let entry = acc.entry(ilog2).or_default(); + for candidate in candidates { + if entry.len() < MAX_PEERS_PER_BUCKET { + entry.push(candidate); + } else { + break; + } + } + } + acc + }, + ) + } +} diff --git a/sn_node/src/bin/safenode/rpc_service.rs b/sn_node/src/bin/safenode/rpc_service.rs index 64b970cf8b..d72e3c179e 100644 --- a/sn_node/src/bin/safenode/rpc_service.rs +++ b/sn_node/src/bin/safenode/rpc_service.rs @@ -13,14 +13,16 @@ use bls::{PublicKey, PK_SIZE}; use eyre::{ErrReport, Result}; use sn_protocol::node_rpc::NodeCtrl; use sn_protocol::safenode_proto::{ + k_buckets_response, safe_node_server::{SafeNode, SafeNodeServer}, GossipsubPublishRequest, GossipsubPublishResponse, GossipsubSubscribeRequest, GossipsubSubscribeResponse, GossipsubUnsubscribeRequest, GossipsubUnsubscribeResponse, - NetworkInfoRequest, NetworkInfoResponse, NodeEvent, NodeEventsRequest, NodeInfoRequest, - NodeInfoResponse, RecordAddressesRequest, RecordAddressesResponse, RestartRequest, - RestartResponse, StopRequest, StopResponse, TransferNotifsFilterRequest, - TransferNotifsFilterResponse, UpdateRequest, UpdateResponse, + KBucketsRequest, KBucketsResponse, NetworkInfoRequest, NetworkInfoResponse, NodeEvent, + NodeEventsRequest, NodeInfoRequest, NodeInfoResponse, RecordAddressesRequest, + RecordAddressesResponse, RestartRequest, RestartResponse, StopRequest, StopResponse, + TransferNotifsFilterRequest, TransferNotifsFilterResponse, UpdateRequest, UpdateResponse, }; +use std::collections::HashMap; use std::{ env, net::SocketAddr, @@ -182,6 +184,32 @@ impl SafeNode for SafeNodeRpcService { Ok(Response::new(RecordAddressesResponse { addresses })) } + async fn k_buckets( + &self, + request: Request, + ) -> Result, Status> { + trace!( + "RPC request received at {}: {:?}", + self.addr, + request.get_ref() + ); + + let kbuckets: HashMap = self + .running_node + .get_kbuckets() + .await + .unwrap() + .into_iter() + .map(|(ilog2_distance, peers)| { + let peers = peers.into_iter().map(|peer| peer.to_bytes()).collect(); + let peers = k_buckets_response::Peers { peers }; + (ilog2_distance, peers) + }) + .collect(); + + Ok(Response::new(KBucketsResponse { kbuckets })) + } + async fn subscribe_to_topic( &self, request: Request, diff --git a/sn_node/src/lib.rs b/sn_node/src/lib.rs index 00cdcef5dc..a1428d83c1 100644 --- a/sn_node/src/lib.rs +++ b/sn_node/src/lib.rs @@ -65,7 +65,10 @@ use bytes::Bytes; use libp2p::PeerId; use sn_networking::{Network, SwarmLocalState}; use sn_protocol::NetworkAddress; -use std::{collections::HashSet, path::PathBuf}; +use std::{ + collections::{BTreeMap, HashSet}, + path::PathBuf, +}; use tokio::sync::broadcast; /// Once a node is started and running, the user obtains @@ -119,6 +122,13 @@ impl RunningNode { Ok(addresses) } + /// Returns a map where each key is the ilog2 distance of that Kbucket and each value is a vector of peers in that + /// bucket. 
+ pub async fn get_kbuckets(&self) -> Result>> { + let kbuckets = self.network.get_kbuckets().await?; + Ok(kbuckets) + } + /// Subscribe to given gossipsub topic pub fn subscribe_to_topic(&self, topic_id: String) -> Result<()> { self.network.subscribe_to_topic(topic_id)?; diff --git a/sn_node/tests/common/mod.rs b/sn_node/tests/common/mod.rs index 29d4b5e853..b1fc7a58dd 100644 --- a/sn_node/tests/common/mod.rs +++ b/sn_node/tests/common/mod.rs @@ -8,6 +8,7 @@ #![allow(dead_code)] +use libp2p::PeerId; use self_encryption::MIN_ENCRYPTABLE_BYTES; use sn_client::{load_faucet_wallet_from_genesis_wallet, send, Client, Files}; use sn_peers_acquisition::parse_peer_addr; @@ -25,6 +26,7 @@ use rand::{ Rng, }; use sn_transfers::NanoTokens; +use std::net::{IpAddr, Ipv4Addr}; use std::{ fs::File, io::Write, @@ -38,6 +40,9 @@ use xor_name::XorName; type ResultRandomContent = Result<(Files, Bytes, ChunkAddress, Vec<(XorName, PathBuf)>)>; pub const PAYING_WALLET_INITIAL_BALANCE: u64 = 100_000_000_000_000; +pub const NODE_COUNT: u32 = 25; + +const RPC_CONCURRENT_REQUESTS: usize = 20; lazy_static! { // mutex to restrict access to faucet wallet from concurrent tests @@ -133,6 +138,27 @@ pub fn random_content( )) } +// Returns all the PeerId for all the locally running nodes +pub async fn get_all_peer_ids() -> Result> { + let mut all_peers = Vec::new(); + + let mut addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12000); + for node_index in 1..NODE_COUNT + 1 { + addr.set_port(12000 + node_index as u16); + let endpoint = format!("https://{addr}"); + let mut rpc_client = SafeNodeClient::connect(endpoint).await?; + + // get the peer_id + let response = rpc_client + .node_info(Request::new(NodeInfoRequest {})) + .await?; + let peer_id = PeerId::from_bytes(&response.get_ref().peer_id)?; + all_peers.push(peer_id); + } + println!("Obtained the PeerId list for the locally running network with a node count of {NODE_COUNT}"); + Ok(all_peers) +} + pub async fn node_restart(addr: SocketAddr) -> Result<()> { let endpoint = format!("https://{addr}"); let mut client = SafeNodeClient::connect(endpoint).await?; diff --git a/sn_node/tests/data_with_churn.rs b/sn_node/tests/data_with_churn.rs index 2198ef5dec..805968add7 100644 --- a/sn_node/tests/data_with_churn.rs +++ b/sn_node/tests/data_with_churn.rs @@ -10,7 +10,7 @@ mod common; use assert_fs::TempDir; use common::{ - get_funded_wallet, get_gossip_client_and_wallet, get_wallet, node_restart, + get_funded_wallet, get_gossip_client_and_wallet, get_wallet, node_restart, NODE_COUNT, PAYING_WALLET_INITIAL_BALANCE, }; use eyre::{bail, eyre, Result}; @@ -38,8 +38,6 @@ use tokio::{sync::RwLock, task::JoinHandle, time::sleep}; use tracing::{debug, trace}; use xor_name::XorName; -const NODE_COUNT: u32 = 25; - const EXTRA_CHURN_COUNT: u32 = 5; const CHURN_CYCLES: u32 = 1; const CHUNK_CREATION_RATIO_TO_CHURN: u32 = 15; diff --git a/sn_node/tests/msgs_over_gossipsub.rs b/sn_node/tests/msgs_over_gossipsub.rs index 4c7d2f014c..be436ffe36 100644 --- a/sn_node/tests/msgs_over_gossipsub.rs +++ b/sn_node/tests/msgs_over_gossipsub.rs @@ -8,6 +8,7 @@ mod common; +use common::NODE_COUNT; use eyre::Result; use sn_logging::LogBuilder; use sn_node::NodeEvent; @@ -23,8 +24,7 @@ use tokio::time::timeout; use tokio_stream::StreamExt; use tonic::Request; -const NODE_COUNT: u8 = 25; -const NODES_SUBSCRIBED: u8 = NODE_COUNT / 2; // 12 out of 25 nodes will be subscribers +const NODES_SUBSCRIBED: u32 = NODE_COUNT / 2; // 12 out of 25 nodes will be subscribers const TEST_CYCLES: u8 = 20; 
#[tokio::test] @@ -47,7 +47,7 @@ async fn msgs_over_gossipsub() -> Result<()> { // get a random subset of NODES_SUBSCRIBED out of NODE_COUNT nodes to subscribe to the topic let mut rng = rand::thread_rng(); let random_subs_nodes: Vec<_> = - rand::seq::index::sample(&mut rng, NODE_COUNT.into(), NODES_SUBSCRIBED.into()) + rand::seq::index::sample(&mut rng, NODE_COUNT as usize, NODES_SUBSCRIBED as usize) .iter() .map(|i| all_nodes_addrs[i]) .collect(); @@ -65,7 +65,7 @@ async fn msgs_over_gossipsub() -> Result<()> { .node_events(Request::new(NodeEventsRequest {})) .await?; - let mut count = 0; + let mut count: u32 = 0; let _ = timeout(Duration::from_secs(40), async { let mut stream = response.into_inner(); @@ -87,7 +87,7 @@ async fn msgs_over_gossipsub() -> Result<()> { }) .await; - Ok::(count) + Ok::(count) }); subs_handles.push((node_index, addr, handle)); @@ -141,7 +141,7 @@ async fn node_unsubscribe_from_topic(addr: SocketAddr, topic: String) -> Result< } async fn other_nodes_to_publish_on_topic( - nodes: Vec<(u8, SocketAddr)>, + nodes: Vec<(u32, SocketAddr)>, topic: String, ) -> Result<()> { for (node_index, addr) in nodes { diff --git a/sn_node/tests/verify_data_location.rs b/sn_node/tests/verify_data_location.rs index ddd21d3eb2..264770c7d3 100644 --- a/sn_node/tests/verify_data_location.rs +++ b/sn_node/tests/verify_data_location.rs @@ -9,7 +9,10 @@ #![allow(clippy::mutable_key_type)] mod common; -use crate::common::{get_gossip_client_and_wallet, node_restart, PAYING_WALLET_INITIAL_BALANCE}; +use crate::common::{ + get_all_peer_ids, get_gossip_client_and_wallet, node_restart, NODE_COUNT, + PAYING_WALLET_INITIAL_BALANCE, +}; use assert_fs::TempDir; use eyre::{eyre, Result}; use libp2p::{ @@ -35,7 +38,6 @@ use std::{ use tonic::Request; use tracing::error; -const NODE_COUNT: u8 = 25; const CHUNK_SIZE: usize = 1024; // VERIFICATION_DELAY is set based on the dead peer detection interval @@ -57,7 +59,7 @@ const CHURN_COUNT: u8 = 4; // It can be overridden by setting the 'CHUNK_COUNT' env var. const CHUNK_COUNT: usize = 5; -type NodeIndex = u8; +type NodeIndex = u32; type RecordHolders = HashMap>; #[tokio::test(flavor = "multi_thread")] @@ -276,27 +278,6 @@ async fn verify_location(all_peers: &[PeerId]) -> Result<()> { } } -// Returns all the PeerId for all the locally running nodes -async fn get_all_peer_ids() -> Result> { - let mut all_peers = Vec::new(); - - let mut addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12000); - for node_index in 1..NODE_COUNT + 1 { - addr.set_port(12000 + node_index as u16); - let endpoint = format!("https://{addr}"); - let mut rpc_client = SafeNodeClient::connect(endpoint).await?; - - // get the peer_id - let response = rpc_client - .node_info(Request::new(NodeInfoRequest {})) - .await?; - let peer_id = PeerId::from_bytes(&response.get_ref().peer_id)?; - all_peers.push(peer_id); - } - println!("Obtained the PeerId list for the locally running network with a node count of {NODE_COUNT}"); - Ok(all_peers) -} - // Generate random Chunks and store them to the Network async fn store_chunks(client: Client, chunk_count: usize, wallet_dir: PathBuf) -> Result<()> { let start = Instant::now(); diff --git a/sn_node/tests/verify_routing_table.rs b/sn_node/tests/verify_routing_table.rs new file mode 100644 index 0000000000..998c0a218a --- /dev/null +++ b/sn_node/tests/verify_routing_table.rs @@ -0,0 +1,110 @@ +// Copyright 2023 MaidSafe.net limited. +// +// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. 
+// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
+// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. Please review the Licences for the specific language governing
+// permissions and limitations relating to use of the SAFE Network Software.
+
+#![allow(clippy::mutable_key_type)]
+mod common;
+
+use crate::common::{get_all_peer_ids, NODE_COUNT};
+use color_eyre::Result;
+use libp2p::{
+    kad::{KBucketKey, K_VALUE},
+    PeerId,
+};
+use sn_logging::LogBuilder;
+use sn_protocol::safenode_proto::{safe_node_client::SafeNodeClient, KBucketsRequest};
+use std::{
+    collections::{BTreeMap, HashSet},
+    net::{IpAddr, Ipv4Addr, SocketAddr},
+    time::Duration,
+};
+use tonic::Request;
+
+/// Sleep for some time to let the nodes discover each other before verification.
+/// It can also be set through the env variable of the same name.
+const SLEEP_BEFORE_VERIFICATION: Duration = Duration::from_secs(5);
+
+#[tokio::test(flavor = "multi_thread")]
+async fn verify_routing_table() -> Result<()> {
+    let _log_appender_guard = LogBuilder::init_multi_threaded_tokio_test("verify_routing_table");
+
+    let sleep_duration = std::env::var("SLEEP_BEFORE_VERIFICATION")
+        .map(|value| {
+            value
+                .parse::<u64>()
+                .expect("Failed to parse sleep value into u64")
+        })
+        .map(Duration::from_secs)
+        .unwrap_or(SLEEP_BEFORE_VERIFICATION);
+    println!("Sleeping for {sleep_duration:?} before verification");
+    tokio::time::sleep(sleep_duration).await;
+
+    let all_peers = get_all_peer_ids().await?;
+    let mut all_failed_list = BTreeMap::new();
+
+    for node_index in 1..NODE_COUNT + 1 {
+        let mut addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12000);
+        addr.set_port(12000 + node_index as u16);
+        let endpoint = format!("https://{addr}");
+        let mut rpc_client = SafeNodeClient::connect(endpoint).await?;
+
+        let response = rpc_client
+            .k_buckets(Request::new(KBucketsRequest {}))
+            .await?;
+
+        let k_buckets = response.get_ref().kbuckets.clone();
+        let k_buckets = k_buckets
+            .into_iter()
+            .map(|(ilog2, peers)| {
+                let peers = peers
+                    .peers
+                    .clone()
+                    .into_iter()
+                    .map(|peer_bytes| PeerId::from_bytes(&peer_bytes).unwrap())
+                    .collect::<HashSet<_>>();
+                (ilog2, peers)
+            })
+            .collect::<BTreeMap<_, _>>();
+
+        let current_peer = all_peers[node_index as usize - 1];
+        let current_peer_key = KBucketKey::from(current_peer);
+
+        let mut failed_list = Vec::new();
+        for peer in all_peers.iter() {
+            let ilog2_distance = match KBucketKey::from(*peer).distance(&current_peer_key).ilog2() {
+                Some(distance) => distance,
+                // None if same key
+                None => continue,
+            };
+            match k_buckets.get(&ilog2_distance) {
+                Some(bucket) => {
+                    if bucket.contains(peer) {
+                        continue;
+                    } else if bucket.len() == K_VALUE.get() {
+                        println!("{peer:?} should be inside the ilog2 bucket: {ilog2_distance:?} of {current_peer:?}. But skipped as the bucket is full");
+                        continue;
+                    } else {
+                        println!("{peer:?} not found inside the kbucket with ilog2 {ilog2_distance:?} of {current_peer:?} RT");
+                        failed_list.push(*peer);
+                    }
+                }
+                None => {
+                    println!("Current peer {current_peer:?} should be {ilog2_distance} ilog2 distance away from {peer:?}, but that kbucket is not present for current_peer.");
+                    failed_list.push(*peer);
+                }
+            }
+        }
+        if !failed_list.is_empty() {
+            all_failed_list.insert(current_peer, failed_list);
+        }
+    }
+    if !all_failed_list.is_empty() {
+        println!("Failed to verify routing table:\n{all_failed_list:?}");
+        panic!("Failed to verify routing table");
+    }
+    Ok(())
+}
diff --git a/sn_protocol/src/safenode_proto/req_resp_types.proto b/sn_protocol/src/safenode_proto/req_resp_types.proto
index f8f23dce82..feb0911602 100644
--- a/sn_protocol/src/safenode_proto/req_resp_types.proto
+++ b/sn_protocol/src/safenode_proto/req_resp_types.proto
@@ -45,6 +45,16 @@ message RecordAddressesResponse {
     repeated bytes addresses = 1;
 }
 
+// KBuckets of this node
+message KBucketsRequest {}
+
+message KBucketsResponse {
+    message Peers {
+        repeated bytes peers = 1;
+    }
+    map<uint32, Peers> kbuckets = 1;
+}
+
 // Subsribe to a gossipsub topic
 message GossipsubSubscribeRequest {
     string topic = 1;
diff --git a/sn_protocol/src/safenode_proto/safenode.proto b/sn_protocol/src/safenode_proto/safenode.proto
index dbc38b0324..df217c9e83 100644
--- a/sn_protocol/src/safenode_proto/safenode.proto
+++ b/sn_protocol/src/safenode_proto/safenode.proto
@@ -37,6 +37,9 @@ service SafeNode {
     // Returns the Addresses of all the Records stored by this node
     rpc RecordAddresses (RecordAddressesRequest) returns (RecordAddressesResponse);
 
+    // Returns the KBuckets of this node
+    rpc KBuckets (KBucketsRequest) returns (KBucketsResponse);
+
     // Subscribe to a Gossipsub topic
     rpc SubscribeToTopic (GossipsubSubscribeRequest) returns (GossipsubSubscribeResponse);
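For intuition on the candidate generation in `network_discovery.rs`, here is a minimal standalone sketch, using only the `KBucketKey`/`PeerId` API that the patch itself imports, which buckets random `PeerId`s by their ilog2 distance from a local key. It illustrates why far buckets fill almost immediately while close buckets need many attempts, which is what the 10_000 initial and 1_000 refresh generation attempts compensate for.

```rust
// Illustrative sketch only (assumes the `libp2p` crate with the "kad" feature,
// as used elsewhere in this patch): bucket random peers by ilog2 distance.
use libp2p::{kad::KBucketKey, PeerId};
use std::collections::BTreeMap;

fn main() {
    let self_key = KBucketKey::from(PeerId::random());
    let mut buckets: BTreeMap<u32, Vec<PeerId>> = BTreeMap::new();

    for _ in 0..1_000 {
        let candidate = PeerId::random();
        // ilog2 of the XOR distance picks the kbucket; None means the keys are equal.
        if let Some(ilog2) = KBucketKey::from(candidate).distance(&self_key).ilog2() {
            buckets.entry(ilog2).or_default().push(candidate);
        }
    }

    // Expect roughly half the samples in the highest ilog2 bucket, a quarter in the
    // next one down, and nothing in the low buckets: hence the repeated attempts.
    for (ilog2, peers) in &buckets {
        println!("bucket ilog2={ilog2}: {} candidates", peers.len());
    }
}
```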
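The comment in `generate_candidates` about rayon's `fold` producing multiple partial outputs that `reduce` must merge describes a pattern that is easy to misread; a self-contained sketch of the same fold-then-reduce shape, counting samples per bucket instead of collecting addresses (the bucket index here is just `n % 8`, a stand-in for the ilog2 distance):

```rust
// Illustrative sketch only: rayon's fold builds one partial BTreeMap per worker
// batch, and reduce merges those partials into the final map.
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use std::collections::BTreeMap;

fn main() {
    let histogram: BTreeMap<u32, usize> = (0u32..10_000)
        .into_par_iter()
        .map(|n| n % 8) // stand-in for the ilog2 bucket index
        .fold(BTreeMap::new, |mut acc, bucket| {
            *acc.entry(bucket).or_default() += 1;
            acc
        })
        .reduce(BTreeMap::new, |mut acc, partial| {
            for (bucket, count) in partial {
                *acc.entry(bucket).or_default() += count;
            }
            acc
        });

    println!("{histogram:?}");
}
```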
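To poke at the new `KBuckets` RPC by hand against a locally running node, a small client along these lines should work; the port 12001 (the first node of the local test network) and the dependency set (`tonic`, `tokio`, `libp2p`, `sn_protocol`) are assumptions carried over from the tests above, not something this patch adds.

```rust
// Illustrative sketch only: query one node's KBuckets RPC and print bucket occupancy.
use libp2p::PeerId;
use sn_protocol::safenode_proto::{safe_node_client::SafeNodeClient, KBucketsRequest};
use tonic::Request;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // First node of the local test network; adjust to the RPC address you actually use.
    let mut rpc_client = SafeNodeClient::connect("https://127.0.0.1:12001").await?;
    let response = rpc_client
        .k_buckets(Request::new(KBucketsRequest {}))
        .await?;

    for (ilog2, peers) in &response.get_ref().kbuckets {
        // Each entry carries the raw bytes of a PeerId, as encoded by the RPC service.
        let peer_ids: Vec<PeerId> = peers
            .peers
            .iter()
            .filter_map(|bytes| PeerId::from_bytes(bytes).ok())
            .collect();
        println!("kbucket ilog2={ilog2}: {} peers", peer_ids.len());
    }
    Ok(())
}
```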