Skip to content

Commit

Permalink
feat(node): prune unrelevant records if accumulated too many
Browse files Browse the repository at this point in the history
  • Loading branch information
maqi committed Aug 15, 2024
1 parent 2b02e0e commit 474d466
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 0 deletions.
13 changes: 13 additions & 0 deletions sn_networking/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ pub enum LocalSwarmCmd {
/// Triggers interval repliation
/// NOTE: This does result in outgoing messages, but is produced locally
TriggerIntervalReplication,
/// Triggers unrelevant record cleanup
TriggerUnrelevantRecordCleanup,
}

/// Commands to send to the Swarm
Expand Down Expand Up @@ -281,6 +283,9 @@ impl Debug for LocalSwarmCmd {
LocalSwarmCmd::TriggerIntervalReplication => {
write!(f, "LocalSwarmCmd::TriggerIntervalReplication")
}
LocalSwarmCmd::TriggerUnrelevantRecordCleanup => {
write!(f, "LocalSwarmCmd::TriggerUnrelevantRecordCleanup")
}
}
}
}
Expand Down Expand Up @@ -817,6 +822,14 @@ impl SwarmDriver {
self.send_event(NetworkEvent::KeysToFetchForReplication(new_keys_to_fetch));
}
}
LocalSwarmCmd::TriggerUnrelevantRecordCleanup => {
cmd_string = "TriggerUnrelevantRecordCleanup";
self.swarm
.behaviour_mut()
.kademlia
.store_mut()
.cleanup_unrelevant_records();
}
}

self.log_handling(cmd_string.to_string(), start.elapsed());
Expand Down
4 changes: 4 additions & 0 deletions sn_networking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,10 @@ impl Network {
self.send_local_swarm_cmd(LocalSwarmCmd::QuoteVerification { quotes });
}

pub fn trigger_unrelevant_record_cleanup(&self) {
self.send_local_swarm_cmd(LocalSwarmCmd::TriggerUnrelevantRecordCleanup)
}

/// Helper to send NetworkSwarmCmd
fn send_network_swarm_cmd(&self, cmd: NetworkSwarmCmd) {
send_network_swarm_cmd(self.network_swarm_cmd_sender().clone(), cmd);
Expand Down
42 changes: 42 additions & 0 deletions sn_networking/src/record_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,48 @@ impl NodeRecordStore {

Ok(())
}

// When the accumulated record copies exceeds the `expotional pricing point` (max_records * 0.6)
// those `out of range` records shall be cleaned up.
// This is to avoid `over-quoting` during restart, when RT is not fully populated,
// result in mis-calculation of relevant records.
pub fn cleanup_unrelevant_records(&mut self) {
let accumulated_records = self.records.len();
if accumulated_records < 6 * MAX_RECORDS_COUNT / 10 {
return;
}

let responsible_range = if let Some(range) = self.responsible_distance_range {
range
} else {
return;
};

let mut removed_keys = Vec::new();
self.records.retain(|key, _val| {
let kbucket_key = KBucketKey::new(key.to_vec());
let is_in_range =
responsible_range >= self.local_key.distance(&kbucket_key).ilog2().unwrap_or(0);
if !is_in_range {
removed_keys.push(key.clone());
}
is_in_range
});

// Each `remove` function call will try to re-calculate furthest
// when the key to be removed is the current furthest.
// To avoid duplicated calculation, hence reset `furthest` first here.
self.farthest_record = self.calculate_farthest();

for key in removed_keys.iter() {
// Deletion from disk will be undertaken as a spawned task,
// hence safe to call this function repeatedly here.
self.remove(key);
}

info!("Cleaned up {} unrelevant records, among the original {accumulated_records} accumulated_records",
removed_keys.len());
}
}

impl NodeRecordStore {
Expand Down
9 changes: 9 additions & 0 deletions sn_networking/src/record_store_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,4 +169,13 @@ impl UnifiedRecordStore {
Self::Node(store) => store.mark_as_stored(k, record_type),
};
}

pub(crate) fn cleanup_unrelevant_records(&mut self) {
match self {
Self::Client(_store) => {
warn!("Calling cleanup_unrelevant_records at Client. This should not happen");
}
Self::Node(store) => store.cleanup_unrelevant_records(),
}
}
}
14 changes: 14 additions & 0 deletions sn_node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ const FORWARDED_BALANCE_FILE_NAME: &str = "forwarded_balance";
/// Interval to update the nodes uptime metric
const UPTIME_METRICS_UPDATE_INTERVAL: Duration = Duration::from_secs(10);

/// Interval to clean up unrelevant records
const UNRELEVANT_RECORDS_CLEANUP_INTERVAL: Duration = Duration::from_secs(3600);

/// Helper to build and run a Node
pub struct NodeBuilder {
keypair: Keypair,
Expand Down Expand Up @@ -323,6 +326,10 @@ impl Node {
tokio::time::interval(UPTIME_METRICS_UPDATE_INTERVAL);
let _ = uptime_metrics_update_interval.tick().await; // first tick completes immediately

let mut unrelevant_records_cleanup_interval =
tokio::time::interval(UNRELEVANT_RECORDS_CLEANUP_INTERVAL);
let _ = unrelevant_records_cleanup_interval.tick().await; // first tick completes immediately

loop {
let peers_connected = &peers_connected;

Expand Down Expand Up @@ -410,6 +417,13 @@ impl Node {
let _ = node_metrics.uptime.set(node_metrics.started_instant.elapsed().as_secs() as i64);
}
}
_ = unrelevant_records_cleanup_interval.tick() => {
let network = self.network().clone();

let _handle = spawn(async move {
Self::trigger_unrelevant_record_cleanup(network);
});
}
}
}
});
Expand Down
5 changes: 5 additions & 0 deletions sn_node/src/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ impl Node {
network.trigger_interval_replication()
}

/// Cleanup unrelevant records if accumulated too many.
pub(crate) fn trigger_unrelevant_record_cleanup(network: Network) {
network.trigger_unrelevant_record_cleanup()
}

/// Get the Record from a peer or from the network without waiting.
pub(crate) fn fetch_replication_keys_without_wait(
&self,
Expand Down

0 comments on commit 474d466

Please sign in to comment.