Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(test): impl routing table test #1005

Merged
merged 12 commits into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions .github/workflows/memcheck.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ jobs:
run: cargo build --release --bins
timeout-minutes: 30

- name: Build churn tests
run: cargo test --release -p sn_node --test data_with_churn --no-run
- name: Build tests
run: cargo test --release -p sn_node --test data_with_churn --test verify_routing_table --no-run
timeout-minutes: 30

- name: Start a node instance that does not undergo churn
Expand Down Expand Up @@ -112,6 +112,12 @@ jobs:
SN_LOG: "all"
timeout-minutes: 30

- name: Verify the routing tables of the nodes
run: cargo test --release -p sn_node --test verify_routing_table -- --nocapture
env:
SLEEP_BEFORE_VERIFICATION: 220
timeout-minutes: 10

- name: Verify restart of nodes using rg
shell: bash
timeout-minutes: 1
Expand Down
16 changes: 12 additions & 4 deletions .github/workflows/merge.yml
Original file line number Diff line number Diff line change
Expand Up @@ -438,9 +438,9 @@ jobs:
log_file_prefix: safe_test_logs_churn
platform: ${{ matrix.os }}

verify_data_location:
verify_data_location_routing_table:
if: "!startsWith(github.event.head_commit.message, 'chore(release):')"
name: Verify data location
name: Verify data location and Routing Table
runs-on: ${{ matrix.os }}
strategy:
matrix:
Expand All @@ -464,8 +464,8 @@ jobs:
run: cargo build --release --features local-discovery --bin safenode --bin faucet
timeout-minutes: 30

- name: Build data location test
run: cargo test --release -p sn_node --features=local-discovery --test verify_data_location --no-run
- name: Build data location and routing table tests
run: cargo test --release -p sn_node --features=local-discovery --test verify_data_location --test verify_routing_table --no-run
timeout-minutes: 30

- name: Start a local network
Expand All @@ -487,13 +487,21 @@ jobs:
echo "SAFE_PEERS has been set to $SAFE_PEERS"
fi

- name: Verify the routing tables of the nodes
run: cargo test --release -p sn_node --features="local-discovery" --test verify_routing_table -- --nocapture
timeout-minutes: 5

- name: Verify the location of the data on the network (4 * 5 mins)
run: cargo test --release -p sn_node --features="local-discovery" --test verify_data_location -- --nocapture
env:
CHURN_COUNT: 3
SN_LOG: "all"
timeout-minutes: 30

- name: Verify the routing tables of the nodes
run: cargo test --release -p sn_node --features="local-discovery" --test verify_routing_table -- --nocapture
timeout-minutes: 5

- name: Verify restart of nodes using rg
shell: bash
timeout-minutes: 1
Expand Down
16 changes: 12 additions & 4 deletions .github/workflows/nightly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -383,8 +383,8 @@ jobs:
SLACK_MESSAGE: "Please check the logs for the run at ${{ env.WORKFLOW_URL }}/${{ github.run_id }}"
SLACK_TITLE: "Nightly Churn Test Run Failed"

verify_data_location:
name: Verify data location
verify_data_location_routing_table:
name: Verify data location and Routing Table
runs-on: ${{ matrix.os }}
strategy:
matrix:
Expand All @@ -410,8 +410,8 @@ jobs:
run: cargo build --release --features local-discovery --bin safenode --bin faucet
timeout-minutes: 30

- name: Build data location test
run: cargo test --release -p sn_node --features=local-discovery --test verify_data_location --no-run
- name: Build data location and routing table tests
run: cargo test --release -p sn_node --features=local-discovery --test verify_data_location --test verify_routing_table --no-run
timeout-minutes: 30

- name: Start a local network
Expand All @@ -423,12 +423,20 @@ jobs:
faucet-path: target/release/faucet
platform: ${{ matrix.os }}

- name: Verify the Routing table of the nodes
run: cargo test --release -p sn_node --features="local-discovery" --test verify_routing_table -- --nocapture
timeout-minutes: 5

- name: Verify the location of the data on the network (approx 12 * 5 mins)
run: cargo test --release -p sn_node --features="local-discovery" --test verify_data_location -- --nocapture
env:
CHURN_COUNT: 12
SN_LOG: "all"
timeout-minutes: 90

- name: Verify the routing tables of the nodes
run: cargo test --release -p sn_node --features="local-discovery" --test verify_routing_table -- --nocapture
timeout-minutes: 5

