Skip to content

Commit

Permalink
feat: network density estimation
Browse files Browse the repository at this point in the history
  • Loading branch information
maqi committed Nov 28, 2024
1 parent e3eb3b2 commit 69b8b9c
Show file tree
Hide file tree
Showing 17 changed files with 187 additions and 320 deletions.
355 changes: 80 additions & 275 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion autonomi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ curv = { version = "0.10.1", package = "sn_curv", default-features = false, feat
eip2333 = { version = "0.2.1", package = "sn_bls_ckd" }
const-hex = "1.12.0"
hex = "~0.4.3"
libp2p = "0.54.1"
libp2p = { git = "https://github.com/maqi/rust-libp2p.git", branch = "kad_0.46.2" }
rand = "0.8.5"
rmp-serde = "1.1.1"
self_encryption = "~0.30.0"
Expand Down
2 changes: 1 addition & 1 deletion nat-detection/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ clap = { version = "4.5.4", features = ["derive"] }
clap-verbosity-flag = "2.2.0"
color-eyre = { version = "0.6", default-features = false }
futures = "~0.3.13"
libp2p = { version = "0.54.1", features = [
libp2p = { git = "https://github.com/maqi/rust-libp2p.git", branch = "kad_0.46.2", features = [
"tokio",
"tcp",
"noise",
Expand Down
2 changes: 1 addition & 1 deletion sn_evm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ custom_debug = "~0.6.1"
evmlib = { path = "../evmlib", version = "0.1.4" }
hex = "~0.4.3"
lazy_static = "~1.4.0"
libp2p = { version = "0.53", features = ["identify", "kad"] }
libp2p = { git = "https://github.com/maqi/rust-libp2p.git", branch = "kad_0.46.2", features = ["identify", "kad"] }
rand = { version = "~0.8.5", features = ["small_rng"] }
rmp-serde = "1.1.1"
serde = { version = "1.0.133", features = ["derive", "rc"] }
Expand Down
4 changes: 2 additions & 2 deletions sn_networking/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ loud = []

[dependencies]
lazy_static = "~1.4.0"
libp2p = { version = "0.54.1", features = [
libp2p = { git = "https://github.com/maqi/rust-libp2p.git", branch = "kad_0.46.2", features = [
"tokio",
"dns",
"kad",
Expand Down Expand Up @@ -98,7 +98,7 @@ crate-type = ["cdylib", "rlib"]

[target.'cfg(target_arch = "wasm32")'.dependencies]
getrandom = { version = "0.2.12", features = ["js"] }
libp2p = { version = "0.54.1", features = [
libp2p = { git = "https://github.com/maqi/rust-libp2p.git", branch = "kad_0.46.2", features = [
"tokio",
"dns",
"kad",
Expand Down
66 changes: 50 additions & 16 deletions sn_networking/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use libp2p::mdns;
use libp2p::{core::muxing::StreamMuxerBox, relay};
use libp2p::{
identity::Keypair,
kad::{self, QueryId, Quorum, Record, RecordKey, K_VALUE},
kad::{self, KBucketDistance as Distance, QueryId, Quorum, Record, RecordKey, K_VALUE, U256},
multiaddr::Protocol,
request_response::{self, Config as RequestResponseConfig, OutboundRequestId, ProtocolSupport},
swarm::{
Expand Down Expand Up @@ -926,21 +926,55 @@ impl SwarmDriver {
}
_ = set_farthest_record_interval.tick() => {
if !self.is_client {
let distance = if let Some(distance) = self.network_density_samples.get_median() {
distance
} else {
// In case sampling not triggered or yet,
// fall back to use the distance to CLOSE_GROUP_SIZEth closest
let closest_k_peers = self.get_closest_k_value_local_peers();
if closest_k_peers.len() <= CLOSE_GROUP_SIZE + 1 {
continue;
}
// Results are sorted, hence can calculate distance directly
// Note: self is included
let self_addr = NetworkAddress::from_peer(self.self_peer_id);
self_addr.distance(&NetworkAddress::from_peer(closest_k_peers[CLOSE_GROUP_SIZE]))

};
let (
_index,
_total_peers,
peers_in_non_full_buckets,
num_of_full_buckets,
_kbucket_table_stats,
) = self.kbuckets_status();
let estimated_network_size =
Self::estimate_network_size(peers_in_non_full_buckets, num_of_full_buckets);
if estimated_network_size <= CLOSE_GROUP_SIZE {
info!("Not enough estimated network size {estimated_network_size}, with {peers_in_non_full_buckets} peers_in_non_full_buckets and {num_of_full_buckets}num_of_full_buckets.");
continue;
}
// The entire Distance space is U256
// (U256::MAX is 115792089237316195423570985008687907853269984665640564039457584007913129639935)
// The network density (average distance among nodes) can be estimated as:
// network_density = entire_U256_space / estimated_network_size
let density = U256::MAX / U256::from(estimated_network_size);
let estimated_distance = density * U256::from(CLOSE_GROUP_SIZE);
let density_distance = Distance(estimated_distance);

// Use distanct to close peer to avoid the situation that
// the estimated density_distance is too narrow.
let closest_k_peers = self.get_closest_k_value_local_peers();
if closest_k_peers.len() <= CLOSE_GROUP_SIZE + 2 {
continue;
}
// Results are sorted, hence can calculate distance directly
// Note: self is included
let self_addr = NetworkAddress::from_peer(self.self_peer_id);
let close_peers_distance = self_addr.distance(&NetworkAddress::from_peer(closest_k_peers[CLOSE_GROUP_SIZE + 1]));

let distance = std::cmp::max(density_distance, close_peers_distance);

// let distance = if let Some(distance) = self.network_density_samples.get_median() {
// distance
// } else {
// // In case sampling not triggered or yet,
// // fall back to use the distance to CLOSE_GROUP_SIZEth closest
// let closest_k_peers = self.get_closest_k_value_local_peers();
// if closest_k_peers.len() <= CLOSE_GROUP_SIZE + 1 {
// continue;
// }
// // Results are sorted, hence can calculate distance directly
// // Note: self is included
// let self_addr = NetworkAddress::from_peer(self.self_peer_id);
// self_addr.distance(&NetworkAddress::from_peer(closest_k_peers[CLOSE_GROUP_SIZE]))

// };

info!("Set responsible range to {distance:?}({:?})", distance.ilog2());

Expand Down
35 changes: 28 additions & 7 deletions sn_networking/src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ use std::{
};
use tokio::sync::oneshot;

// (total_buckets, total_peers, peers_in_non_full_buckets, num_of_full_buckets, kbucket_table_stats)
type KBucketStatus = (usize, usize, usize, usize, Vec<(usize, usize, u32)>);

/// NodeEvent enum
#[derive(CustomDebug)]
pub(super) enum NodeEvent {
Expand Down Expand Up @@ -281,12 +284,8 @@ impl SwarmDriver {
}
}

/// Logs the kbuckets also records the bucket info.
pub(crate) fn log_kbuckets(&mut self, peer: &PeerId) {
let distance = NetworkAddress::from_peer(self.self_peer_id)
.distance(&NetworkAddress::from_peer(*peer));
info!("Peer {peer:?} has a {:?} distance to us", distance.ilog2());

/// Collect kbuckets status
pub(crate) fn kbuckets_status(&mut self) -> KBucketStatus {
let mut kbucket_table_stats = vec![];
let mut index = 0;
let mut total_peers = 0;
Expand All @@ -313,6 +312,28 @@ impl SwarmDriver {
}
index += 1;
}
(
index,
total_peers,
peers_in_non_full_buckets,
num_of_full_buckets,
kbucket_table_stats,
)
}

/// Logs the kbuckets also records the bucket info.
pub(crate) fn log_kbuckets(&mut self, peer: &PeerId) {
let distance = NetworkAddress::from_peer(self.self_peer_id)
.distance(&NetworkAddress::from_peer(*peer));
info!("Peer {peer:?} has a {:?} distance to us", distance.ilog2());

let (
index,
total_peers,
peers_in_non_full_buckets,
num_of_full_buckets,
kbucket_table_stats,
) = self.kbuckets_status();

let estimated_network_size =
Self::estimate_network_size(peers_in_non_full_buckets, num_of_full_buckets);
Expand All @@ -339,7 +360,7 @@ impl SwarmDriver {
}

/// Estimate the number of nodes in the network
fn estimate_network_size(
pub(crate) fn estimate_network_size(
peers_in_non_full_buckets: usize,
num_of_full_buckets: usize,
) -> usize {
Expand Down
4 changes: 3 additions & 1 deletion sn_networking/src/fifo_register.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ use std::collections::VecDeque;
pub(crate) struct FifoRegister {
queue: VecDeque<Distance>,
max_length: usize,
#[allow(dead_code)]
cached_median: Option<Distance>, // Cache for the median result
is_dirty: bool, // Flag indicating if cache is valid
is_dirty: bool, // Flag indicating if cache is valid
}

impl FifoRegister {
Expand All @@ -39,6 +40,7 @@ impl FifoRegister {
}

// Returns the median of the maximum values of the entries
#[allow(dead_code)]
pub(crate) fn get_median(&mut self) -> Option<Distance> {
if self.queue.is_empty() {
return None; // No median if the queue is empty
Expand Down
2 changes: 1 addition & 1 deletion sn_node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ file-rotate = "0.7.3"
futures = "~0.3.13"
hex = "~0.4.3"
itertools = "~0.12.1"
libp2p = { version = "0.54.1", features = ["tokio", "dns", "kad", "macros"] }
libp2p = { git = "https://github.com/maqi/rust-libp2p.git", branch = "kad_0.46.2", features = ["tokio", "dns", "kad", "macros"] }
num-traits = "0.2"
prometheus-client = { version = "0.22", optional = true }
# watch out updating this, protoc compiler needs to be installed on all build systems
Expand Down
21 changes: 13 additions & 8 deletions sn_node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,14 +360,18 @@ impl Node {
});
}
_ = network_density_sampling_interval.tick() => {
let start = Instant::now();
debug!("Periodic network density sampling triggered");
let network = self.network().clone();

let _handle = spawn(async move {
Self::network_density_sampling(network).await;
trace!("Periodic network density sampling took {:?}", start.elapsed());
});
// The following shall be used by client only to support RBS.
// Due to the concern of the extra resource usage that incurred.
continue;

// let start = Instant::now();
// debug!("Periodic network density sampling triggered");
// let network = self.network().clone();

// let _handle = spawn(async move {
// Self::network_density_sampling(network).await;
// trace!("Periodic network density sampling took {:?}", start.elapsed());
// });
}
}
}
Expand Down Expand Up @@ -850,6 +854,7 @@ impl Node {
);
}

#[allow(dead_code)]
async fn network_density_sampling(network: Network) {
for _ in 0..10 {
let target = NetworkAddress::from_peer(PeerId::random());
Expand Down
2 changes: 1 addition & 1 deletion sn_node_manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ colored = "2.0.4"
color-eyre = "~0.6"
dirs-next = "2.0.0"
indicatif = { version = "0.17.5", features = ["tokio"] }
libp2p = { version = "0.54.1", features = [] }
libp2p = { git = "https://github.com/maqi/rust-libp2p.git", branch = "kad_0.46.2", features = [] }
libp2p-identity = { version = "0.2.7", features = ["rand"] }
prost = { version = "0.9" }
rand = "0.8.5"
Expand Down
2 changes: 1 addition & 1 deletion sn_node_rpc_client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ bls = { package = "blsttc", version = "8.0.1" }
clap = { version = "4.2.1", features = ["derive"] }
color-eyre = "0.6.2"
hex = "~0.4.3"
libp2p = { version = "0.54.1", features = ["kad"]}
libp2p = { git = "https://github.com/maqi/rust-libp2p.git", branch = "kad_0.46.2", features = ["kad"]}
libp2p-identity = { version="0.2.7", features = ["rand"] }
sn_build_info = { path = "../sn_build_info", version = "0.1.19" }
sn_logging = { path = "../sn_logging", version = "0.2.40" }
Expand Down
2 changes: 1 addition & 1 deletion sn_peers_acquisition/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ websockets = []
[dependencies]
clap = { version = "4.2.1", features = ["derive", "env"] }
lazy_static = "~1.4.0"
libp2p = { version = "0.54.1", features = [] }
libp2p = { git = "https://github.com/maqi/rust-libp2p.git", branch = "kad_0.46.2", features = [] }
rand = "0.8.5"
reqwest = { version="0.12.2", default-features=false, features = ["rustls-tls"] }
sn_protocol = { path = "../sn_protocol", version = "0.17.15", optional = true}
Expand Down
2 changes: 1 addition & 1 deletion sn_protocol/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ custom_debug = "~0.6.1"
dirs-next = "~2.0.0"
hex = "~0.4.3"
lazy_static = "1.4.0"
libp2p = { version = "0.54.1", features = ["identify", "kad"] }
libp2p = { git = "https://github.com/maqi/rust-libp2p.git", branch = "kad_0.46.2", features = ["identify", "kad"] }
rmp-serde = "1.1.1"
serde = { version = "1.0.133", features = [ "derive", "rc" ]}
serde_json = "1.0"
Expand Down
2 changes: 1 addition & 1 deletion sn_service_management/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ version = "0.4.3"
[dependencies]
async-trait = "0.1"
dirs-next = "2.0.0"
libp2p = { version = "0.54.1", features = ["kad"] }
libp2p = { git = "https://github.com/maqi/rust-libp2p.git", branch = "kad_0.46.2", features = ["kad"] }
libp2p-identity = { version = "0.2.7", features = ["rand"] }
prost = { version = "0.9" }
serde = { version = "1.0", features = ["derive"] }
Expand Down
2 changes: 1 addition & 1 deletion sn_transfers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ custom_debug = "~0.6.1"
dirs-next = "~2.0.0"
hex = "~0.4.3"
lazy_static = "~1.4.0"
libp2p = { version = "0.54.1", features = ["identify", "kad"] }
libp2p = { git = "https://github.com/maqi/rust-libp2p.git", branch = "kad_0.46.2", features = ["identify", "kad"] }
rand = { version = "~0.8.5", features = ["small_rng"] }
rmp-serde = "1.1.1"
secrecy = "0.8.0"
Expand Down
2 changes: 1 addition & 1 deletion test_utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ bytes = { version = "1.0.1", features = ["serde"] }
color-eyre = "~0.6.2"
dirs-next = "~2.0.0"
evmlib = { path = "../evmlib", version = "0.1.4" }
libp2p = { version = "0.54.1", features = ["identify", "kad"] }
libp2p = { git = "https://github.com/maqi/rust-libp2p.git", branch = "kad_0.46.2", features = ["identify", "kad"] }
rand = "0.8.5"
serde = { version = "1.0.133", features = ["derive"] }
serde_json = "1.0"
Expand Down

0 comments on commit 69b8b9c

Please sign in to comment.