diff --git a/sn_networking/src/cmd.rs b/sn_networking/src/cmd.rs index 0201760d45..41e91a9e37 100644 --- a/sn_networking/src/cmd.rs +++ b/sn_networking/src/cmd.rs @@ -133,6 +133,8 @@ pub enum LocalSwarmCmd { /// Triggers interval repliation /// NOTE: This does result in outgoing messages, but is produced locally TriggerIntervalReplication, + /// Triggers unrelevant record cleanup + TriggerUnrelevantRecordCleanup, } /// Commands to send to the Swarm @@ -281,6 +283,9 @@ impl Debug for LocalSwarmCmd { LocalSwarmCmd::TriggerIntervalReplication => { write!(f, "LocalSwarmCmd::TriggerIntervalReplication") } + LocalSwarmCmd::TriggerUnrelevantRecordCleanup => { + write!(f, "LocalSwarmCmd::TriggerUnrelevantRecordCleanup") + } } } } @@ -817,6 +822,14 @@ impl SwarmDriver { self.send_event(NetworkEvent::KeysToFetchForReplication(new_keys_to_fetch)); } } + LocalSwarmCmd::TriggerUnrelevantRecordCleanup => { + cmd_string = "TriggerUnrelevantRecordCleanup"; + self.swarm + .behaviour_mut() + .kademlia + .store_mut() + .cleanup_unrelevant_records(); + } } self.log_handling(cmd_string.to_string(), start.elapsed()); diff --git a/sn_networking/src/lib.rs b/sn_networking/src/lib.rs index 6492cec4b1..66bc0af974 100644 --- a/sn_networking/src/lib.rs +++ b/sn_networking/src/lib.rs @@ -801,6 +801,10 @@ impl Network { self.send_local_swarm_cmd(LocalSwarmCmd::QuoteVerification { quotes }); } + pub fn trigger_unrelevant_record_cleanup(&self) { + self.send_local_swarm_cmd(LocalSwarmCmd::TriggerUnrelevantRecordCleanup) + } + /// Helper to send NetworkSwarmCmd fn send_network_swarm_cmd(&self, cmd: NetworkSwarmCmd) { send_network_swarm_cmd(self.network_swarm_cmd_sender().clone(), cmd); diff --git a/sn_networking/src/record_store.rs b/sn_networking/src/record_store.rs index 412d8da649..2be44a2adc 100644 --- a/sn_networking/src/record_store.rs +++ b/sn_networking/src/record_store.rs @@ -448,6 +448,48 @@ impl NodeRecordStore { Ok(()) } + + // When the accumulated record copies exceeds the `exponential 
pricing point` (max_records * 0.6) + // those `out of range` records shall be cleaned up. + // This is to avoid `over-quoting` during restart, when RT is not fully populated, + // result in mis-calculation of relevant records. + pub fn cleanup_unrelevant_records(&mut self) { + let accumulated_records = self.records.len(); + if accumulated_records < 6 * MAX_RECORDS_COUNT / 10 { + return; + } + + let responsible_range = if let Some(range) = self.responsible_distance_range { + range + } else { + return; + }; + + let mut removed_keys = Vec::new(); + self.records.retain(|key, _val| { + let kbucket_key = KBucketKey::new(key.to_vec()); + let is_in_range = + responsible_range >= self.local_key.distance(&kbucket_key).ilog2().unwrap_or(0); + if !is_in_range { + removed_keys.push(key.clone()); + } + is_in_range + }); + + // Each `remove` function call will try to re-calculate furthest + // when the key to be removed is the current furthest. + // To avoid duplicated calculation, hence reset `furthest` first here. + self.farthest_record = self.calculate_farthest(); + + for key in removed_keys.iter() { + // Deletion from disk will be undertaken as a spawned task, + // hence safe to call this function repeatedly here. + self.remove(key); + } + + info!("Cleaned up {} unrelevant records, among the original {accumulated_records} accumulated_records", + removed_keys.len()); + } } impl NodeRecordStore { diff --git a/sn_networking/src/record_store_api.rs b/sn_networking/src/record_store_api.rs index 570720f9ff..3ab93c9d42 100644 --- a/sn_networking/src/record_store_api.rs +++ b/sn_networking/src/record_store_api.rs @@ -169,4 +169,13 @@ impl UnifiedRecordStore { Self::Node(store) => store.mark_as_stored(k, record_type), }; } + + pub(crate) fn cleanup_unrelevant_records(&mut self) { + match self { + Self::Client(_store) => { + warn!("Calling cleanup_unrelevant_records at Client. 
This should not happen"); + } + Self::Node(store) => store.cleanup_unrelevant_records(), + } + } } diff --git a/sn_node/src/node.rs b/sn_node/src/node.rs index c99e476d69..0d8b60f314 100644 --- a/sn_node/src/node.rs +++ b/sn_node/src/node.rs @@ -79,6 +79,9 @@ const FORWARDED_BALANCE_FILE_NAME: &str = "forwarded_balance"; /// Interval to update the nodes uptime metric const UPTIME_METRICS_UPDATE_INTERVAL: Duration = Duration::from_secs(10); +/// Interval to clean up unrelevant records +const UNRELEVANT_RECORDS_CLEANUP_INTERVAL: Duration = Duration::from_secs(3600); + /// Helper to build and run a Node pub struct NodeBuilder { keypair: Keypair, @@ -323,6 +326,10 @@ impl Node { tokio::time::interval(UPTIME_METRICS_UPDATE_INTERVAL); let _ = uptime_metrics_update_interval.tick().await; // first tick completes immediately + let mut unrelevant_records_cleanup_interval = + tokio::time::interval(UNRELEVANT_RECORDS_CLEANUP_INTERVAL); + let _ = unrelevant_records_cleanup_interval.tick().await; // first tick completes immediately + loop { let peers_connected = &peers_connected; @@ -410,6 +417,13 @@ impl Node { let _ = node_metrics.uptime.set(node_metrics.started_instant.elapsed().as_secs() as i64); } } + _ = unrelevant_records_cleanup_interval.tick() => { + let network = self.network().clone(); + + let _handle = spawn(async move { + Self::trigger_unrelevant_record_cleanup(network); + }); + } } } }); diff --git a/sn_node/src/replication.rs b/sn_node/src/replication.rs index 070a858228..65c4accf7d 100644 --- a/sn_node/src/replication.rs +++ b/sn_node/src/replication.rs @@ -25,6 +25,11 @@ impl Node { network.trigger_interval_replication() } + /// Cleanup unrelevant records if accumulated too many. + pub(crate) fn trigger_unrelevant_record_cleanup(network: Network) { + network.trigger_unrelevant_record_cleanup() + } + /// Get the Record from a peer or from the network without waiting. pub(crate) fn fetch_replication_keys_without_wait( &self,