Skip to content

Commit

Permalink
Merge pull request #2464 from maqi/use_sampling_for_range_calculation
Browse files Browse the repository at this point in the history
Node side RBS support
  • Loading branch information
jacderida authored Nov 29, 2024
2 parents 202b093 + b1b1d2d commit c6c8d43
Show file tree
Hide file tree
Showing 25 changed files with 466 additions and 466 deletions.
355 changes: 80 additions & 275 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion autonomi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ curv = { version = "0.10.1", package = "sn_curv", default-features = false, feat
eip2333 = { version = "0.2.1", package = "sn_bls_ckd" }
const-hex = "1.12.0"
hex = "~0.4.3"
libp2p = "0.54.1"
libp2p = { git = "https://github.com/maqi/rust-libp2p.git", branch = "kad_0.46.2" }
rand = "0.8.5"
rmp-serde = "1.1.1"
self_encryption = "~0.30.0"
Expand Down
2 changes: 1 addition & 1 deletion nat-detection/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ clap = { version = "4.5.4", features = ["derive"] }
clap-verbosity-flag = "2.2.0"
color-eyre = { version = "0.6", default-features = false }
futures = "~0.3.13"
libp2p = { version = "0.54.1", features = [
libp2p = { git = "https://github.com/maqi/rust-libp2p.git", branch = "kad_0.46.2", features = [
"tokio",
"tcp",
"noise",
Expand Down
2 changes: 1 addition & 1 deletion sn_evm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ custom_debug = "~0.6.1"
evmlib = { path = "../evmlib", version = "0.1.4" }
hex = "~0.4.3"
lazy_static = "~1.4.0"
libp2p = { version = "0.53", features = ["identify", "kad"] }
libp2p = { git = "https://github.com/maqi/rust-libp2p.git", branch = "kad_0.46.2", features = ["identify", "kad"] }
rand = { version = "~0.8.5", features = ["small_rng"] }
rmp-serde = "1.1.1"
serde = { version = "1.0.133", features = ["derive", "rc"] }
Expand Down
4 changes: 2 additions & 2 deletions sn_networking/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ loud = []

[dependencies]
lazy_static = "~1.4.0"
libp2p = { version = "0.54.1", features = [
libp2p = { git = "https://github.com/maqi/rust-libp2p.git", branch = "kad_0.46.2", features = [
"tokio",
"dns",
"kad",
Expand Down Expand Up @@ -98,7 +98,7 @@ crate-type = ["cdylib", "rlib"]

[target.'cfg(target_arch = "wasm32")'.dependencies]
getrandom = { version = "0.2.12", features = ["js"] }
libp2p = { version = "0.54.1", features = [
libp2p = { git = "https://github.com/maqi/rust-libp2p.git", branch = "kad_0.46.2", features = [
"tokio",
"dns",
"kad",
Expand Down
103 changes: 82 additions & 21 deletions sn_networking/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,11 @@ use crate::{
event::TerminateNodeReason,
log_markers::Marker,
multiaddr_pop_p2p, GetRecordCfg, GetRecordError, MsgResponder, NetworkEvent, CLOSE_GROUP_SIZE,
REPLICATION_PEERS_COUNT,
};
use libp2p::{
kad::{
store::{Error as StoreError, RecordStore},
Quorum, Record, RecordKey,
KBucketDistance as Distance, Quorum, Record, RecordKey,
},
Multiaddr, PeerId,
};
Expand Down Expand Up @@ -64,6 +63,12 @@ pub enum LocalSwarmCmd {
GetKBuckets {
sender: oneshot::Sender<BTreeMap<u32, Vec<PeerId>>>,
},
/// Returns the replicate candidates in range.
/// In case the range is too narrow, returns at least CLOSE_GROUP_SIZE peers.
GetReplicateCandidates {
data_addr: NetworkAddress,
sender: oneshot::Sender<Vec<PeerId>>,
},
// Returns up to K_VALUE peers from all the k-buckets from the local Routing Table.
// And our PeerId as well.
GetClosestKLocalPeers {
Expand Down Expand Up @@ -136,6 +141,10 @@ pub enum LocalSwarmCmd {
TriggerIntervalReplication,
/// Triggers irrelevant record cleanup
TriggerIrrelevantRecordCleanup,
/// Add a network density sample
AddNetworkDensitySample {
distance: Distance,
},
}

/// Commands to send to the Swarm
Expand Down Expand Up @@ -216,7 +225,9 @@ impl Debug for LocalSwarmCmd {
PrettyPrintRecordKey::from(key)
)
}

LocalSwarmCmd::GetReplicateCandidates { .. } => {
write!(f, "LocalSwarmCmd::GetReplicateCandidates")
}
LocalSwarmCmd::GetClosestKLocalPeers { .. } => {
write!(f, "LocalSwarmCmd::GetClosestKLocalPeers")
}
Expand Down Expand Up @@ -287,6 +298,9 @@ impl Debug for LocalSwarmCmd {
LocalSwarmCmd::TriggerIrrelevantRecordCleanup => {
write!(f, "LocalSwarmCmd::TriggerUnrelevantRecordCleanup")
}
LocalSwarmCmd::AddNetworkDensitySample { distance } => {
write!(f, "LocalSwarmCmd::AddNetworkDensitySample({distance:?})")
}
}
}
}
Expand Down Expand Up @@ -702,7 +716,7 @@ impl SwarmDriver {
.behaviour_mut()
.kademlia
.store_mut()
.get_farthest_replication_distance_bucket()
.get_farthest_replication_distance()
{
self.replication_fetcher
.set_replication_distance_range(distance);
Expand Down Expand Up @@ -802,7 +816,10 @@ impl SwarmDriver {
cmd_string = "GetClosestKLocalPeers";
let _ = sender.send(self.get_closest_k_value_local_peers());
}

LocalSwarmCmd::GetReplicateCandidates { data_addr, sender } => {
cmd_string = "GetReplicateCandidates";
let _ = sender.send(self.get_replicate_candidates(&data_addr));
}
LocalSwarmCmd::GetSwarmLocalState(sender) => {
cmd_string = "GetSwarmLocalState";
let current_state = SwarmLocalState {
Expand Down Expand Up @@ -868,6 +885,10 @@ impl SwarmDriver {
.store_mut()
.cleanup_irrelevant_records();
}
LocalSwarmCmd::AddNetworkDensitySample { distance } => {
cmd_string = "AddNetworkDensitySample";
self.network_density_samples.add(distance);
}
}

self.log_handling(cmd_string.to_string(), start.elapsed());
Expand Down Expand Up @@ -995,22 +1016,8 @@ impl SwarmDriver {
// Store the current time as the last replication time
self.last_replication = Some(Instant::now());

// get closest peers from buckets, sorted by increasing distance to us
let our_peer_id = self.self_peer_id.into();
let closest_k_peers = self
.swarm
.behaviour_mut()
.kademlia
.get_closest_local_peers(&our_peer_id)
// Map KBucketKey<PeerId> to PeerId.
.map(|key| key.into_preimage());

// Only grab the closest nodes within the REPLICATE_RANGE
let mut replicate_targets = closest_k_peers
.into_iter()
// add some leeway to allow for divergent knowledge
.take(REPLICATION_PEERS_COUNT)
.collect::<Vec<_>>();
let self_addr = NetworkAddress::from_peer(self.self_peer_id);
let mut replicate_targets = self.get_replicate_candidates(&self_addr);

let now = Instant::now();
self.replication_targets
Expand Down Expand Up @@ -1055,4 +1062,58 @@ impl SwarmDriver {

Ok(())
}

/// Selects the peers that records addressed at `target` should be replicated to.
///
/// The locally known peers closest to `target` are narrowed down to those
/// within the record store's current replication distance. When the store
/// has no distance set, or fewer than `CLOSE_GROUP_SIZE` peers fall inside
/// it, the `CLOSE_GROUP_SIZE` closest peers are returned instead (fewer if
/// we know fewer peers).
///
/// Callers pick the target:
/// * general replication passes our own address (peers closest to self)
/// * fresh-record replication passes the data address (peers closest to data)
pub(crate) fn get_replicate_candidates(&mut self, target: &NetworkAddress) -> Vec<PeerId> {
    // Locally known peers, ordered by increasing distance to the target.
    let target_key = target.as_kbucket_key();
    let nearest_peers: Vec<PeerId> = self
        .swarm
        .behaviour_mut()
        .kademlia
        .get_closest_local_peers(&target_key)
        // Unwrap each KBucketKey<PeerId> back into its PeerId.
        .map(|key| key.into_preimage())
        .collect();

    // Peers inside the responsible range, if the store has one configured.
    let within_range = self
        .swarm
        .behaviour_mut()
        .kademlia
        .store_mut()
        .get_farthest_replication_distance()
        .map(|responsible_range| get_peers_in_range(&nearest_peers, target, responsible_range));

    match within_range {
        Some(peers) if peers.len() >= CLOSE_GROUP_SIZE => peers,
        // Range unknown or too narrow: use the closest CLOSE_GROUP_SIZE peers.
        _ => nearest_peers.into_iter().take(CLOSE_GROUP_SIZE).collect(),
    }
}
}

/// Returns the peers whose distance to `address` does not exceed `range`.
///
/// The comparison is inclusive and the relative order of `peers` is kept.
fn get_peers_in_range(peers: &[PeerId], address: &NetworkAddress, range: Distance) -> Vec<PeerId> {
    peers
        .iter()
        .copied()
        .filter(|peer_id| address.distance(&NetworkAddress::from_peer(*peer_id)) <= range)
        .collect()
}
95 changes: 59 additions & 36 deletions sn_networking/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::{
error::{NetworkError, Result},
event::{NetworkEvent, NodeEvent},
external_address::ExternalAddressManager,
fifo_register::FifoRegister,
log_markers::Marker,
multiaddr_pop_p2p,
network_discovery::NetworkDiscovery,
Expand All @@ -35,7 +36,7 @@ use libp2p::mdns;
use libp2p::{core::muxing::StreamMuxerBox, relay};
use libp2p::{
identity::Keypair,
kad::{self, QueryId, Quorum, Record, RecordKey, K_VALUE},
kad::{self, KBucketDistance as Distance, QueryId, Quorum, Record, RecordKey, K_VALUE, U256},
multiaddr::Protocol,
request_response::{self, Config as RequestResponseConfig, OutboundRequestId, ProtocolSupport},
swarm::{
Expand Down Expand Up @@ -736,6 +737,7 @@ impl NetworkBuilder {
replication_targets: Default::default(),
last_replication: None,
last_connection_pruning_time: Instant::now(),
network_density_samples: FifoRegister::new(100),
};

let network = Network::new(
Expand Down Expand Up @@ -841,6 +843,8 @@ pub struct SwarmDriver {
pub(crate) last_replication: Option<Instant>,
/// when was the last outdated connection pruning undertaken.
pub(crate) last_connection_pruning_time: Instant,
/// FIFO cache for the network density samples
pub(crate) network_density_samples: FifoRegister,
}

impl SwarmDriver {
Expand Down Expand Up @@ -922,15 +926,62 @@ impl SwarmDriver {
}
_ = set_farthest_record_interval.tick() => {
if !self.is_client {
let (
_index,
_total_peers,
peers_in_non_full_buckets,
num_of_full_buckets,
_kbucket_table_stats,
) = self.kbuckets_status();
let estimated_network_size =
Self::estimate_network_size(peers_in_non_full_buckets, num_of_full_buckets);
if estimated_network_size <= CLOSE_GROUP_SIZE {
info!("Not enough estimated network size {estimated_network_size}, with {peers_in_non_full_buckets} peers_in_non_full_buckets and {num_of_full_buckets}num_of_full_buckets.");
continue;
}
// The entire Distance space is U256
// (U256::MAX is 115792089237316195423570985008687907853269984665640564039457584007913129639935)
// The network density (average distance among nodes) can be estimated as:
// network_density = entire_U256_space / estimated_network_size
let density = U256::MAX / U256::from(estimated_network_size);
let estimated_distance = density * U256::from(CLOSE_GROUP_SIZE);
let density_distance = Distance(estimated_distance);

// Also consider the distance to a close peer, to avoid the situation where
// the estimated density_distance is too narrow.
let closest_k_peers = self.get_closest_k_value_local_peers();

if let Some(distance) = self.get_responsbile_range_estimate(&closest_k_peers) {
info!("Set responsible range to {distance}");
// set any new distance to farthest record in the store
self.swarm.behaviour_mut().kademlia.store_mut().set_distance_range(distance);
// the distance range within the replication_fetcher shall be in sync as well
self.replication_fetcher.set_replication_distance_range(distance);
if closest_k_peers.len() <= CLOSE_GROUP_SIZE + 2 {
continue;
}
// Results are sorted, hence can calculate distance directly
// Note: self is included
let self_addr = NetworkAddress::from_peer(self.self_peer_id);
let close_peers_distance = self_addr.distance(&NetworkAddress::from_peer(closest_k_peers[CLOSE_GROUP_SIZE + 1]));

let distance = std::cmp::max(density_distance, close_peers_distance);

// let distance = if let Some(distance) = self.network_density_samples.get_median() {
// distance
// } else {
// // In case sampling not triggered or yet,
// // fall back to use the distance to CLOSE_GROUP_SIZEth closest
// let closest_k_peers = self.get_closest_k_value_local_peers();
// if closest_k_peers.len() <= CLOSE_GROUP_SIZE + 1 {
// continue;
// }
// // Results are sorted, hence can calculate distance directly
// // Note: self is included
// let self_addr = NetworkAddress::from_peer(self.self_peer_id);
// self_addr.distance(&NetworkAddress::from_peer(closest_k_peers[CLOSE_GROUP_SIZE]))

// };

info!("Set responsible range to {distance:?}({:?})", distance.ilog2());

// set any new distance to farthest record in the store
self.swarm.behaviour_mut().kademlia.store_mut().set_distance_range(distance);
// the distance range within the replication_fetcher shall be in sync as well
self.replication_fetcher.set_replication_distance_range(distance);
}
}
_ = relay_manager_reservation_interval.tick() => self.relay_manager.try_connecting_to_relay(&mut self.swarm, &self.bad_nodes),
Expand All @@ -942,34 +993,6 @@ impl SwarmDriver {
// ---------- Crate helpers -------------------
// --------------------------------------------

/// Estimates the range we are responsible for from our closest peers.
///
/// Takes the `K_VALUE / 2`th entry of `closest_k_peers` (or the last entry
/// when the list is shorter) and returns `ilog2` of the distance between our
/// own address and that peer. Returns `None` when `closest_k_peers` is empty,
/// i.e. no range is set until we know at least one peer.
fn get_responsbile_range_estimate(
    &mut self,
    // Sorted list of closest k peers to our peer id.
    closest_k_peers: &[PeerId],
) -> Option<u32> {
    // Without any peers there is nothing to base an estimate on.
    if closest_k_peers.is_empty() {
        return None;
    }

    // Deliberately generous: aiming at the `K_VALUE / 2`th peer over-estimates
    // the range, so we retain more data and can pass bad node checks.
    let target_index = (K_VALUE.get() / 2).min(closest_k_peers.len()) - 1;
    let farthest_peer = NetworkAddress::from_peer(closest_k_peers[target_index]);

    NetworkAddress::from_peer(self.self_peer_id)
        .distance(&farthest_peer)
        .ilog2()
}

/// Pushes NetworkSwarmCmd off thread so as to be non-blocking
/// this is a wrapper around the `mpsc::Sender::send` call
pub(crate) fn queue_network_swarm_cmd(&self, event: NetworkSwarmCmd) {
Expand Down
Loading

0 comments on commit c6c8d43

Please sign in to comment.