- name: Verify restart of nodes using rg
shell: bash
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions sn_networking/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ custom_debug = "~0.5.0"
libp2p = { version="0.53" , features = ["tokio", "dns", "kad", "macros", "request-response", "cbor","identify", "autonat", "noise", "tcp", "yamux", "gossipsub"] }
prometheus-client = { version = "0.22", optional = true }
rand = { version = "~0.8.5", features = ["small_rng"] }
rayon = "1.8.0"
rmp-serde = "1.1.1"
serde = { version = "1.0.133", features = [ "derive", "rc" ]}
sn_protocol = { path = "../sn_protocol", version = "0.8.29" }
Expand Down
14 changes: 11 additions & 3 deletions sn_networking/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use crate::SwarmDriver;
use crate::{driver::PendingGetClosestType, SwarmDriver};
use std::time::{Duration, Instant};
use tokio::time::Interval;

Expand Down Expand Up @@ -49,16 +49,24 @@ impl SwarmDriver {
}

pub(crate) fn trigger_network_discovery(&mut self) {
let now = Instant::now();
// The query is just to trigger the network discovery,
// hence no need to wait for a result.
for addr in &self.network_discovery_candidates {
let _ = self
for addr in self.network_discovery.candidates() {
let query_id = self
maqi marked this conversation as resolved.
Show resolved Hide resolved
.swarm
.behaviour_mut()
.kademlia
.get_closest_peers(addr.as_bytes());
let _ = self.pending_get_closest_peers.insert(
query_id,
(PendingGetClosestType::NetworkDiscovery, Default::default()),
);
}
// Refresh the candidate list to not query the same candidates over and over again.
self.network_discovery.try_refresh_candidates();
maqi marked this conversation as resolved.
Show resolved Hide resolved
self.bootstrap.initiated();
debug!("Trigger network discovery took {:?}", now.elapsed());
}
}

Expand Down
39 changes: 32 additions & 7 deletions sn_networking/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
// permissions and limitations relating to use of the SAFE Network Software.

