From ee7fa9ce3388e388d3a9b9056a60ba1d32e0a35b Mon Sep 17 00:00:00 2001
From: Roland Sherwin
Date: Thu, 23 Nov 2023 18:29:25 +0530
Subject: [PATCH 01/12] feat(rpc): return the KBuckets map

---
 sn_networking/src/cmd.rs                      | 27 ++++++++++++--
 sn_networking/src/lib.rs                      | 10 ++++++
 sn_node/src/bin/safenode/rpc_service.rs       | 36 ++++++++++++++++---
 sn_node/src/lib.rs                            | 11 +++++-
 .../src/safenode_proto/req_resp_types.proto   | 10 ++++++
 sn_protocol/src/safenode_proto/safenode.proto |  3 ++
 6 files changed, 89 insertions(+), 8 deletions(-)

diff --git a/sn_networking/src/cmd.rs b/sn_networking/src/cmd.rs
index 09ea9572f9..ce050f738c 100644
--- a/sn_networking/src/cmd.rs
+++ b/sn_networking/src/cmd.rs
@@ -53,6 +53,10 @@ pub enum SwarmCmd {
     GetAllLocalPeers {
         sender: oneshot::Sender<Vec<PeerId>>,
     },
+    // Get the map of ilog2 distance of the Kbucket to the peers in that bucket
+    GetKBuckets {
+        sender: oneshot::Sender<HashMap<u32, Vec<PeerId>>>,
+    },
     // Returns up to K_VALUE peers from all the k-buckets from the local Routing Table.
     // And our PeerId as well.
     GetClosestKLocalPeers {
@@ -240,9 +244,9 @@ impl Debug for SwarmCmd {
             SwarmCmd::GetAllLocalPeers { .. } => {
                 write!(f, "SwarmCmd::GetAllLocalPeers")
             }
-            // SwarmCmd::GetOurCloseGroup { .. } => {
-            //     write!(f, "SwarmCmd::GetOurCloseGroup")
-            // }
+            SwarmCmd::GetKBuckets { .. } => {
+                write!(f, "SwarmCmd::GetKBuckets")
+            }
             SwarmCmd::GetSwarmLocalState { .. } => {
                 write!(f, "SwarmCmd::GetSwarmLocalState")
             }
@@ -515,6 +519,23 @@ impl SwarmDriver {
             SwarmCmd::GetAllLocalPeers { sender } => {
                 let _ = sender.send(self.get_all_local_peers());
             }
+            SwarmCmd::GetKBuckets { sender } => {
+                let mut ilog2_kbuckets = HashMap::new();
+                for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
+                    let range = kbucket.range();
+                    if let Some(distance) = range.0.ilog2() {
+                        let peers_in_kbucket = kbucket
+                            .iter()
+                            .map(|peer_entry| peer_entry.node.key.clone().into_preimage())
+                            .collect::<Vec<PeerId>>();
+                        let _ = ilog2_kbuckets.insert(distance, peers_in_kbucket);
+                    } else {
+                        // This shall never happen.
+                        error!("bucket is ourself ???!!!");
+                    }
+                }
+                let _ = sender.send(ilog2_kbuckets);
+            }
             SwarmCmd::GetCloseGroupLocalPeers { key, sender } => {
                 let key = key.as_kbucket_key();
                 // calls `kbuckets.closest_keys(key)` internally, which orders the peers by
diff --git a/sn_networking/src/lib.rs b/sn_networking/src/lib.rs
index fff8a894b2..612d281a40 100644
--- a/sn_networking/src/lib.rs
+++ b/sn_networking/src/lib.rs
@@ -183,6 +183,16 @@ impl Network {
         self.get_closest_peers(key, false).await
     }

+    /// Returns the map of ilog2 distance of the Kbucket to the peers in that bucket
+    /// Does not include self
+    pub async fn get_kbuckets(&self) -> Result<HashMap<u32, Vec<PeerId>>> {
+        let (sender, receiver) = oneshot::channel();
+        self.send_swarm_cmd(SwarmCmd::GetKBuckets { sender })?;
+        receiver
+            .await
+            .map_err(|_e| Error::InternalMsgChannelDropped)
+    }
+
     /// Returns the closest peers to the given `NetworkAddress` that is fetched from the local
     /// Routing Table. It is ordered by increasing distance of the peers
     /// Note self peer_id is not included in the result.
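The map produced by the new command is keyed by the ilog2 of the distance range each kbucket covers, with the peers currently held in that bucket as the value. As a rough usage sketch (a hypothetical caller, not part of these patches; it only assumes the `Network::get_kbuckets` API added above):

    // Illustrative sketch: summarise routing-table occupancy per bucket.
    // `network` is assumed to be an sn_networking::Network handle obtained elsewhere.
    async fn print_bucket_summary(network: &sn_networking::Network) {
        if let Ok(kbuckets) = network.get_kbuckets().await {
            for (ilog2, peers) in &kbuckets {
                println!("kbucket with ilog2 distance {ilog2}: {} peer(s)", peers.len());
            }
        }
    }
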
diff --git a/sn_node/src/bin/safenode/rpc_service.rs b/sn_node/src/bin/safenode/rpc_service.rs
index 64b970cf8b..d72e3c179e 100644
--- a/sn_node/src/bin/safenode/rpc_service.rs
+++ b/sn_node/src/bin/safenode/rpc_service.rs
@@ -13,14 +13,16 @@ use bls::{PublicKey, PK_SIZE};
 use eyre::{ErrReport, Result};
 use sn_protocol::node_rpc::NodeCtrl;
 use sn_protocol::safenode_proto::{
+    k_buckets_response,
     safe_node_server::{SafeNode, SafeNodeServer},
     GossipsubPublishRequest, GossipsubPublishResponse, GossipsubSubscribeRequest,
     GossipsubSubscribeResponse, GossipsubUnsubscribeRequest, GossipsubUnsubscribeResponse,
-    NetworkInfoRequest, NetworkInfoResponse, NodeEvent, NodeEventsRequest, NodeInfoRequest,
-    NodeInfoResponse, RecordAddressesRequest, RecordAddressesResponse, RestartRequest,
-    RestartResponse, StopRequest, StopResponse, TransferNotifsFilterRequest,
-    TransferNotifsFilterResponse, UpdateRequest, UpdateResponse,
+    KBucketsRequest, KBucketsResponse, NetworkInfoRequest, NetworkInfoResponse, NodeEvent,
+    NodeEventsRequest, NodeInfoRequest, NodeInfoResponse, RecordAddressesRequest,
+    RecordAddressesResponse, RestartRequest, RestartResponse, StopRequest, StopResponse,
+    TransferNotifsFilterRequest, TransferNotifsFilterResponse, UpdateRequest, UpdateResponse,
 };
+use std::collections::HashMap;
 use std::{
     env,
     net::SocketAddr,
@@ -182,6 +184,32 @@ impl SafeNode for SafeNodeRpcService {
         Ok(Response::new(RecordAddressesResponse { addresses }))
     }

+    async fn k_buckets(
+        &self,
+        request: Request<KBucketsRequest>,
+    ) -> Result<Response<KBucketsResponse>, Status> {
+        trace!(
+            "RPC request received at {}: {:?}",
+            self.addr,
+            request.get_ref()
+        );
+
+        let kbuckets: HashMap<u32, k_buckets_response::Peers> = self
+            .running_node
+            .get_kbuckets()
+            .await
+            .unwrap()
+            .into_iter()
+            .map(|(ilog2_distance, peers)| {
+                let peers = peers.into_iter().map(|peer| peer.to_bytes()).collect();
+                let peers = k_buckets_response::Peers { peers };
+                (ilog2_distance, peers)
+            })
+            .collect();
+
+        Ok(Response::new(KBucketsResponse { kbuckets }))
+    }
+
     async fn subscribe_to_topic(
         &self,
         request: Request<GossipsubSubscribeRequest>,
diff --git a/sn_node/src/lib.rs b/sn_node/src/lib.rs
index 00cdcef5dc..696a182e6d 100644
--- a/sn_node/src/lib.rs
+++ b/sn_node/src/lib.rs
@@ -65,7 +65,10 @@ use bytes::Bytes;
 use libp2p::PeerId;
 use sn_networking::{Network, SwarmLocalState};
 use sn_protocol::NetworkAddress;
-use std::{collections::HashSet, path::PathBuf};
+use std::{
+    collections::{HashMap, HashSet},
+    path::PathBuf,
+};
 use tokio::sync::broadcast;

 /// Once a node is started and running, the user obtains
@@ -119,6 +122,12 @@ impl RunningNode {
         Ok(addresses)
     }

+    /// Returns the map of ilog2 distance of the Kbucket to the peers in that bucket
+    pub async fn get_kbuckets(&self) -> Result<HashMap<u32, Vec<PeerId>>> {
+        let kbuckets = self.network.get_kbuckets().await?;
+        Ok(kbuckets)
+    }
+
     /// Subscribe to given gossipsub topic
     pub fn subscribe_to_topic(&self, topic_id: String) -> Result<()> {
         self.network.subscribe_to_topic(topic_id)?;
diff --git a/sn_protocol/src/safenode_proto/req_resp_types.proto b/sn_protocol/src/safenode_proto/req_resp_types.proto
index f8f23dce82..feb0911602 100644
--- a/sn_protocol/src/safenode_proto/req_resp_types.proto
+++ b/sn_protocol/src/safenode_proto/req_resp_types.proto
@@ -45,6 +45,16 @@ message RecordAddressesResponse {
   repeated bytes addresses = 1;
 }

+// KBuckets of this node
+message KBucketsRequest {}
+
+message KBucketsResponse {
+  message Peers {
+    repeated bytes peers = 1;
+  }
+  map<uint32, Peers> kbuckets = 1;
+}
+
 // Subsribe to a gossipsub topic
 message GossipsubSubscribeRequest {
   string topic = 1;

diff --git
a/sn_protocol/src/safenode_proto/safenode.proto b/sn_protocol/src/safenode_proto/safenode.proto index dbc38b0324..df217c9e83 100644 --- a/sn_protocol/src/safenode_proto/safenode.proto +++ b/sn_protocol/src/safenode_proto/safenode.proto @@ -37,6 +37,9 @@ service SafeNode { // Returns the Addresses of all the Records stored by this node rpc RecordAddresses (RecordAddressesRequest) returns (RecordAddressesResponse); + // Returns the entire Kbucket of this node + rpc KBuckets (KBucketsRequest) returns (KBucketsResponse); + // Subscribe to a Gossipsub topic rpc SubscribeToTopic (GossipsubSubscribeRequest) returns (GossipsubSubscribeResponse); From e2787d96120c8473458c5a7b65d1bff5a9910f8f Mon Sep 17 00:00:00 2001 From: Roland Sherwin Date: Thu, 23 Nov 2023 22:22:43 +0530 Subject: [PATCH 02/12] feat(test): impl routing table test --- sn_node/tests/common/mod.rs | 26 ++++++++ sn_node/tests/data_with_churn.rs | 4 +- sn_node/tests/msgs_over_gossipsub.rs | 12 ++-- sn_node/tests/verify_data_location.rs | 29 ++------ sn_node/tests/verify_routing_table.rs | 96 +++++++++++++++++++++++++++ 5 files changed, 134 insertions(+), 33 deletions(-) create mode 100644 sn_node/tests/verify_routing_table.rs diff --git a/sn_node/tests/common/mod.rs b/sn_node/tests/common/mod.rs index 29d4b5e853..b1fc7a58dd 100644 --- a/sn_node/tests/common/mod.rs +++ b/sn_node/tests/common/mod.rs @@ -8,6 +8,7 @@ #![allow(dead_code)] +use libp2p::PeerId; use self_encryption::MIN_ENCRYPTABLE_BYTES; use sn_client::{load_faucet_wallet_from_genesis_wallet, send, Client, Files}; use sn_peers_acquisition::parse_peer_addr; @@ -25,6 +26,7 @@ use rand::{ Rng, }; use sn_transfers::NanoTokens; +use std::net::{IpAddr, Ipv4Addr}; use std::{ fs::File, io::Write, @@ -38,6 +40,9 @@ use xor_name::XorName; type ResultRandomContent = Result<(Files, Bytes, ChunkAddress, Vec<(XorName, PathBuf)>)>; pub const PAYING_WALLET_INITIAL_BALANCE: u64 = 100_000_000_000_000; +pub const NODE_COUNT: u32 = 25; + +const RPC_CONCURRENT_REQUESTS: usize = 20; lazy_static! 
{ // mutex to restrict access to faucet wallet from concurrent tests @@ -133,6 +138,27 @@ pub fn random_content( )) } +// Returns all the PeerId for all the locally running nodes +pub async fn get_all_peer_ids() -> Result> { + let mut all_peers = Vec::new(); + + let mut addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12000); + for node_index in 1..NODE_COUNT + 1 { + addr.set_port(12000 + node_index as u16); + let endpoint = format!("https://{addr}"); + let mut rpc_client = SafeNodeClient::connect(endpoint).await?; + + // get the peer_id + let response = rpc_client + .node_info(Request::new(NodeInfoRequest {})) + .await?; + let peer_id = PeerId::from_bytes(&response.get_ref().peer_id)?; + all_peers.push(peer_id); + } + println!("Obtained the PeerId list for the locally running network with a node count of {NODE_COUNT}"); + Ok(all_peers) +} + pub async fn node_restart(addr: SocketAddr) -> Result<()> { let endpoint = format!("https://{addr}"); let mut client = SafeNodeClient::connect(endpoint).await?; diff --git a/sn_node/tests/data_with_churn.rs b/sn_node/tests/data_with_churn.rs index 2198ef5dec..805968add7 100644 --- a/sn_node/tests/data_with_churn.rs +++ b/sn_node/tests/data_with_churn.rs @@ -10,7 +10,7 @@ mod common; use assert_fs::TempDir; use common::{ - get_funded_wallet, get_gossip_client_and_wallet, get_wallet, node_restart, + get_funded_wallet, get_gossip_client_and_wallet, get_wallet, node_restart, NODE_COUNT, PAYING_WALLET_INITIAL_BALANCE, }; use eyre::{bail, eyre, Result}; @@ -38,8 +38,6 @@ use tokio::{sync::RwLock, task::JoinHandle, time::sleep}; use tracing::{debug, trace}; use xor_name::XorName; -const NODE_COUNT: u32 = 25; - const EXTRA_CHURN_COUNT: u32 = 5; const CHURN_CYCLES: u32 = 1; const CHUNK_CREATION_RATIO_TO_CHURN: u32 = 15; diff --git a/sn_node/tests/msgs_over_gossipsub.rs b/sn_node/tests/msgs_over_gossipsub.rs index 4c7d2f014c..be436ffe36 100644 --- a/sn_node/tests/msgs_over_gossipsub.rs +++ b/sn_node/tests/msgs_over_gossipsub.rs @@ -8,6 +8,7 @@ mod common; +use common::NODE_COUNT; use eyre::Result; use sn_logging::LogBuilder; use sn_node::NodeEvent; @@ -23,8 +24,7 @@ use tokio::time::timeout; use tokio_stream::StreamExt; use tonic::Request; -const NODE_COUNT: u8 = 25; -const NODES_SUBSCRIBED: u8 = NODE_COUNT / 2; // 12 out of 25 nodes will be subscribers +const NODES_SUBSCRIBED: u32 = NODE_COUNT / 2; // 12 out of 25 nodes will be subscribers const TEST_CYCLES: u8 = 20; #[tokio::test] @@ -47,7 +47,7 @@ async fn msgs_over_gossipsub() -> Result<()> { // get a random subset of NODES_SUBSCRIBED out of NODE_COUNT nodes to subscribe to the topic let mut rng = rand::thread_rng(); let random_subs_nodes: Vec<_> = - rand::seq::index::sample(&mut rng, NODE_COUNT.into(), NODES_SUBSCRIBED.into()) + rand::seq::index::sample(&mut rng, NODE_COUNT as usize, NODES_SUBSCRIBED as usize) .iter() .map(|i| all_nodes_addrs[i]) .collect(); @@ -65,7 +65,7 @@ async fn msgs_over_gossipsub() -> Result<()> { .node_events(Request::new(NodeEventsRequest {})) .await?; - let mut count = 0; + let mut count: u32 = 0; let _ = timeout(Duration::from_secs(40), async { let mut stream = response.into_inner(); @@ -87,7 +87,7 @@ async fn msgs_over_gossipsub() -> Result<()> { }) .await; - Ok::(count) + Ok::(count) }); subs_handles.push((node_index, addr, handle)); @@ -141,7 +141,7 @@ async fn node_unsubscribe_from_topic(addr: SocketAddr, topic: String) -> Result< } async fn other_nodes_to_publish_on_topic( - nodes: Vec<(u8, SocketAddr)>, + nodes: Vec<(u32, SocketAddr)>, topic: String, ) -> 
Result<()> { for (node_index, addr) in nodes { diff --git a/sn_node/tests/verify_data_location.rs b/sn_node/tests/verify_data_location.rs index ddd21d3eb2..264770c7d3 100644 --- a/sn_node/tests/verify_data_location.rs +++ b/sn_node/tests/verify_data_location.rs @@ -9,7 +9,10 @@ #![allow(clippy::mutable_key_type)] mod common; -use crate::common::{get_gossip_client_and_wallet, node_restart, PAYING_WALLET_INITIAL_BALANCE}; +use crate::common::{ + get_all_peer_ids, get_gossip_client_and_wallet, node_restart, NODE_COUNT, + PAYING_WALLET_INITIAL_BALANCE, +}; use assert_fs::TempDir; use eyre::{eyre, Result}; use libp2p::{ @@ -35,7 +38,6 @@ use std::{ use tonic::Request; use tracing::error; -const NODE_COUNT: u8 = 25; const CHUNK_SIZE: usize = 1024; // VERIFICATION_DELAY is set based on the dead peer detection interval @@ -57,7 +59,7 @@ const CHURN_COUNT: u8 = 4; // It can be overridden by setting the 'CHUNK_COUNT' env var. const CHUNK_COUNT: usize = 5; -type NodeIndex = u8; +type NodeIndex = u32; type RecordHolders = HashMap>; #[tokio::test(flavor = "multi_thread")] @@ -276,27 +278,6 @@ async fn verify_location(all_peers: &[PeerId]) -> Result<()> { } } -// Returns all the PeerId for all the locally running nodes -async fn get_all_peer_ids() -> Result> { - let mut all_peers = Vec::new(); - - let mut addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12000); - for node_index in 1..NODE_COUNT + 1 { - addr.set_port(12000 + node_index as u16); - let endpoint = format!("https://{addr}"); - let mut rpc_client = SafeNodeClient::connect(endpoint).await?; - - // get the peer_id - let response = rpc_client - .node_info(Request::new(NodeInfoRequest {})) - .await?; - let peer_id = PeerId::from_bytes(&response.get_ref().peer_id)?; - all_peers.push(peer_id); - } - println!("Obtained the PeerId list for the locally running network with a node count of {NODE_COUNT}"); - Ok(all_peers) -} - // Generate random Chunks and store them to the Network async fn store_chunks(client: Client, chunk_count: usize, wallet_dir: PathBuf) -> Result<()> { let start = Instant::now(); diff --git a/sn_node/tests/verify_routing_table.rs b/sn_node/tests/verify_routing_table.rs new file mode 100644 index 0000000000..892090d964 --- /dev/null +++ b/sn_node/tests/verify_routing_table.rs @@ -0,0 +1,96 @@ +// Copyright 2023 MaidSafe.net limited. +// +// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. +// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed +// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. Please review the Licences for the specific language governing +// permissions and limitations relating to use of the SAFE Network Software. 
+
+#![allow(clippy::mutable_key_type)]
+mod common;
+
+use crate::common::{get_all_peer_ids, NODE_COUNT};
+use color_eyre::Result;
+use libp2p::{
+    kad::{KBucketKey, K_VALUE},
+    PeerId,
+};
+use sn_logging::LogBuilder;
+use sn_protocol::safenode_proto::{safe_node_client::SafeNodeClient, KBucketsRequest};
+use std::{
+    collections::{HashMap, HashSet},
+    net::{IpAddr, Ipv4Addr, SocketAddr},
+};
+use tonic::Request;
+
+#[tokio::test(flavor = "multi_thread")]
+async fn verify_routing_tables() -> Result<()> {
+    let _log_appender_guard = LogBuilder::init_multi_threaded_tokio_test("routing_table");
+
+    let all_peers = get_all_peer_ids().await?;
+    let mut all_failed_list = HashMap::new();
+
+    for node_index in 1..NODE_COUNT + 1 {
+        let mut addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12000);
+        addr.set_port(12000 + node_index as u16);
+        let endpoint = format!("https://{addr}");
+        let mut rpc_client = SafeNodeClient::connect(endpoint).await?;
+
+        let response = rpc_client
+            .k_buckets(Request::new(KBucketsRequest {}))
+            .await?;
+
+        let k_buckets = response.get_ref().kbuckets.clone();
+        let k_buckets = k_buckets
+            .into_iter()
+            .map(|(ilog2, peers)| {
+                let peers = peers
+                    .peers
+                    .clone()
+                    .into_iter()
+                    .map(|peer_bytes| PeerId::from_bytes(&peer_bytes).unwrap())
+                    .collect::<HashSet<_>>();
+                (ilog2, peers)
+            })
+            .collect::<HashMap<_, _>>();
+
+        let current_peer = *all_peers
+            .get(node_index as usize - 1)
+            .unwrap_or_else(|| panic!("Node should be present at index {}", node_index - 1));
+        let current_peer_key = KBucketKey::from(current_peer);
+
+        let mut failed_list = Vec::new();
+        for peer in all_peers.iter() {
+            let ilog2_distance = match KBucketKey::from(*peer).distance(&current_peer_key).ilog2() {
+                Some(distance) => distance,
+                // None if same key
+                None => continue,
+            };
+            match k_buckets.get(&ilog2_distance) {
+                Some(bucket) => {
+                    if bucket.contains(peer) {
+                        continue;
+                    } else if bucket.len() == K_VALUE.get() {
+                        println!("{peer:?} should be inside the ilog2 bucket: {ilog2_distance:?} of {current_peer:?}. 
But skipped as the bucket is full"); + continue; + } else { + println!("{peer:?} not found inside the kbucket with ilog2 {ilog2_distance:?} of {current_peer:?} RT"); + failed_list.push(*peer); + } + } + None => { + println!("Current peer {current_peer:?} should be {ilog2_distance} ilog2 distance away from {peer:?}, but that kbucket is not present for current_peer."); + failed_list.push(*peer); + } + } + } + if !failed_list.is_empty() { + all_failed_list.insert(current_peer, failed_list); + } + } + if !all_failed_list.is_empty() { + println!("Failed to verify routing table:\n{all_failed_list:?}"); + panic!("Failed to verify routing table"); + } + Ok(()) +} From 01b168eac777ef906d08d85bcfbfbb6538ef3a90 Mon Sep 17 00:00:00 2001 From: Roland Sherwin Date: Thu, 23 Nov 2023 22:39:45 +0530 Subject: [PATCH 03/12] chore(ci): enable routing table test --- .github/workflows/merge.yml | 16 ++++++++++++---- .github/workflows/nightly.yml | 16 ++++++++++++---- sn_node/tests/verify_routing_table.rs | 4 ++-- 3 files changed, 26 insertions(+), 10 deletions(-) diff --git a/.github/workflows/merge.yml b/.github/workflows/merge.yml index 19d9fc8329..104e94c477 100644 --- a/.github/workflows/merge.yml +++ b/.github/workflows/merge.yml @@ -438,9 +438,9 @@ jobs: log_file_prefix: safe_test_logs_churn platform: ${{ matrix.os }} - verify_data_location: + verify_data_location_routing_table: if: "!startsWith(github.event.head_commit.message, 'chore(release):')" - name: Verify data location + name: Verify data location and Routing Table runs-on: ${{ matrix.os }} strategy: matrix: @@ -464,8 +464,8 @@ jobs: run: cargo build --release --features local-discovery --bin safenode --bin faucet timeout-minutes: 30 - - name: Build data location test - run: cargo test --release -p sn_node --features=local-discovery --test verify_data_location --no-run + - name: Build data location and routing table tests + run: cargo test --release -p sn_node --features=local-discovery --test verify_data_location --test verify_routing_table --no-run timeout-minutes: 30 - name: Start a local network @@ -487,6 +487,10 @@ jobs: echo "SAFE_PEERS has been set to $SAFE_PEERS" fi + - name: Verify the routing tables of the nodes + run: cargo test --release -p sn_node --features="local-discovery" --test verify_routing_table -- --nocapture + timeout-minutes: 5 + - name: Verify the location of the data on the network (4 * 5 mins) run: cargo test --release -p sn_node --features="local-discovery" --test verify_data_location -- --nocapture env: @@ -494,6 +498,10 @@ jobs: SN_LOG: "all" timeout-minutes: 30 + - name: Verify the routing tables of the nodes + run: cargo test --release -p sn_node --features="local-discovery" --test verify_routing_table -- --nocapture + timeout-minutes: 5 + - name: Verify restart of nodes using rg shell: bash timeout-minutes: 1 diff --git a/.github/workflows/nightly.yml b/.github/workflows/nightly.yml index b8457bb10c..6ded81ef27 100644 --- a/.github/workflows/nightly.yml +++ b/.github/workflows/nightly.yml @@ -383,8 +383,8 @@ jobs: SLACK_MESSAGE: "Please check the logs for the run at ${{ env.WORKFLOW_URL }}/${{ github.run_id }}" SLACK_TITLE: "Nightly Churn Test Run Failed" - verify_data_location: - name: Verify data location + verify_data_location_routing_table: + name: Verify data location and Routing Table runs-on: ${{ matrix.os }} strategy: matrix: @@ -410,8 +410,8 @@ jobs: run: cargo build --release --features local-discovery --bin safenode --bin faucet timeout-minutes: 30 - - name: Build data location test - run: cargo test 
--release -p sn_node --features=local-discovery --test verify_data_location --no-run + - name: Build data location and routing table tests + run: cargo test --release -p sn_node --features=local-discovery --test verify_data_location --test verify_routing_table --no-run timeout-minutes: 30 - name: Start a local network @@ -423,12 +423,20 @@ jobs: faucet-path: target/release/faucet platform: ${{ matrix.os }} + - name: Verify the Routing table of the nodes + run: cargo test --release -p sn_node --features="local-discovery" --test verify_routing_table -- --nocapture + timeout-minutes: 5 + - name: Verify the location of the data on the network (approx 12 * 5 mins) run: cargo test --release -p sn_node --features="local-discovery" --test verify_data_location -- --nocapture env: CHURN_COUNT: 12 SN_LOG: "all" timeout-minutes: 90 + + - name: Verify the routing tables of the nodes + run: cargo test --release -p sn_node --features="local-discovery" --test verify_routing_table -- --nocapture + timeout-minutes: 5 - name: Verify restart of nodes using rg shell: bash diff --git a/sn_node/tests/verify_routing_table.rs b/sn_node/tests/verify_routing_table.rs index 892090d964..bdbb212111 100644 --- a/sn_node/tests/verify_routing_table.rs +++ b/sn_node/tests/verify_routing_table.rs @@ -24,8 +24,8 @@ use std::{ use tonic::Request; #[tokio::test(flavor = "multi_thread")] -async fn verify_routing_tables() -> Result<()> { - let _log_appender_guard = LogBuilder::init_multi_threaded_tokio_test("routing_table"); +async fn verify_routing_table() -> Result<()> { + let _log_appender_guard = LogBuilder::init_multi_threaded_tokio_test("verify_routing_table"); let all_peers = get_all_peer_ids().await?; let mut all_failed_list = HashMap::new(); From acaa862d92b271c1e91155b7e1b6479bf01797f9 Mon Sep 17 00:00:00 2001 From: Roland Sherwin Date: Fri, 24 Nov 2023 15:48:41 +0530 Subject: [PATCH 04/12] chore(ci): enable routing table test during memcheck --- .github/workflows/memcheck.yml | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/.github/workflows/memcheck.yml b/.github/workflows/memcheck.yml index 130dd61a69..72e3b9538f 100644 --- a/.github/workflows/memcheck.yml +++ b/.github/workflows/memcheck.yml @@ -39,8 +39,8 @@ jobs: run: cargo build --release --bins timeout-minutes: 30 - - name: Build churn tests - run: cargo test --release -p sn_node --test data_with_churn --no-run + - name: Build tests + run: cargo test --release -p sn_node --test data_with_churn --test verify_routing_table --no-run timeout-minutes: 30 - name: Start a node instance that does not undergo churn @@ -79,6 +79,10 @@ jobs: shell: bash run: echo "The SAFE_PEERS variable has been set to ${SAFE_PEERS}" + - name: Verify the routing tables of the nodes + run: cargo test --release -p sn_node --test verify_routing_table -- --nocapture + timeout-minutes: 5 + - name: Create and fund a wallet to pay for files storage run: | cargo run --bin faucet --release -- --log-output-dest=data-dir send 5000000 $(cargo run --bin safe --release -- --log-output-dest=data-dir wallet address | tail -n 1) > initial_balance_from_faucet.txt @@ -112,6 +116,10 @@ jobs: SN_LOG: "all" timeout-minutes: 30 + - name: Verify the routing tables of the nodes + run: cargo test --release -p sn_node --test verify_routing_table -- --nocapture + timeout-minutes: 5 + - name: Verify restart of nodes using rg shell: bash timeout-minutes: 1 From 12b75c2cb1738c19736513e90845c92f065f5457 Mon Sep 17 00:00:00 2001 From: Roland Sherwin Date: Fri, 24 Nov 2023 16:12:08 +0530 
Subject: [PATCH 05/12] fix(test): sleep before verifying routing table --- .github/workflows/memcheck.yml | 8 ++++++-- sn_node/tests/verify_routing_table.rs | 16 ++++++++++++++++ 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/.github/workflows/memcheck.yml b/.github/workflows/memcheck.yml index 72e3b9538f..aefb6a1428 100644 --- a/.github/workflows/memcheck.yml +++ b/.github/workflows/memcheck.yml @@ -81,7 +81,9 @@ jobs: - name: Verify the routing tables of the nodes run: cargo test --release -p sn_node --test verify_routing_table -- --nocapture - timeout-minutes: 5 + env: + SLEEP_BEFORE_VERIFICATION: 120 + timeout-minutes: 10 - name: Create and fund a wallet to pay for files storage run: | @@ -118,7 +120,9 @@ jobs: - name: Verify the routing tables of the nodes run: cargo test --release -p sn_node --test verify_routing_table -- --nocapture - timeout-minutes: 5 + env: + SLEEP_BEFORE_VERIFICATION: 120 + timeout-minutes: 10 - name: Verify restart of nodes using rg shell: bash diff --git a/sn_node/tests/verify_routing_table.rs b/sn_node/tests/verify_routing_table.rs index bdbb212111..a1eba12046 100644 --- a/sn_node/tests/verify_routing_table.rs +++ b/sn_node/tests/verify_routing_table.rs @@ -20,13 +20,29 @@ use sn_protocol::safenode_proto::{safe_node_client::SafeNodeClient, KBucketsRequ use std::{ collections::{HashMap, HashSet}, net::{IpAddr, Ipv4Addr, SocketAddr}, + time::Duration, }; use tonic::Request; +/// Sleep for sometime for the nodes for discover each other before verification +/// Also can be set through the env variable of the same name. +const SLEEP_BEFORE_VERIFICATION: Duration = Duration::from_secs(5); + #[tokio::test(flavor = "multi_thread")] async fn verify_routing_table() -> Result<()> { let _log_appender_guard = LogBuilder::init_multi_threaded_tokio_test("verify_routing_table"); + let sleep_duration = std::env::var("SLEEP_BEFORE_VERIFICATION") + .map(|value| { + value + .parse::() + .expect("Failed to prase sleep value into u64") + }) + .map(Duration::from_secs) + .unwrap_or(SLEEP_BEFORE_VERIFICATION); + println!("Sleeping for {sleep_duration:?} before verification"); + tokio::time::sleep(sleep_duration).await; + let all_peers = get_all_peer_ids().await?; let mut all_failed_list = HashMap::new(); From 7c50c357602bbbb4070b53318d2ebcf0e36bfaf5 Mon Sep 17 00:00:00 2001 From: Roland Sherwin Date: Fri, 24 Nov 2023 19:28:50 +0530 Subject: [PATCH 06/12] feat(discovery): try to use random candidates from a bucket when available --- Cargo.lock | 1 + sn_networking/Cargo.toml | 1 + sn_networking/src/bootstrap.rs | 6 +- sn_networking/src/driver.rs | 53 +----------- sn_networking/src/lib.rs | 1 + sn_networking/src/network_discovery.rs | 108 +++++++++++++++++++++++++ 6 files changed, 120 insertions(+), 50 deletions(-) create mode 100644 sn_networking/src/network_discovery.rs diff --git a/Cargo.lock b/Cargo.lock index 84d7878deb..e029752b9e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4653,6 +4653,7 @@ dependencies = [ "prometheus-client 0.22.0", "quickcheck", "rand", + "rayon", "rmp-serde", "serde", "sn_protocol", diff --git a/sn_networking/Cargo.toml b/sn_networking/Cargo.toml index 3119acd6d8..b5ddbd81e7 100644 --- a/sn_networking/Cargo.toml +++ b/sn_networking/Cargo.toml @@ -26,6 +26,7 @@ custom_debug = "~0.5.0" libp2p = { version="0.53" , features = ["tokio", "dns", "kad", "macros", "request-response", "cbor","identify", "autonat", "noise", "tcp", "yamux", "gossipsub"] } prometheus-client = { version = "0.22", optional = true } rand = { version = "~0.8.5", features = 
["small_rng"] } +rayon = "1.8.0" rmp-serde = "1.1.1" serde = { version = "1.0.133", features = [ "derive", "rc" ]} sn_protocol = { path = "../sn_protocol", version = "0.8.29" } diff --git a/sn_networking/src/bootstrap.rs b/sn_networking/src/bootstrap.rs index 430919ddb2..5643eb9bc6 100644 --- a/sn_networking/src/bootstrap.rs +++ b/sn_networking/src/bootstrap.rs @@ -49,16 +49,20 @@ impl SwarmDriver { } pub(crate) fn trigger_network_discovery(&mut self) { + let now = Instant::now(); // The query is just to trigger the network discovery, // hence no need to wait for a result. - for addr in &self.network_discovery_candidates { + for addr in self.network_discovery_candidates.candidates() { let _ = self .swarm .behaviour_mut() .kademlia .get_closest_peers(addr.as_bytes()); } + self.network_discovery_candidates + .try_generate_new_candidates(); self.bootstrap.initiated(); + debug!("Trigger network discovery took {:?}", now.elapsed()); } } diff --git a/sn_networking/src/driver.rs b/sn_networking/src/driver.rs index 63dc850998..d23d6743f3 100644 --- a/sn_networking/src/driver.rs +++ b/sn_networking/src/driver.rs @@ -18,6 +18,7 @@ use crate::{ event::NetworkEvent, event::{GetRecordResultMap, NodeEvent}, multiaddr_pop_p2p, + network_discovery::NetworkDiscoveryCandidates, record_store::{ClientRecordStore, NodeRecordStore, NodeRecordStoreConfig}, record_store_api::UnifiedRecordStore, replication_fetcher::ReplicationFetcher, @@ -50,7 +51,7 @@ use sn_protocol::{ NetworkAddress, PrettyPrintKBucketKey, }; use std::{ - collections::{BTreeMap, BTreeSet, HashMap, HashSet}, + collections::{HashMap, HashSet}, net::SocketAddr, num::NonZeroUsize, path::PathBuf, @@ -495,7 +496,7 @@ impl NetworkBuilder { // `identify` protocol to kick in and get them in the routing table. dialed_peers: CircularVec::new(63), is_gossip_handler: false, - network_discovery_candidates: generate_kbucket_specific_candidates(&peer_id), + network_discovery_candidates: NetworkDiscoveryCandidates::new(&peer_id), }; Ok(( @@ -511,51 +512,6 @@ impl NetworkBuilder { } } -fn generate_kbucket_specific_candidates(self_peer_id: &PeerId) -> Vec { - let mut candidates: BTreeMap = BTreeMap::new(); - // To avoid deadlock or taking too much time, currently set a fixed generation attempts - let mut attempts = 0; - // Also an early return when got the first 20 furthest kBuckets covered. 
- let mut buckets_covered: BTreeSet<_> = (0..21).map(|index| index as usize).collect(); - - let local_key = NetworkAddress::from_peer(*self_peer_id).as_kbucket_key(); - let local_key_bytes_len = local_key.hashed_bytes().len(); - while attempts < 10000 && !buckets_covered.is_empty() { - let candiate = NetworkAddress::from_peer(PeerId::random()); - let candidate_key = candiate.as_kbucket_key(); - let candidate_key_bytes_len = candidate_key.hashed_bytes().len(); - - if local_key_bytes_len != candidate_key_bytes_len { - panic!("kBucketKey has different key length, {candiate:?} has {candidate_key_bytes_len:?}, {self_peer_id:?} has {local_key_bytes_len:?}"); - } - - let common_leading_bits = - common_leading_bits(local_key.hashed_bytes(), candidate_key.hashed_bytes()); - - let _ = candidates.insert(common_leading_bits, candiate); - let _ = buckets_covered.remove(&common_leading_bits); - - attempts += 1; - } - - let generated_buckets: Vec<_> = candidates.keys().copied().collect(); - let generated_candidates: Vec<_> = candidates.values().cloned().collect(); - trace!("Generated targets covering kbuckets {generated_buckets:?}"); - generated_candidates -} - -/// Returns the length of the common leading bits. -/// e.g. when `11110000` and `11111111`, return as 4. -/// Note: the length of two shall be the same -fn common_leading_bits(one: &[u8], two: &[u8]) -> usize { - for byte_index in 0..one.len() { - if one[byte_index] != two[byte_index] { - return (byte_index * 8) + (one[byte_index] ^ two[byte_index]).leading_zeros() as usize; - } - } - 8 * one.len() -} - pub struct SwarmDriver { pub(crate) swarm: Swarm, pub(crate) self_peer_id: PeerId, @@ -584,9 +540,8 @@ pub struct SwarmDriver { // they are not supposed to process the gossip msg that received from libp2p. pub(crate) is_gossip_handler: bool, // A list of random `PeerId` candidates that falls into kbuckets, - // one for each furthest 30 kbuckets. // This is to ensure a more accurate network discovery. - pub(crate) network_discovery_candidates: Vec, + pub(crate) network_discovery_candidates: NetworkDiscoveryCandidates, } impl SwarmDriver { diff --git a/sn_networking/src/lib.rs b/sn_networking/src/lib.rs index 612d281a40..06cede0229 100644 --- a/sn_networking/src/lib.rs +++ b/sn_networking/src/lib.rs @@ -19,6 +19,7 @@ mod event; mod metrics; #[cfg(feature = "open-metrics")] mod metrics_service; +mod network_discovery; mod quorum; mod record_store; mod record_store_api; diff --git a/sn_networking/src/network_discovery.rs b/sn_networking/src/network_discovery.rs new file mode 100644 index 0000000000..1bf7369238 --- /dev/null +++ b/sn_networking/src/network_discovery.rs @@ -0,0 +1,108 @@ +// Copyright 2023 MaidSafe.net limited. +// +// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. +// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed +// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. Please review the Licences for the specific language governing +// permissions and limitations relating to use of the SAFE Network Software. 
+ +use libp2p::{kad::KBucketKey, PeerId}; +use rayon::iter::{IntoParallelIterator, ParallelIterator}; +use sn_protocol::NetworkAddress; +use std::{ + collections::{hash_map::Entry, HashMap, VecDeque}, + time::Instant, +}; + +const INITIAL_GENERATION_ATTEMPTS: usize = 10_000; +const GENERATION_ATTEMPTS: usize = 1_000; +const MAX_PEERS_PER_BUCKET: usize = 5; + +#[derive(Debug, Clone)] +pub(crate) struct NetworkDiscoveryCandidates { + self_key: KBucketKey, + candidates: HashMap>, +} + +impl NetworkDiscoveryCandidates { + pub(crate) fn new(self_peer_id: &PeerId) -> Self { + let start = Instant::now(); + let self_key = KBucketKey::from(*self_peer_id); + let candidates_vec = Self::generate_candidates(&self_key, INITIAL_GENERATION_ATTEMPTS); + + let mut candidates: HashMap> = HashMap::new(); + for (ilog2, candidate) in candidates_vec { + match candidates.entry(ilog2) { + Entry::Occupied(mut entry) => { + let entry = entry.get_mut(); + if entry.len() >= MAX_PEERS_PER_BUCKET { + continue; + } else { + entry.push_back(candidate); + } + } + Entry::Vacant(entry) => { + let _ = entry.insert(VecDeque::from([candidate])); + } + } + } + + info!( + "Time to generate NetworkDiscoveryCandidates: {:?}", + start.elapsed() + ); + let mut buckets_covered = candidates + .iter() + .map(|(ilog2, candidates)| (*ilog2, candidates.len())) + .collect::>(); + buckets_covered.sort_by_key(|(ilog2, _)| *ilog2); + info!("The generated network discovery candidates currently cover these ilog2 buckets: {buckets_covered:?}"); + + Self { + self_key, + candidates, + } + } + + pub(crate) fn try_generate_new_candidates(&mut self) { + let candidates_vec = Self::generate_candidates(&self.self_key, GENERATION_ATTEMPTS); + for (ilog2, candidate) in candidates_vec { + match self.candidates.entry(ilog2) { + Entry::Occupied(mut entry) => { + let entry = entry.get_mut(); + if entry.len() >= MAX_PEERS_PER_BUCKET { + // pop the front (as it might have been already used for querying and insert the new one at the back + let _ = entry.pop_front(); + entry.push_back(candidate); + } else { + entry.push_back(candidate); + } + } + Entry::Vacant(entry) => { + let _ = entry.insert(VecDeque::from([candidate])); + } + } + } + } + + pub(crate) fn candidates(&self) -> impl Iterator { + self.candidates + .values() + .filter_map(|candidates| candidates.front()) + } + + fn generate_candidates( + self_key: &KBucketKey, + num_to_generate: usize, + ) -> Vec<(u32, NetworkAddress)> { + (0..num_to_generate) + .into_par_iter() + .filter_map(|_| { + let candidate = NetworkAddress::from_peer(PeerId::random()); + let candidate_key = candidate.as_kbucket_key(); + let ilog2_distance = candidate_key.distance(&self_key).ilog2()?; + Some((ilog2_distance, candidate)) + }) + .collect::>() + } +} From abf497676ca8fe26ac051de8649b43a89d395cff Mon Sep 17 00:00:00 2001 From: Roland Sherwin Date: Fri, 24 Nov 2023 22:30:53 +0530 Subject: [PATCH 07/12] feat(discovery): use the results of the get_closest_query --- sn_networking/src/bootstrap.rs | 8 ++++-- sn_networking/src/cmd.rs | 12 ++++++--- sn_networking/src/driver.rs | 10 +++++++- sn_networking/src/event.rs | 34 ++++++++++++++++++-------- sn_networking/src/network_discovery.rs | 32 +++++++++++++++++++++++- 5 files changed, 78 insertions(+), 18 deletions(-) diff --git a/sn_networking/src/bootstrap.rs b/sn_networking/src/bootstrap.rs index 5643eb9bc6..148e3c76ff 100644 --- a/sn_networking/src/bootstrap.rs +++ b/sn_networking/src/bootstrap.rs @@ -6,7 +6,7 @@ // KIND, either express or implied. 
Please review the Licences for the specific language governing // permissions and limitations relating to use of the SAFE Network Software. -use crate::SwarmDriver; +use crate::{driver::PendingGetClosestType, SwarmDriver}; use std::time::{Duration, Instant}; use tokio::time::Interval; @@ -53,11 +53,15 @@ impl SwarmDriver { // The query is just to trigger the network discovery, // hence no need to wait for a result. for addr in self.network_discovery_candidates.candidates() { - let _ = self + let query_id = self .swarm .behaviour_mut() .kademlia .get_closest_peers(addr.as_bytes()); + let _ = self.pending_get_closest_peers.insert( + query_id, + (PendingGetClosestType::NetworkDiscovery, Default::default()), + ); } self.network_discovery_candidates .try_generate_new_candidates(); diff --git a/sn_networking/src/cmd.rs b/sn_networking/src/cmd.rs index ce050f738c..f21c245887 100644 --- a/sn_networking/src/cmd.rs +++ b/sn_networking/src/cmd.rs @@ -7,7 +7,7 @@ // permissions and limitations relating to use of the SAFE Network Software. use crate::{ - driver::SwarmDriver, + driver::{PendingGetClosestType, SwarmDriver}, error::{Error, Result}, sort_peers_by_address, GetQuorum, MsgResponder, NetworkEvent, CLOSE_GROUP_SIZE, REPLICATE_RANGE, @@ -512,9 +512,13 @@ impl SwarmDriver { .behaviour_mut() .kademlia .get_closest_peers(key.as_bytes()); - let _ = self - .pending_get_closest_peers - .insert(query_id, (sender, Default::default())); + let _ = self.pending_get_closest_peers.insert( + query_id, + ( + PendingGetClosestType::FunctionCall(sender), + Default::default(), + ), + ); } SwarmCmd::GetAllLocalPeers { sender } => { let _ = sender.send(self.get_all_local_peers()); diff --git a/sn_networking/src/driver.rs b/sn_networking/src/driver.rs index d23d6743f3..a5c3811b01 100644 --- a/sn_networking/src/driver.rs +++ b/sn_networking/src/driver.rs @@ -64,7 +64,15 @@ use tracing::warn; /// List of expected record holders to be verified. pub(super) type ExpectedHoldersList = HashSet; -type PendingGetClosest = HashMap>, HashSet)>; +/// The ways in which the Get Closest queries are used. +pub(crate) enum PendingGetClosestType { + /// The network discovery method is present at the networking layer + /// Thus we can just process the queries made by NetworkDiscovery without using any channels + NetworkDiscovery, + /// These are queries made by a function at the upper layers and contains a channel to send the result back. + FunctionCall(oneshot::Sender>), +} +type PendingGetClosest = HashMap)>; type PendingGetRecord = HashMap< QueryId, ( diff --git a/sn_networking/src/event.rs b/sn_networking/src/event.rs index ce921f8570..e2306ee1e6 100644 --- a/sn_networking/src/event.rs +++ b/sn_networking/src/event.rs @@ -8,7 +8,7 @@ use crate::{ close_group_majority, - driver::{truncate_patch_version, SwarmDriver}, + driver::{truncate_patch_version, PendingGetClosestType, SwarmDriver}, error::{Error, Result}, multiaddr_is_global, multiaddr_strip_p2p, sort_peers_by_address, GetQuorum, CLOSE_GROUP_SIZE, }; @@ -606,7 +606,7 @@ impl SwarmDriver { "Query task {id:?} returned with peers {closest_peers:?}, {stats:?} - {step:?}" ); - let (sender, mut current_closest) = + let (get_closest_type, mut current_closest) = self.pending_get_closest_peers.remove(&id).ok_or_else(|| { trace!( "Can't locate query task {id:?}, it has likely been completed already." 
@@ -621,13 +621,20 @@ impl SwarmDriver { let new_peers: HashSet = closest_peers.peers.clone().into_iter().collect(); current_closest.extend(new_peers); if current_closest.len() >= usize::from(K_VALUE) || step.last { - sender - .send(current_closest) - .map_err(|_| Error::InternalMsgChannelDropped)?; + match get_closest_type { + PendingGetClosestType::NetworkDiscovery => self + .network_discovery_candidates + .handle_get_closest_query(current_closest), + PendingGetClosestType::FunctionCall(sender) => { + sender + .send(current_closest) + .map_err(|_| Error::InternalMsgChannelDropped)?; + } + } } else { let _ = self .pending_get_closest_peers - .insert(id, (sender, current_closest)); + .insert(id, (get_closest_type, current_closest)); } } // Handle GetClosestPeers timeouts @@ -640,7 +647,7 @@ impl SwarmDriver { event_string = "kad_event::get_closest_peers_err"; error!("GetClosest Query task {id:?} errored with {err:?}, {stats:?} - {step:?}"); - let (sender, mut current_closest) = + let (get_closest_type, mut current_closest) = self.pending_get_closest_peers.remove(&id).ok_or_else(|| { trace!( "Can't locate query task {id:?}, it has likely been completed already." @@ -657,9 +664,16 @@ impl SwarmDriver { } } - sender - .send(current_closest) - .map_err(|_| Error::InternalMsgChannelDropped)?; + match get_closest_type { + PendingGetClosestType::NetworkDiscovery => self + .network_discovery_candidates + .handle_get_closest_query(current_closest), + PendingGetClosestType::FunctionCall(sender) => { + sender + .send(current_closest) + .map_err(|_| Error::InternalMsgChannelDropped)?; + } + } } // For `get_record` returning behaviour: diff --git a/sn_networking/src/network_discovery.rs b/sn_networking/src/network_discovery.rs index 1bf7369238..93781470f7 100644 --- a/sn_networking/src/network_discovery.rs +++ b/sn_networking/src/network_discovery.rs @@ -10,7 +10,7 @@ use libp2p::{kad::KBucketKey, PeerId}; use rayon::iter::{IntoParallelIterator, ParallelIterator}; use sn_protocol::NetworkAddress; use std::{ - collections::{hash_map::Entry, HashMap, VecDeque}, + collections::{hash_map::Entry, HashMap, HashSet, VecDeque}, time::Instant, }; @@ -91,6 +91,36 @@ impl NetworkDiscoveryCandidates { .filter_map(|candidates| candidates.front()) } + pub(crate) fn handle_get_closest_query(&mut self, closest_peers: HashSet) { + let now = Instant::now(); + for peer in closest_peers { + let peer = NetworkAddress::from_peer(peer); + let peer_key = peer.as_kbucket_key(); + if let Some(ilog2_distance) = peer_key.distance(&self.self_key).ilog2() { + match self.candidates.entry(ilog2_distance) { + Entry::Occupied(mut entry) => { + let entry = entry.get_mut(); + // extra check to make sure we don't insert the same peer again + if entry.len() >= MAX_PEERS_PER_BUCKET && !entry.contains(&peer) { + // pop the front (as it might have been already used for querying and insert the new one at the back + let _ = entry.pop_front(); + entry.push_back(peer); + } else { + entry.push_back(peer); + } + } + Entry::Vacant(entry) => { + let _ = entry.insert(VecDeque::from([peer])); + } + } + } + } + trace!( + "It took {:?} to NetworkDiscovery::handle get closest query", + now.elapsed() + ); + } + fn generate_candidates( self_key: &KBucketKey, num_to_generate: usize, From 21b6e5846527a48dec9d291ee92c39e3e240c946 Mon Sep 17 00:00:00 2001 From: Roland Sherwin Date: Fri, 24 Nov 2023 22:41:03 +0530 Subject: [PATCH 08/12] chore(discovery): rename structs and add docs --- sn_networking/src/bootstrap.rs | 6 +++--- sn_networking/src/driver.rs | 6 
+++--- sn_networking/src/event.rs | 4 ++-- sn_networking/src/network_discovery.rs | 19 ++++++++++++++++--- 4 files changed, 24 insertions(+), 11 deletions(-) diff --git a/sn_networking/src/bootstrap.rs b/sn_networking/src/bootstrap.rs index 148e3c76ff..38ea496ec5 100644 --- a/sn_networking/src/bootstrap.rs +++ b/sn_networking/src/bootstrap.rs @@ -52,7 +52,7 @@ impl SwarmDriver { let now = Instant::now(); // The query is just to trigger the network discovery, // hence no need to wait for a result. - for addr in self.network_discovery_candidates.candidates() { + for addr in self.network_discovery.candidates() { let query_id = self .swarm .behaviour_mut() @@ -63,8 +63,8 @@ impl SwarmDriver { (PendingGetClosestType::NetworkDiscovery, Default::default()), ); } - self.network_discovery_candidates - .try_generate_new_candidates(); + // Refresh the candidate list to not query the same candidates over and over again. + self.network_discovery.try_refresh_candidates(); self.bootstrap.initiated(); debug!("Trigger network discovery took {:?}", now.elapsed()); } diff --git a/sn_networking/src/driver.rs b/sn_networking/src/driver.rs index a5c3811b01..e640fbfb7c 100644 --- a/sn_networking/src/driver.rs +++ b/sn_networking/src/driver.rs @@ -18,7 +18,7 @@ use crate::{ event::NetworkEvent, event::{GetRecordResultMap, NodeEvent}, multiaddr_pop_p2p, - network_discovery::NetworkDiscoveryCandidates, + network_discovery::NetworkDiscovery, record_store::{ClientRecordStore, NodeRecordStore, NodeRecordStoreConfig}, record_store_api::UnifiedRecordStore, replication_fetcher::ReplicationFetcher, @@ -504,7 +504,7 @@ impl NetworkBuilder { // `identify` protocol to kick in and get them in the routing table. dialed_peers: CircularVec::new(63), is_gossip_handler: false, - network_discovery_candidates: NetworkDiscoveryCandidates::new(&peer_id), + network_discovery: NetworkDiscovery::new(&peer_id), }; Ok(( @@ -549,7 +549,7 @@ pub struct SwarmDriver { pub(crate) is_gossip_handler: bool, // A list of random `PeerId` candidates that falls into kbuckets, // This is to ensure a more accurate network discovery. 
- pub(crate) network_discovery_candidates: NetworkDiscoveryCandidates, + pub(crate) network_discovery: NetworkDiscovery, } impl SwarmDriver { diff --git a/sn_networking/src/event.rs b/sn_networking/src/event.rs index e2306ee1e6..ceee999a27 100644 --- a/sn_networking/src/event.rs +++ b/sn_networking/src/event.rs @@ -623,7 +623,7 @@ impl SwarmDriver { if current_closest.len() >= usize::from(K_VALUE) || step.last { match get_closest_type { PendingGetClosestType::NetworkDiscovery => self - .network_discovery_candidates + .network_discovery .handle_get_closest_query(current_closest), PendingGetClosestType::FunctionCall(sender) => { sender @@ -666,7 +666,7 @@ impl SwarmDriver { match get_closest_type { PendingGetClosestType::NetworkDiscovery => self - .network_discovery_candidates + .network_discovery .handle_get_closest_query(current_closest), PendingGetClosestType::FunctionCall(sender) => { sender diff --git a/sn_networking/src/network_discovery.rs b/sn_networking/src/network_discovery.rs index 93781470f7..1ee3bc62d6 100644 --- a/sn_networking/src/network_discovery.rs +++ b/sn_networking/src/network_discovery.rs @@ -14,17 +14,23 @@ use std::{ time::Instant, }; +// The number of PeerId to generate when starting an instance of NetworkDiscovery const INITIAL_GENERATION_ATTEMPTS: usize = 10_000; +// The number of PeerId to generate during each invocation to refresh our candidates const GENERATION_ATTEMPTS: usize = 1_000; +// The max number of PeerId to keep per bucket const MAX_PEERS_PER_BUCKET: usize = 5; +/// Keep track of NetworkAddresses belonging to every bucket (if we can generate them with reasonable effort) +/// which we can then query using Kad::GetClosestPeers to effectively fill our RT. #[derive(Debug, Clone)] -pub(crate) struct NetworkDiscoveryCandidates { +pub(crate) struct NetworkDiscovery { self_key: KBucketKey, candidates: HashMap>, } -impl NetworkDiscoveryCandidates { +impl NetworkDiscovery { + /// Create a new instance of NetworkDiscovery and tries to populate each bucket with random peers. pub(crate) fn new(self_peer_id: &PeerId) -> Self { let start = Instant::now(); let self_key = KBucketKey::from(*self_peer_id); @@ -64,7 +70,10 @@ impl NetworkDiscoveryCandidates { } } - pub(crate) fn try_generate_new_candidates(&mut self) { + /// Tries to refresh our current candidate list. The candidates at the front of the list are used when querying the + /// network, so if a new peer for that bucket is generated, the first candidate is removed and the new candidate + /// is inserted at the last + pub(crate) fn try_refresh_candidates(&mut self) { let candidates_vec = Self::generate_candidates(&self.self_key, GENERATION_ATTEMPTS); for (ilog2, candidate) in candidates_vec { match self.candidates.entry(ilog2) { @@ -85,12 +94,15 @@ impl NetworkDiscoveryCandidates { } } + /// Returns one candidate per bucket + /// Todo: Limit the candidates to return. Favor the closest buckets. pub(crate) fn candidates(&self) -> impl Iterator { self.candidates .values() .filter_map(|candidates| candidates.front()) } + /// The result from the kad::GetClosestPeers are again used to update our kbuckets if they're not full. 
pub(crate) fn handle_get_closest_query(&mut self, closest_peers: HashSet) { let now = Instant::now(); for peer in closest_peers { @@ -121,6 +133,7 @@ impl NetworkDiscoveryCandidates { ); } + /// Uses rayon to parallelize the generation fn generate_candidates( self_key: &KBucketKey, num_to_generate: usize, From aec3ede9838d27343d01d39a00e86cf7f4200e74 Mon Sep 17 00:00:00 2001 From: Roland Sherwin Date: Fri, 24 Nov 2023 23:00:21 +0530 Subject: [PATCH 09/12] chore(ci): perform routing table verification after churn test --- .github/workflows/memcheck.yml | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/.github/workflows/memcheck.yml b/.github/workflows/memcheck.yml index aefb6a1428..255b5be582 100644 --- a/.github/workflows/memcheck.yml +++ b/.github/workflows/memcheck.yml @@ -79,12 +79,6 @@ jobs: shell: bash run: echo "The SAFE_PEERS variable has been set to ${SAFE_PEERS}" - - name: Verify the routing tables of the nodes - run: cargo test --release -p sn_node --test verify_routing_table -- --nocapture - env: - SLEEP_BEFORE_VERIFICATION: 120 - timeout-minutes: 10 - - name: Create and fund a wallet to pay for files storage run: | cargo run --bin faucet --release -- --log-output-dest=data-dir send 5000000 $(cargo run --bin safe --release -- --log-output-dest=data-dir wallet address | tail -n 1) > initial_balance_from_faucet.txt @@ -121,7 +115,7 @@ jobs: - name: Verify the routing tables of the nodes run: cargo test --release -p sn_node --test verify_routing_table -- --nocapture env: - SLEEP_BEFORE_VERIFICATION: 120 + SLEEP_BEFORE_VERIFICATION: 220 timeout-minutes: 10 - name: Verify restart of nodes using rg From cddcb7fa01f5336764a9e6f1d037eff3f428b5a5 Mon Sep 17 00:00:00 2001 From: Roland Sherwin Date: Mon, 27 Nov 2023 00:06:59 +0530 Subject: [PATCH 10/12] fix(discovery): insert newly seen candidates and return random candidates --- sn_networking/src/network_discovery.rs | 169 ++++++++++++++----------- sn_node/tests/verify_routing_table.rs | 4 +- 2 files changed, 97 insertions(+), 76 deletions(-) diff --git a/sn_networking/src/network_discovery.rs b/sn_networking/src/network_discovery.rs index 1ee3bc62d6..f2dea4a80d 100644 --- a/sn_networking/src/network_discovery.rs +++ b/sn_networking/src/network_discovery.rs @@ -7,10 +7,11 @@ // permissions and limitations relating to use of the SAFE Network Software. 
use libp2p::{kad::KBucketKey, PeerId}; +use rand::{thread_rng, Rng}; use rayon::iter::{IntoParallelIterator, ParallelIterator}; use sn_protocol::NetworkAddress; use std::{ - collections::{hash_map::Entry, HashMap, HashSet, VecDeque}, + collections::{hash_map::Entry, HashMap, HashSet}, time::Instant, }; @@ -26,7 +27,7 @@ const MAX_PEERS_PER_BUCKET: usize = 5; #[derive(Debug, Clone)] pub(crate) struct NetworkDiscovery { self_key: KBucketKey, - candidates: HashMap>, + candidates: HashMap>, } impl NetworkDiscovery { @@ -34,24 +35,7 @@ impl NetworkDiscovery { pub(crate) fn new(self_peer_id: &PeerId) -> Self { let start = Instant::now(); let self_key = KBucketKey::from(*self_peer_id); - let candidates_vec = Self::generate_candidates(&self_key, INITIAL_GENERATION_ATTEMPTS); - - let mut candidates: HashMap> = HashMap::new(); - for (ilog2, candidate) in candidates_vec { - match candidates.entry(ilog2) { - Entry::Occupied(mut entry) => { - let entry = entry.get_mut(); - if entry.len() >= MAX_PEERS_PER_BUCKET { - continue; - } else { - entry.push_back(candidate); - } - } - Entry::Vacant(entry) => { - let _ = entry.insert(VecDeque::from([candidate])); - } - } - } + let candidates = Self::generate_candidates(&self_key, INITIAL_GENERATION_ATTEMPTS); info!( "Time to generate NetworkDiscoveryCandidates: {:?}", @@ -70,82 +54,121 @@ impl NetworkDiscovery { } } - /// Tries to refresh our current candidate list. The candidates at the front of the list are used when querying the - /// network, so if a new peer for that bucket is generated, the first candidate is removed and the new candidate - /// is inserted at the last + /// Tries to refresh our current candidate list. We replace the old ones with new if we find any. pub(crate) fn try_refresh_candidates(&mut self) { let candidates_vec = Self::generate_candidates(&self.self_key, GENERATION_ATTEMPTS); - for (ilog2, candidate) in candidates_vec { - match self.candidates.entry(ilog2) { - Entry::Occupied(mut entry) => { - let entry = entry.get_mut(); - if entry.len() >= MAX_PEERS_PER_BUCKET { - // pop the front (as it might have been already used for querying and insert the new one at the back - let _ = entry.pop_front(); - entry.push_back(candidate); - } else { - entry.push_back(candidate); - } - } - Entry::Vacant(entry) => { - let _ = entry.insert(VecDeque::from([candidate])); - } - } + for (ilog2, candidates) in candidates_vec { + self.insert_candidates(ilog2, candidates); } } - /// Returns one candidate per bucket - /// Todo: Limit the candidates to return. Favor the closest buckets. - pub(crate) fn candidates(&self) -> impl Iterator { - self.candidates - .values() - .filter_map(|candidates| candidates.front()) - } - - /// The result from the kad::GetClosestPeers are again used to update our kbuckets if they're not full. + /// The result from the kad::GetClosestPeers are again used to update our kbucket. 
pub(crate) fn handle_get_closest_query(&mut self, closest_peers: HashSet) { let now = Instant::now(); - for peer in closest_peers { - let peer = NetworkAddress::from_peer(peer); - let peer_key = peer.as_kbucket_key(); - if let Some(ilog2_distance) = peer_key.distance(&self.self_key).ilog2() { - match self.candidates.entry(ilog2_distance) { - Entry::Occupied(mut entry) => { - let entry = entry.get_mut(); - // extra check to make sure we don't insert the same peer again - if entry.len() >= MAX_PEERS_PER_BUCKET && !entry.contains(&peer) { - // pop the front (as it might have been already used for querying and insert the new one at the back - let _ = entry.pop_front(); - entry.push_back(peer); - } else { - entry.push_back(peer); - } - } - Entry::Vacant(entry) => { - let _ = entry.insert(VecDeque::from([peer])); - } - } - } + + let candidates_map: HashMap> = closest_peers + .into_iter() + .filter_map(|peer| { + let peer = NetworkAddress::from_peer(peer); + let peer_key = peer.as_kbucket_key(); + peer_key + .distance(&self.self_key) + .ilog2() + .map(|ilog2| (ilog2, peer)) + }) + // To collect the NetworkAddresses into a vector. + .fold(HashMap::new(), |mut acc, (ilog2, peer)| { + acc.entry(ilog2).or_default().push(peer); + acc + }); + + for (ilog2, candidates) in candidates_map { + self.insert_candidates(ilog2, candidates); } + trace!( "It took {:?} to NetworkDiscovery::handle get closest query", now.elapsed() ); } + /// Returns one random candidate per bucket + /// Todo: Limit the candidates to return. Favor the closest buckets. + pub(crate) fn candidates(&self) -> Vec<&NetworkAddress> { + let mut rng = thread_rng(); + let mut op = Vec::with_capacity(self.candidates.len()); + + let candidates = self.candidates.values().filter_map(|candidates| { + // get a random index each time + let random_index = rng.gen::() % candidates.len(); + candidates.get(random_index) + }); + op.extend(candidates); + op + } + + // Insert the new candidates and remove the old ones to maintain MAX_PEERS_PER_BUCKET. + fn insert_candidates(&mut self, ilog2: u32, new_candidates: Vec) { + match self.candidates.entry(ilog2) { + Entry::Occupied(mut entry) => { + let existing_candidates = entry.get_mut(); + // insert only newly seen new_candidates + let new_candidates: Vec<_> = new_candidates + .into_iter() + .filter(|candidate| !existing_candidates.contains(candidate)) + .collect(); + existing_candidates.extend(new_candidates); + // Keep only the last MAX_PEERS_PER_BUCKET elements i.e., the newest ones + let excess = existing_candidates + .len() + .saturating_sub(MAX_PEERS_PER_BUCKET); + if excess > 0 { + existing_candidates.drain(..excess); + } + } + Entry::Vacant(entry) => { + entry.insert(new_candidates); + } + } + } + /// Uses rayon to parallelize the generation fn generate_candidates( self_key: &KBucketKey, num_to_generate: usize, - ) -> Vec<(u32, NetworkAddress)> { + ) -> HashMap> { (0..num_to_generate) .into_par_iter() .filter_map(|_| { let candidate = NetworkAddress::from_peer(PeerId::random()); let candidate_key = candidate.as_kbucket_key(); - let ilog2_distance = candidate_key.distance(&self_key).ilog2()?; - Some((ilog2_distance, candidate)) + let ilog2 = candidate_key.distance(&self_key).ilog2()?; + Some((ilog2, candidate)) }) - .collect::>() + // Since it is parallel iterator, the fold fn batches the items and will produce multiple outputs. So we + // should use reduce fn to combine multiple outputs. 
+            .fold(
+                HashMap::new,
+                |mut acc: HashMap<u32, Vec<NetworkAddress>>, (ilog2, candidate)| {
+                    acc.entry(ilog2).or_default().push(candidate);
+                    acc
+                },
+            )
+            .reduce(
+                HashMap::new,
+                |mut acc: HashMap<u32, Vec<NetworkAddress>>, map| {
+                    for (ilog2, candidates) in map {
+                        let entry = acc.entry(ilog2).or_default();
+                        for candidate in candidates {
+                            if entry.len() < MAX_PEERS_PER_BUCKET {
+                                entry.push(candidate);
+                            } else {
+                                break;
+                            }
+                        }
+                    }
+                    acc
+                },
+            )
     }
 }
diff --git a/sn_node/tests/verify_routing_table.rs b/sn_node/tests/verify_routing_table.rs
index a1eba12046..bb5f83d33d 100644
--- a/sn_node/tests/verify_routing_table.rs
+++ b/sn_node/tests/verify_routing_table.rs
@@ -70,9 +70,7 @@ async fn verify_routing_table() -> Result<()> {
             })
             .collect::<HashMap<_, _>>();
 
-        let current_peer = *all_peers
-            .get(node_index as usize - 1)
-            .unwrap_or_else(|| panic!("Node should be present at index {}", node_index - 1));
+        let current_peer = all_peers[node_index as usize - 1];
         let current_peer_key = KBucketKey::from(current_peer);
 
         let mut failed_list = Vec::new();

From b25ea130a0e069e97387c60898229af865944aa1 Mon Sep 17 00:00:00 2001
From: Roland Sherwin
Date: Mon, 27 Nov 2023 17:40:51 +0530
Subject: [PATCH 11/12] chore: changes based on comment, use btreemap

---
 sn_networking/src/bootstrap.rs         |  8 ++---
 sn_networking/src/cmd.rs               |  9 +++---
 sn_networking/src/lib.rs               |  7 +++--
 sn_networking/src/network_discovery.rs | 43 +++++++++++++-------------
 sn_node/src/lib.rs                     |  7 +++--
 sn_node/tests/verify_routing_table.rs  |  6 ++--
 6 files changed, 42 insertions(+), 38 deletions(-)

diff --git a/sn_networking/src/bootstrap.rs b/sn_networking/src/bootstrap.rs
index 38ea496ec5..7bb376012c 100644
--- a/sn_networking/src/bootstrap.rs
+++ b/sn_networking/src/bootstrap.rs
@@ -50,9 +50,10 @@ impl SwarmDriver {
     pub(crate) fn trigger_network_discovery(&mut self) {
         let now = Instant::now();
-        // The query is just to trigger the network discovery,
-        // hence no need to wait for a result.
+        // Fetches the candidates and also generates new candidates
         for addr in self.network_discovery.candidates() {
+            // The query_id is tracked here. This is to update the candidate list of network_discovery with the newly
+            // found closest peers. It may fill up the candidate list of closer buckets which are harder to generate.
            let query_id = self
                .swarm
                .behaviour_mut()
@@ -63,8 +64,7 @@
                (PendingGetClosestType::NetworkDiscovery, Default::default()),
            );
        }
-        // Refresh the candidate list to not query the same candidates over and over again.
-        self.network_discovery.try_refresh_candidates();
+        self.bootstrap.initiated();
 
        debug!("Trigger network discovery took {:?}", now.elapsed());
    }
diff --git a/sn_networking/src/cmd.rs b/sn_networking/src/cmd.rs
index f21c245887..b870758bb6 100644
--- a/sn_networking/src/cmd.rs
+++ b/sn_networking/src/cmd.rs
@@ -25,7 +25,7 @@ use sn_protocol::{
 };
 use sn_transfers::NanoTokens;
 use std::{
-    collections::{HashMap, HashSet},
+    collections::{BTreeMap, HashMap, HashSet},
     fmt::Debug,
 };
 use tokio::sync::oneshot;
@@ -53,9 +53,10 @@ pub enum SwarmCmd {
     GetAllLocalPeers {
         sender: oneshot::Sender<Vec<PeerId>>,
     },
-    // Get the map of ilog2 distance of the Kbucket to the peers in that bucket
+    /// Get a map where each key is the ilog2 distance of that Kbucket and each value is a vector of peers in that
+    /// bucket.
     GetKBuckets {
-        sender: oneshot::Sender<HashMap<u32, Vec<PeerId>>>,
+        sender: oneshot::Sender<BTreeMap<u32, Vec<PeerId>>>,
     },
     // Returns up to K_VALUE peers from all the k-buckets from the local Routing Table.
     // And our PeerId as well.
@@ -524,7 +525,7 @@ impl SwarmDriver {
                 let _ = sender.send(self.get_all_local_peers());
             }
             SwarmCmd::GetKBuckets { sender } => {
-                let mut ilog2_kbuckets = HashMap::new();
+                let mut ilog2_kbuckets = BTreeMap::new();
                 for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
                     let range = kbucket.range();
                     if let Some(distance) = range.0.ilog2() {
diff --git a/sn_networking/src/lib.rs b/sn_networking/src/lib.rs
index 06cede0229..2209e78bca 100644
--- a/sn_networking/src/lib.rs
+++ b/sn_networking/src/lib.rs
@@ -53,7 +53,7 @@ use sn_protocol::{
 };
 use sn_transfers::{MainPubkey, NanoTokens, PaymentQuote};
 use std::{
-    collections::{HashMap, HashSet},
+    collections::{BTreeMap, HashMap, HashSet},
     path::PathBuf,
 };
 use tokio::sync::{mpsc, oneshot};
@@ -184,9 +184,10 @@ impl Network {
         self.get_closest_peers(key, false).await
     }
 
-    /// Returns the map of ilog2 distance of the Kbucket to the peers in that bucket
+    /// Returns a map where each key is the ilog2 distance of that Kbucket and each value is a vector of peers in that
+    /// bucket.
     /// Does not include self
-    pub async fn get_kbuckets(&self) -> Result<HashMap<u32, Vec<PeerId>>> {
+    pub async fn get_kbuckets(&self) -> Result<BTreeMap<u32, Vec<PeerId>>> {
         let (sender, receiver) = oneshot::channel();
         self.send_swarm_cmd(SwarmCmd::GetKBuckets { sender })?;
         receiver
diff --git a/sn_networking/src/network_discovery.rs b/sn_networking/src/network_discovery.rs
index f2dea4a80d..b582da99b5 100644
--- a/sn_networking/src/network_discovery.rs
+++ b/sn_networking/src/network_discovery.rs
@@ -11,7 +11,7 @@
 use rand::{thread_rng, Rng};
 use rayon::iter::{IntoParallelIterator, ParallelIterator};
 use sn_protocol::NetworkAddress;
 use std::{
-    collections::{hash_map::Entry, HashMap, HashSet},
+    collections::{btree_map::Entry, BTreeMap, HashSet},
     time::Instant,
 };
@@ -27,7 +27,7 @@ const MAX_PEERS_PER_BUCKET: usize = 5;
 #[derive(Debug, Clone)]
 pub(crate) struct NetworkDiscovery {
     self_key: KBucketKey<PeerId>,
-    candidates: HashMap<u32, Vec<NetworkAddress>>,
+    candidates: BTreeMap<u32, Vec<NetworkAddress>>,
 }
 
 impl NetworkDiscovery {
@@ -41,11 +41,10 @@
             "Time to generate NetworkDiscoveryCandidates: {:?}",
             start.elapsed()
         );
-        let mut buckets_covered = candidates
+        let buckets_covered = candidates
             .iter()
             .map(|(ilog2, candidates)| (*ilog2, candidates.len()))
             .collect::<Vec<_>>();
-        buckets_covered.sort_by_key(|(ilog2, _)| *ilog2);
         info!("The generated network discovery candidates currently cover these ilog2 buckets: {buckets_covered:?}");
 
         Self {
@@ -54,19 +53,11 @@ impl NetworkDiscovery {
         }
     }
 
-    /// Tries to refresh our current candidate list. We replace the old ones with new if we find any.
-    pub(crate) fn try_refresh_candidates(&mut self) {
-        let candidates_vec = Self::generate_candidates(&self.self_key, GENERATION_ATTEMPTS);
-        for (ilog2, candidates) in candidates_vec {
-            self.insert_candidates(ilog2, candidates);
-        }
-    }
-
     /// The result from the kad::GetClosestPeers are again used to update our kbucket.
     pub(crate) fn handle_get_closest_query(&mut self, closest_peers: HashSet<PeerId>) {
         let now = Instant::now();
 
-        let candidates_map: HashMap<u32, Vec<NetworkAddress>> = closest_peers
+        let candidates_map: BTreeMap<u32, Vec<NetworkAddress>> = closest_peers
             .into_iter()
             .filter_map(|peer| {
                 let peer = NetworkAddress::from_peer(peer);
                 let peer_key = peer.as_kbucket_key();
@@ -77,7 +68,7 @@
                     .map(|ilog2| (ilog2, peer))
             })
             // To collect the NetworkAddresses into a vector.
-            .fold(HashMap::new(), |mut acc, (ilog2, peer)| {
+            .fold(BTreeMap::new(), |mut acc, (ilog2, peer)| {
                 acc.entry(ilog2).or_default().push(peer);
                 acc
             });
@@ -92,9 +83,11 @@
         );
     }
 
-    /// Returns one random candidate per bucket
+    /// Returns one random candidate per bucket. Also tries to refresh the candidate list.
     /// Todo: Limit the candidates to return. Favor the closest buckets.
-    pub(crate) fn candidates(&self) -> Vec<&NetworkAddress> {
+    pub(crate) fn candidates(&mut self) -> Vec<&NetworkAddress> {
+        self.try_refresh_candidates();
+
         let mut rng = thread_rng();
         let mut op = Vec::with_capacity(self.candidates.len());
@@ -107,6 +100,14 @@
         op
     }
 
+    /// Tries to refresh our current candidate list. We replace the old ones with new if we find any.
+    fn try_refresh_candidates(&mut self) {
+        let candidates_vec = Self::generate_candidates(&self.self_key, GENERATION_ATTEMPTS);
+        for (ilog2, candidates) in candidates_vec {
+            self.insert_candidates(ilog2, candidates);
+        }
+    }
+
     // Insert the new candidates and remove the old ones to maintain MAX_PEERS_PER_BUCKET.
     fn insert_candidates(&mut self, ilog2: u32, new_candidates: Vec<NetworkAddress>) {
         match self.candidates.entry(ilog2) {
@@ -136,7 +137,7 @@
     fn generate_candidates(
         self_key: &KBucketKey<PeerId>,
         num_to_generate: usize,
-    ) -> HashMap<u32, Vec<NetworkAddress>> {
+    ) -> BTreeMap<u32, Vec<NetworkAddress>> {
         (0..num_to_generate)
             .into_par_iter()
             .filter_map(|_| {
@@ -148,15 +149,15 @@
             // Since it is parallel iterator, the fold fn batches the items and will produce multiple outputs. So we
             // should use reduce fn to combine multiple outputs.
             .fold(
-                HashMap::new,
-                |mut acc: HashMap<u32, Vec<NetworkAddress>>, (ilog2, candidate)| {
+                BTreeMap::new,
+                |mut acc: BTreeMap<u32, Vec<NetworkAddress>>, (ilog2, candidate)| {
                     acc.entry(ilog2).or_default().push(candidate);
                     acc
                 },
             )
             .reduce(
-                HashMap::new,
-                |mut acc: HashMap<u32, Vec<NetworkAddress>>, map| {
+                BTreeMap::new,
+                |mut acc: BTreeMap<u32, Vec<NetworkAddress>>, map| {
                     for (ilog2, candidates) in map {
                         let entry = acc.entry(ilog2).or_default();
                         for candidate in candidates {
diff --git a/sn_node/src/lib.rs b/sn_node/src/lib.rs
index 696a182e6d..a1428d83c1 100644
--- a/sn_node/src/lib.rs
+++ b/sn_node/src/lib.rs
@@ -66,7 +66,7 @@ use libp2p::PeerId;
 use sn_networking::{Network, SwarmLocalState};
 use sn_protocol::NetworkAddress;
 use std::{
-    collections::{HashMap, HashSet},
+    collections::{BTreeMap, HashSet},
     path::PathBuf,
 };
 use tokio::sync::broadcast;
@@ -122,8 +122,9 @@ impl RunningNode {
         Ok(addresses)
     }
 
-    /// Returns the map of ilog2 distance of the Kbucket to the peers in that bucket
-    pub async fn get_kbuckets(&self) -> Result<HashMap<u32, Vec<PeerId>>> {
+    /// Returns a map where each key is the ilog2 distance of that Kbucket and each value is a vector of peers in that
+    /// bucket.
+    pub async fn get_kbuckets(&self) -> Result<BTreeMap<u32, Vec<PeerId>>> {
         let kbuckets = self.network.get_kbuckets().await?;
         Ok(kbuckets)
     }
diff --git a/sn_node/tests/verify_routing_table.rs b/sn_node/tests/verify_routing_table.rs
index bb5f83d33d..998c0a218a 100644
--- a/sn_node/tests/verify_routing_table.rs
+++ b/sn_node/tests/verify_routing_table.rs
@@ -18,7 +18,7 @@ use libp2p::{
 use sn_logging::LogBuilder;
 use sn_protocol::safenode_proto::{safe_node_client::SafeNodeClient, KBucketsRequest};
 use std::{
-    collections::{HashMap, HashSet},
+    collections::{BTreeMap, HashSet},
     net::{IpAddr, Ipv4Addr, SocketAddr},
     time::Duration,
 };
@@ -44,7 +44,7 @@ async fn verify_routing_table() -> Result<()> {
     tokio::time::sleep(sleep_duration).await;
 
     let all_peers = get_all_peer_ids().await?;
-    let mut all_failed_list = HashMap::new();
+    let mut all_failed_list = BTreeMap::new();
 
     for node_index in 1..NODE_COUNT + 1 {
         let mut addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12000);
@@ -68,7 +68,7 @@
                     .collect::<HashSet<_>>();
                 (ilog2, peers)
             })
-            .collect::<HashMap<_, _>>();
+            .collect::<BTreeMap<_, _>>();
 
         let current_peer = all_peers[node_index as usize - 1];
         let current_peer_key = KBucketKey::from(current_peer);

From ade2f3850d0387597b31e7d4fdd0173775aeb9ac Mon Sep 17 00:00:00 2001
From: Roland Sherwin
Date: Mon, 27 Nov 2023 18:52:44 +0530
Subject: [PATCH 12/12] chore: increase routing table sleep before verification
 time

---
 .github/workflows/memcheck.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/memcheck.yml b/.github/workflows/memcheck.yml
index 255b5be582..a7a8026273 100644
--- a/.github/workflows/memcheck.yml
+++ b/.github/workflows/memcheck.yml
@@ -115,7 +115,7 @@ jobs:
       - name: Verify the routing tables of the nodes
         run: cargo test --release -p sn_node --test verify_routing_table -- --nocapture
         env:
-          SLEEP_BEFORE_VERIFICATION: 220
+          SLEEP_BEFORE_VERIFICATION: 300
         timeout-minutes: 10

      - name: Verify restart of nodes using rg
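
A minimal, self-contained sketch of the candidate-bucket policy these patches converge on: group candidates by the ilog2 distance of their bucket in a BTreeMap, keep only previously unseen entries, and trim from the front so at most MAX_PEERS_PER_BUCKET of the newest candidates remain per bucket. The types below are simplifications for illustration only — u64 ids stand in for NetworkAddress, and the Candidates struct is not part of the crate:

// Illustrative sketch, not part of the patches above. Shows the
// "cap each ilog2 bucket, newest unseen entries win" policy that
// insert_candidates() implements, using stand-in types.
use std::collections::BTreeMap;

const MAX_PEERS_PER_BUCKET: usize = 5;

/// Candidate ids grouped by the ilog2 distance of their bucket.
#[derive(Debug, Default)]
struct Candidates {
    buckets: BTreeMap<u32, Vec<u64>>,
}

impl Candidates {
    /// Insert newly seen candidates for a bucket, then drop the oldest
    /// entries so the bucket never holds more than MAX_PEERS_PER_BUCKET.
    fn insert(&mut self, ilog2: u32, new_candidates: Vec<u64>) {
        let existing = self.buckets.entry(ilog2).or_default();
        // Keep only candidates we have not stored yet.
        let fresh: Vec<u64> = new_candidates
            .into_iter()
            .filter(|c| !existing.contains(c))
            .collect();
        existing.extend(fresh);
        // Trim from the front so only the newest entries remain.
        let excess = existing.len().saturating_sub(MAX_PEERS_PER_BUCKET);
        if excess > 0 {
            existing.drain(..excess);
        }
    }
}

fn main() {
    let mut candidates = Candidates::default();
    candidates.insert(250, (0..8).collect());
    candidates.insert(250, vec![100, 101]);
    // Bucket 250 keeps only the 5 newest, previously unseen candidates:
    // prints Some([5, 6, 7, 100, 101])
    println!("{:?}", candidates.buckets.get(&250));
}

Because the keys are ilog2 distances, a BTreeMap keeps the buckets ordered from closest to farthest for free, which is why patch 11 drops the explicit sort that the HashMap-based version needed.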