diff --git a/.github/workflows/merge.yml b/.github/workflows/merge.yml index 23c3e2bb31..ef291cf271 100644 --- a/.github/workflows/merge.yml +++ b/.github/workflows/merge.yml @@ -1395,6 +1395,10 @@ jobs: exit 1 fi + # Sleep for a while to allow network density sampling + - name: Sleep a while + run: sleep 300 + - name: Stop the local network and upload logs if: always() uses: maidsafe/sn-local-testnet-action@main diff --git a/Cargo.lock b/Cargo.lock index 0ff28dc1c5..bef5c19f81 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8667,6 +8667,7 @@ name = "sn_networking" version = "0.19.4" dependencies = [ "aes-gcm-siv", + "alloy", "assert_fs", "async-trait", "blsttc", diff --git a/sn_networking/Cargo.toml b/sn_networking/Cargo.toml index 15af991d0c..82b4f73585 100644 --- a/sn_networking/Cargo.toml +++ b/sn_networking/Cargo.toml @@ -37,6 +37,7 @@ libp2p = { version = "0.54.1", features = [ "yamux", "websocket", ] } +alloy = { version = "0.5.3", default-features = false, features = ["std", "reqwest-rustls-tls", "provider-anvil-node", "sol-types", "json", "signers", "contract", "signer-local", "network"] } async-trait = "0.1" bytes = { version = "1.0.1", features = ["serde"] } exponential-backoff = "2.0.0" diff --git a/sn_networking/src/cmd.rs b/sn_networking/src/cmd.rs index 48372d8d17..1f36a81988 100644 --- a/sn_networking/src/cmd.rs +++ b/sn_networking/src/cmd.rs @@ -17,7 +17,7 @@ use crate::{ use libp2p::{ kad::{ store::{Error as StoreError, RecordStore}, - Quorum, Record, RecordKey, + KBucketDistance as Distance, Quorum, Record, RecordKey, }, Multiaddr, PeerId, }; @@ -136,6 +136,10 @@ pub enum LocalSwarmCmd { TriggerIntervalReplication, /// Triggers unrelevant record cleanup TriggerIrrelevantRecordCleanup, + /// Add a network density sample + AddNetworkDensitySample { + distance: Distance, + }, } /// Commands to send to the Swarm @@ -287,6 +291,9 @@ impl Debug for LocalSwarmCmd { LocalSwarmCmd::TriggerIrrelevantRecordCleanup => { write!(f, "LocalSwarmCmd::TriggerUnrelevantRecordCleanup") } + LocalSwarmCmd::AddNetworkDensitySample { distance } => { + write!(f, "LocalSwarmCmd::AddNetworkDensitySample({distance:?})") + } } } } @@ -868,6 +875,10 @@ impl SwarmDriver { .store_mut() .cleanup_irrelevant_records(); } + LocalSwarmCmd::AddNetworkDensitySample { distance } => { + cmd_string = "AddNetworkDensitySample"; + self.network_density_samples.add(distance); + } } self.log_handling(cmd_string.to_string(), start.elapsed()); diff --git a/sn_networking/src/driver.rs b/sn_networking/src/driver.rs index 43a5525ccf..d43edc4852 100644 --- a/sn_networking/src/driver.rs +++ b/sn_networking/src/driver.rs @@ -13,6 +13,7 @@ use crate::{ error::{NetworkError, Result}, event::{NetworkEvent, NodeEvent}, external_address::ExternalAddressManager, + fifo_register::FifoRegister, log_markers::Marker, multiaddr_pop_p2p, network_discovery::NetworkDiscovery, @@ -28,6 +29,7 @@ use crate::{ metrics::service::run_metrics_server, metrics::NetworkMetricsRecorder, MetricsRegistries, }; use crate::{transport, NodeIssue}; +use alloy::primitives::U256; use futures::future::Either; use futures::StreamExt; #[cfg(feature = "local")] @@ -730,6 +732,7 @@ impl NetworkBuilder { replication_targets: Default::default(), last_replication: None, last_connection_pruning_time: Instant::now(), + network_density_samples: FifoRegister::new(100), }; let network = Network::new( @@ -835,6 +838,8 @@ pub struct SwarmDriver { pub(crate) last_replication: Option, /// when was the last outdated connection prunning undertaken. pub(crate) last_connection_pruning_time: Instant, + /// FIFO cache for the network density samples + pub(crate) network_density_samples: FifoRegister, } impl SwarmDriver { @@ -919,7 +924,29 @@ impl SwarmDriver { let closest_k_peers = self.get_closest_k_value_local_peers(); if let Some(distance) = self.get_responsbile_range_estimate(&closest_k_peers) { - info!("Set responsible range to {distance}"); + let ( + _index, + _total_peers, + peers_in_non_full_buckets, + num_of_full_buckets, + _kbucket_table_stats, + ) = self.kbuckets_status(); + let estimated_network_size = + Self::estimate_network_size(peers_in_non_full_buckets, num_of_full_buckets); + // The entire Distance space is U256 + // The density can be estimated as: distance/space + let density = U256::MAX / U256::from(estimated_network_size); + let estimated_distance = density * U256::from(CLOSE_GROUP_SIZE); + + let sampled_distance = self.network_density_samples.get_median(); + let ilog2 = if let Some(distance) = sampled_distance { + distance.ilog2() + } else { + None + }; + info!("Set responsible range to {distance}, current sampled_distance is {ilog2:?}({sampled_distance:?}), \ + estimated_distance is {:?}({estimated_distance:?}) with network_size of {estimated_network_size}", + estimated_distance.log2()); // set any new distance to farthest record in the store self.swarm.behaviour_mut().kademlia.store_mut().set_distance_range(distance); // the distance range within the replication_fetcher shall be in sync as well diff --git a/sn_networking/src/event/mod.rs b/sn_networking/src/event/mod.rs index e1d8074d29..cc1e197109 100644 --- a/sn_networking/src/event/mod.rs +++ b/sn_networking/src/event/mod.rs @@ -36,6 +36,9 @@ use std::{ }; use tokio::sync::oneshot; +// (total_buckets, total_peers, peers_in_non_full_buckets, num_of_full_buckets, kbucket_table_stats) +type KBucketStatus = (usize, usize, usize, usize, Vec<(usize, usize, u32)>); + /// NodeEvent enum #[derive(CustomDebug)] pub(super) enum NodeEvent { @@ -295,12 +298,8 @@ impl SwarmDriver { } } - /// Logs the kbuckets also records the bucket info. - pub(crate) fn log_kbuckets(&mut self, peer: &PeerId) { - let distance = NetworkAddress::from_peer(self.self_peer_id) - .distance(&NetworkAddress::from_peer(*peer)); - info!("Peer {peer:?} has a {:?} distance to us", distance.ilog2()); - + /// Collect kbuckets status + pub(crate) fn kbuckets_status(&mut self) -> KBucketStatus { let mut kbucket_table_stats = vec![]; let mut index = 0; let mut total_peers = 0; @@ -327,6 +326,28 @@ impl SwarmDriver { } index += 1; } + ( + index, + total_peers, + peers_in_non_full_buckets, + num_of_full_buckets, + kbucket_table_stats, + ) + } + + /// Logs the kbuckets also records the bucket info. + pub(crate) fn log_kbuckets(&mut self, peer: &PeerId) { + let distance = NetworkAddress::from_peer(self.self_peer_id) + .distance(&NetworkAddress::from_peer(*peer)); + info!("Peer {peer:?} has a {:?} distance to us", distance.ilog2()); + + let ( + index, + total_peers, + peers_in_non_full_buckets, + num_of_full_buckets, + kbucket_table_stats, + ) = self.kbuckets_status(); let estimated_network_size = Self::estimate_network_size(peers_in_non_full_buckets, num_of_full_buckets); @@ -353,7 +374,7 @@ impl SwarmDriver { } /// Estimate the number of nodes in the network - fn estimate_network_size( + pub(crate) fn estimate_network_size( peers_in_non_full_buckets: usize, num_of_full_buckets: usize, ) -> usize { diff --git a/sn_networking/src/fifo_register.rs b/sn_networking/src/fifo_register.rs new file mode 100644 index 0000000000..c8ab96ba8c --- /dev/null +++ b/sn_networking/src/fifo_register.rs @@ -0,0 +1,62 @@ +// Copyright 2024 MaidSafe.net limited. +// +// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. +// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed +// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. Please review the Licences for the specific language governing +// permissions and limitations relating to use of the SAFE Network Software. + +use libp2p::kad::KBucketDistance as Distance; +use std::collections::VecDeque; + +pub(crate) struct FifoRegister { + queue: VecDeque, + max_length: usize, + cached_median: Option, // Cache for the median result + is_dirty: bool, // Flag indicating if cache is valid +} + +impl FifoRegister { + // Creates a new FifoRegister with a specified maximum length + pub(crate) fn new(max_length: usize) -> Self { + FifoRegister { + queue: VecDeque::with_capacity(max_length), + max_length, + cached_median: None, + is_dirty: true, + } + } + + // Adds an entry to the register, removing excess elements if over max_length + pub(crate) fn add(&mut self, entry: Distance) { + if self.queue.len() == self.max_length { + self.queue.pop_front(); // Remove the oldest element to maintain length + } + self.queue.push_back(entry); + + // Mark the cache as invalid since the data has changed + self.is_dirty = true; + } + + // Returns the median of the maximum values of the entries + pub(crate) fn get_median(&mut self) -> Option { + if self.queue.is_empty() { + return None; // No median if the queue is empty + } + + if !self.is_dirty { + return self.cached_median; // Return cached result if it's valid + } + + let mut max_values: Vec = self.queue.iter().copied().collect(); + + max_values.sort_unstable(); + + let len = max_values.len(); + // Cache the result and mark the cache as valid + self.cached_median = Some(max_values[len / 2]); + self.is_dirty = false; + + self.cached_median + } +} diff --git a/sn_networking/src/lib.rs b/sn_networking/src/lib.rs index b7118d18a3..55f7afb959 100644 --- a/sn_networking/src/lib.rs +++ b/sn_networking/src/lib.rs @@ -16,6 +16,7 @@ mod driver; mod error; mod event; mod external_address; +mod fifo_register; mod log_markers; #[cfg(feature = "open-metrics")] mod metrics; @@ -1007,6 +1008,10 @@ impl Network { self.send_local_swarm_cmd(LocalSwarmCmd::TriggerIrrelevantRecordCleanup) } + pub fn add_network_density_sample(&self, distance: KBucketDistance) { + self.send_local_swarm_cmd(LocalSwarmCmd::AddNetworkDensitySample { distance }) + } + /// Helper to send NetworkSwarmCmd fn send_network_swarm_cmd(&self, cmd: NetworkSwarmCmd) { send_network_swarm_cmd(self.network_swarm_cmd_sender().clone(), cmd); diff --git a/sn_node/src/node.rs b/sn_node/src/node.rs index bff4266b6b..e81496876b 100644 --- a/sn_node/src/node.rs +++ b/sn_node/src/node.rs @@ -64,6 +64,9 @@ const UPTIME_METRICS_UPDATE_INTERVAL: Duration = Duration::from_secs(10); /// Interval to clean up unrelevant records const UNRELEVANT_RECORDS_CLEANUP_INTERVAL: Duration = Duration::from_secs(3600); +/// Interval to carryout network density sampling +const NETWORK_DENSITY_SAMPLING_INTERVAL: Duration = Duration::from_secs(113); + /// Helper to build and run a Node pub struct NodeBuilder { identity_keypair: Keypair, @@ -277,6 +280,10 @@ impl Node { tokio::time::interval(UNRELEVANT_RECORDS_CLEANUP_INTERVAL); let _ = irrelevant_records_cleanup_interval.tick().await; // first tick completes immediately + let mut network_density_sampling_interval = + tokio::time::interval(NETWORK_DENSITY_SAMPLING_INTERVAL); + let _ = network_density_sampling_interval.tick().await; // first tick completes immediately + loop { let peers_connected = &peers_connected; @@ -341,6 +348,16 @@ impl Node { Self::trigger_irrelevant_record_cleanup(network); }); } + _ = network_density_sampling_interval.tick() => { + let start = Instant::now(); + debug!("Periodic network density sampling triggered"); + let network = self.network().clone(); + + let _handle = spawn(async move { + Self::network_density_sampling(network).await; + trace!("Periodic network density sampling took {:?}", start.elapsed()); + }); + } } } }); @@ -712,6 +729,22 @@ impl Node { Response::Query(resp) } + async fn network_density_sampling(network: Network) { + for _ in 0..10 { + let target = NetworkAddress::from_peer(PeerId::random()); + // Result is sorted and only return CLOSE_GROUP_SIZE entries + let peers = network.node_get_closest_peers(&target).await; + if let Ok(peers) = peers { + if peers.len() >= CLOSE_GROUP_SIZE { + // Calculate the distance to the farthest. + let distance = + target.distance(&NetworkAddress::from_peer(peers[CLOSE_GROUP_SIZE - 1])); + network.add_network_density_sample(distance); + } + } + } + } + async fn try_bad_nodes_check(network: Network, rolling_index: usize) { if let Ok(kbuckets) = network.get_kbuckets().await { let total_peers: usize = kbuckets.values().map(|peers| peers.len()).sum();