use crate::{
driver::SwarmDriver,
driver::{PendingGetClosestType, SwarmDriver},
error::{Error, Result},
sort_peers_by_address, GetQuorum, MsgResponder, NetworkEvent, CLOSE_GROUP_SIZE,
REPLICATE_RANGE,
Expand Down Expand Up @@ -53,6 +53,10 @@ pub enum SwarmCmd {
GetAllLocalPeers {
sender: oneshot::Sender<Vec<PeerId>>,
},
// Get the map of ilog2 distance of the Kbucket to the peers in that bucket
GetKBuckets {
sender: oneshot::Sender<HashMap<u32, Vec<PeerId>>>,
},
// Returns up to K_VALUE peers from all the k-buckets from the local Routing Table.
// And our PeerId as well.
GetClosestKLocalPeers {
Expand Down Expand Up @@ -240,9 +244,9 @@ impl Debug for SwarmCmd {
SwarmCmd::GetAllLocalPeers { .. } => {
write!(f, "SwarmCmd::GetAllLocalPeers")
}
// SwarmCmd::GetOurCloseGroup { .. } => {
// write!(f, "SwarmCmd::GetOurCloseGroup")
// }
SwarmCmd::GetKBuckets { .. } => {
write!(f, "SwarmCmd::GetKBuckets")
}
SwarmCmd::GetSwarmLocalState { .. } => {
write!(f, "SwarmCmd::GetSwarmLocalState")
}
Expand Down Expand Up @@ -508,13 +512,34 @@ impl SwarmDriver {
.behaviour_mut()
.kademlia
.get_closest_peers(key.as_bytes());
let _ = self
.pending_get_closest_peers
.insert(query_id, (sender, Default::default()));
let _ = self.pending_get_closest_peers.insert(
query_id,
(
PendingGetClosestType::FunctionCall(sender),
Default::default(),
),
);
}
SwarmCmd::GetAllLocalPeers { sender } => {
let _ = sender.send(self.get_all_local_peers());
}
SwarmCmd::GetKBuckets { sender } => {
let mut ilog2_kbuckets = HashMap::new();
for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
let range = kbucket.range();
if let Some(distance) = range.0.ilog2() {
let peers_in_kbucket = kbucket
.iter()
.map(|peer_entry| peer_entry.node.key.clone().into_preimage())
.collect::<Vec<PeerId>>();
let _ = ilog2_kbuckets.insert(distance, peers_in_kbucket);
} else {
// This shall never happen.
error!("bucket is ourself ???!!!");
}
}
let _ = sender.send(ilog2_kbuckets);
}
SwarmCmd::GetCloseGroupLocalPeers { key, sender } => {
let key = key.as_kbucket_key();
// calls `kbuckets.closest_keys(key)` internally, which orders the peers by
Expand Down
63 changes: 13 additions & 50 deletions sn_networking/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::{
event::NetworkEvent,
event::{GetRecordResultMap, NodeEvent},
multiaddr_pop_p2p,
network_discovery::NetworkDiscovery,
record_store::{ClientRecordStore, NodeRecordStore, NodeRecordStoreConfig},
record_store_api::UnifiedRecordStore,
replication_fetcher::ReplicationFetcher,
Expand Down Expand Up @@ -50,7 +51,7 @@ use sn_protocol::{
NetworkAddress, PrettyPrintKBucketKey,
};
use std::{
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
collections::{HashMap, HashSet},
net::SocketAddr,
num::NonZeroUsize,
path::PathBuf,
Expand All @@ -63,7 +64,15 @@ use tracing::warn;
/// List of expected record holders to be verified.
pub(super) type ExpectedHoldersList = HashSet<PeerId>;

type PendingGetClosest = HashMap<QueryId, (oneshot::Sender<HashSet<PeerId>>, HashSet<PeerId>)>;
/// The ways in which the Get Closest queries are used.
pub(crate) enum PendingGetClosestType {
/// The network discovery method is present at the networking layer
/// Thus we can just process the queries made by NetworkDiscovery without using any channels
NetworkDiscovery,
/// These are queries made by a function at the upper layers and contains a channel to send the result back.
FunctionCall(oneshot::Sender<HashSet<PeerId>>),
}
type PendingGetClosest = HashMap<QueryId, (PendingGetClosestType, HashSet<PeerId>)>;
type PendingGetRecord = HashMap<
QueryId,
(
Expand Down Expand Up @@ -495,7 +504,7 @@ impl NetworkBuilder {
// `identify` protocol to kick in and get them in the routing table.
dialed_peers: CircularVec::new(63),
is_gossip_handler: false,
network_discovery_candidates: generate_kbucket_specific_candidates(&peer_id),
network_discovery: NetworkDiscovery::new(&peer_id),
};

Ok((
Expand All @@ -511,51 +520,6 @@ impl NetworkBuilder {
}
}

fn generate_kbucket_specific_candidates(self_peer_id: &PeerId) -> Vec<NetworkAddress> {
let mut candidates: BTreeMap<usize, NetworkAddress> = BTreeMap::new();
// To avoid deadlock or taking too much time, currently set a fixed generation attempts
let mut attempts = 0;
// Also an early return when got the first 20 furthest kBuckets covered.
let mut buckets_covered: BTreeSet<_> = (0..21).map(|index| index as usize).collect();

let local_key = NetworkAddress::from_peer(*self_peer_id).as_kbucket_key();
let local_key_bytes_len = local_key.hashed_bytes().len();
while attempts < 10000 && !buckets_covered.is_empty() {
let candiate = NetworkAddress::from_peer(PeerId::random());
let candidate_key = candiate.as_kbucket_key();
let candidate_key_bytes_len = candidate_key.hashed_bytes().len();

if local_key_bytes_len != candidate_key_bytes_len {
panic!("kBucketKey has different key length, {candiate:?} has {candidate_key_bytes_len:?}, {self_peer_id:?} has {local_key_bytes_len:?}");
}

let common_leading_bits =
common_leading_bits(local_key.hashed_bytes(), candidate_key.hashed_bytes());

let _ = candidates.insert(common_leading_bits, candiate);
let _ = buckets_covered.remove(&common_leading_bits);

attempts += 1;
}

let generated_buckets: Vec<_> = candidates.keys().copied().collect();
let generated_candidates: Vec<_> = candidates.values().cloned().collect();
trace!("Generated targets covering kbuckets {generated_buckets:?}");
generated_candidates
}

/// Returns the length of the common leading bits.
/// e.g. when `11110000` and `11111111`, return as 4.
/// Note: the length of two shall be the same
fn common_leading_bits(one: &[u8], two: &[u8]) -> usize {
for byte_index in 0..one.len() {
if one[byte_index] != two[byte_index] {
return (byte_index * 8) + (one[byte_index] ^ two[byte_index]).leading_zeros() as usize;
}
}
8 * one.len()
}

pub struct SwarmDriver {
pub(crate) swarm: Swarm<NodeBehaviour>,
pub(crate) self_peer_id: PeerId,
Expand Down Expand Up @@ -584,9 +548,8 @@ pub struct SwarmDriver {
// they are not supposed to process the gossip msg that received from libp2p.
pub(crate) is_gossip_handler: bool,
// A list of random `PeerId` candidates that falls into kbuckets,
// one for each furthest 30 kbuckets.
// This is to ensure a more accurate network discovery.
pub(crate) network_discovery_candidates: Vec<NetworkAddress>,
pub(crate) network_discovery: NetworkDiscovery,
}

impl SwarmDriver {
Expand Down
Loading
Loading