Node garbage collection (#33)
* implemented node garbage collection

* fix comment from PR
evanxg852000 authored Mar 29, 2022
1 parent fb799bd commit f20cda5
Showing 3 changed files with 152 additions and 16 deletions.
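In short: `FailureDetector` now timestamps dead nodes, a new `garbage_collect()` method evicts nodes that have been dead longer than the new `dead_node_grace_period` config field, and `ScuttleButt` drops the evicted nodes' gossip state via `ClusterState::remove_node`. The sketch below shows the intended lifecycle; it is not part of the diff, and it assumes it runs inside the scuttlebutt crate, where `update_node_liveliness` and `garbage_collect` are visible (they may not be part of the public API).

use std::time::Duration;

let mut failure_detector = FailureDetector::new(FailureDetectorConfig {
    dead_node_grace_period: Duration::from_secs(60), // illustrative value, not from the commit
    ..Default::default()
});
let node_id = NodeId::from("localhost:10001"); // hypothetical node

// Gossip delivers a heartbeat: the node's sampling window is updated.
failure_detector.report_heartbeat(&node_id);

// Periodically, phi is evaluated: above the threshold the node moves
// from `live_nodes` to `dead_nodes` with a time-of-death timestamp.
failure_detector.update_node_liveliness(&node_id);

// Nodes dead longer than `dead_node_grace_period` are evicted; the caller
// uses the returned ids to drop per-node cluster state.
for dead_node_id in failure_detector.garbage_collect() {
    println!("evicting {:?}", dead_node_id); // NodeId is Debug (it is logged with ?node_id above)
}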
scuttlebutt/src/failure_detector.rs (81 changes: 67 additions & 14 deletions)
@@ -18,7 +18,6 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::{HashMap, HashSet};
-use std::sync::RwLock;
use std::time::Duration;
#[cfg(not(test))]
use std::time::Instant;
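Note the `#[cfg(not(test))]` guard on the `Instant` import: in test builds a mockable clock is swapped in so the tests below can jump time forward deterministically with `MockClock::advance`, which is the API of the `mock_instant` crate. The test-side counterpart presumably looks like the following (an assumption; that import line sits outside the hunks shown here):

#[cfg(test)]
use mock_instant::Instant; // test builds: Instant::now()/elapsed() follow MockClock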
@@ -33,30 +32,29 @@ use crate::NodeId;
/// A phi accrual failure detector implementation.
pub struct FailureDetector {
/// Heartbeat samples for each node.
-node_samples: RwLock<HashMap<NodeId, SamplingWindow>>,
+node_samples: HashMap<NodeId, SamplingWindow>,
/// Failure detector configuration.
config: FailureDetectorConfig,
/// Denotes live nodes.
live_nodes: HashSet<NodeId>,
/// Denotes dead nodes.
-dead_nodes: HashSet<NodeId>,
+dead_nodes: HashMap<NodeId, Instant>,
}

impl FailureDetector {
pub fn new(config: FailureDetectorConfig) -> Self {
Self {
-node_samples: RwLock::new(HashMap::new()),
+node_samples: HashMap::new(),
config,
live_nodes: HashSet::new(),
-dead_nodes: HashSet::new(),
+dead_nodes: HashMap::new(),
}
}

/// Reports node heartbeat.
pub fn report_heartbeat(&mut self, node_id: &NodeId) {
debug!(node_id = ?node_id, "reporting node heartbeat.");
-let mut node_samples = self.node_samples.write().unwrap();
-let heartbeat_window = node_samples.entry(node_id.clone()).or_insert_with(|| {
+let heartbeat_window = self.node_samples.entry(node_id.clone()).or_insert_with(|| {
SamplingWindow::new(
self.config.sampling_window_size,
self.config.max_interval,
@@ -72,32 +70,45 @@ impl FailureDetector {
debug!(node_id = ?node_id, phi = phi, "updating node liveliness");
if phi > self.config.phi_threshold {
self.live_nodes.remove(node_id);
-self.dead_nodes.insert(node_id.clone());
+self.dead_nodes.insert(node_id.clone(), Instant::now());
// Remove current sampling window so that when the node
// comes back online, we start with a fresh sampling window.
-self.node_samples.write().unwrap().remove(node_id);
+self.node_samples.remove(node_id);
} else {
self.live_nodes.insert(node_id.clone());
self.dead_nodes.remove(node_id);
}
}
}

+/// Removes and returns the list of garbage collectible nodes.
+pub fn garbage_collect(&mut self) -> Vec<NodeId> {
+let mut garbage_collected_nodes = Vec::new();
+for (node_id, instant) in self.dead_nodes.iter() {
+if instant.elapsed() >= self.config.dead_node_grace_period {
+garbage_collected_nodes.push(node_id.clone())
+}
+}

+for node_id in garbage_collected_nodes.iter() {
+self.dead_nodes.remove(node_id);
+}
+garbage_collected_nodes
+}
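A note on shape, not part of the diff: `garbage_collect` first collects the expired ids and then removes them in a second loop because `dead_nodes` cannot be mutated while it is being iterated. A single-pass sketch with the same behavior, using `HashMap::retain` from the standard library:

pub fn garbage_collect(&mut self) -> Vec<NodeId> {
    let grace_period = self.config.dead_node_grace_period;
    let mut garbage_collected_nodes = Vec::new();
    // retain() drops every entry for which the closure returns false,
    // so collection and removal happen in one pass.
    self.dead_nodes.retain(|node_id, time_of_death| {
        if time_of_death.elapsed() >= grace_period {
            garbage_collected_nodes.push(node_id.clone());
            false
        } else {
            true
        }
    });
    garbage_collected_nodes
}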

/// Returns a list of live nodes.
pub fn live_nodes(&self) -> impl Iterator<Item = &NodeId> {
-self.live_nodes.iter() //.map(|node_id| node_id.as_str())
+self.live_nodes.iter()
}

/// Returns a list of dead nodes.
pub fn dead_nodes(&self) -> impl Iterator<Item = &NodeId> {
-self.dead_nodes.iter() //.map(|node_id| node_id.as_str())
+self.dead_nodes.iter().map(|(node_id, _)| node_id)
}

/// Returns the current phi value of a node.
fn phi(&mut self, node_id: &NodeId) -> Option<f64> {
self.node_samples
-.read()
-.unwrap()
.get(node_id)
.map(|sampling_window| sampling_window.phi())
}
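For context on the `phi` value used above: in a phi accrual failure detector, phi measures how implausibly late the current heartbeat is given the node's observed inter-arrival history, with phi = -log10 of the probability that a heartbeat would still arrive this late. The crate's `SamplingWindow::phi` is not shown in this diff; a minimal sketch under an assumed exponential inter-arrival model:

// Sketch only: assumes exponentially distributed heartbeat intervals with
// mean `mean_interval_secs`. The actual SamplingWindow::phi may differ.
fn phi_sketch(elapsed_secs: f64, mean_interval_secs: f64) -> f64 {
    // P(heartbeat arrives later than `elapsed`) = exp(-elapsed / mean)
    let p_later = (-elapsed_secs / mean_interval_secs).exp();
    // phi = 1.0 ~ a 10% chance the node is merely slow, phi = 3.0 ~ 0.1%, ...
    -p_later.log10()
}

Under this model phi grows linearly with lateness, so `phi_threshold` directly trades detection speed against false positives.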
@@ -114,6 +125,8 @@ pub struct FailureDetectorConfig {
pub max_interval: Duration,
/// Initial interval used on startup when no previous heartbeat exists.
pub initial_interval: Duration,
+/// Threshold period after which dead node can be removed from the cluster.
+pub dead_node_grace_period: Duration,
}

impl FailureDetectorConfig {
@@ -122,12 +135,14 @@ impl FailureDetectorConfig {
sampling_window_size: usize,
max_interval: Duration,
initial_interval: Duration,
+dead_node_grace_period: Duration,
) -> Self {
Self {
phi_threshold,
sampling_window_size,
max_interval,
initial_interval,
+dead_node_grace_period,
}
}
}
@@ -139,6 +154,7 @@ impl Default for FailureDetectorConfig {
sampling_window_size: 1000,
max_interval: Duration::from_secs(10),
initial_interval: Duration::from_secs(5),
+dead_node_grace_period: Duration::from_secs(24 * 60 * 60), // 24 hours
}
}
}
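The default grace period is generous (24 hours). Callers wanting faster eviction override just that field with struct-update syntax, exactly as the integration test later in this commit does. An illustrative sketch (the 10-minute value is made up, not from the commit):

// Evict dead nodes after 10 minutes instead of the 24-hour default.
let config = FailureDetectorConfig {
    dead_node_grace_period: Duration::from_secs(10 * 60),
    ..Default::default()
};
let failure_detector = FailureDetector::new(config);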
@@ -288,8 +304,45 @@ mod tests {
.map(|node_id| node_id.id.as_str())
.collect::<Vec<_>>();
live_nodes.sort_unstable();

assert_eq!(live_nodes, vec!["node-1", "node-2", "node-3"]);
+assert_eq!(failure_detector.garbage_collect(), vec![]);

+// stop reporting heartbeat for few seconds
+MockClock::advance(Duration::from_secs(50));
+for node_id in &node_ids_choices {
+failure_detector.update_node_liveliness(node_id);
+}
+let mut dead_nodes = failure_detector
+.dead_nodes()
+.map(|node_id| node_id.id.as_str())
+.collect::<Vec<_>>();
+dead_nodes.sort_unstable();
+assert_eq!(dead_nodes, vec!["node-1", "node-2", "node-3"]);
+assert_eq!(failure_detector.garbage_collect(), vec![]);

+// Wait for dead_node_grace_period & garbage collect.
+MockClock::advance(Duration::from_secs(25 * 60 * 60));
+let garbage_collected_nodes = failure_detector.garbage_collect();
+assert_eq!(
+failure_detector
+.live_nodes()
+.map(|node_id| node_id.id.as_str())
+.collect::<Vec<_>>(),
+Vec::<&str>::new()
+);
+assert_eq!(
+failure_detector
+.dead_nodes()
+.map(|node_id| node_id.id.as_str())
+.collect::<Vec<_>>(),
+Vec::<&str>::new()
+);
+let mut removed_nodes = garbage_collected_nodes
+.iter()
+.map(|node_id| node_id.id.as_str())
+.collect::<Vec<_>>();
+removed_nodes.sort_unstable();
+assert_eq!(removed_nodes, vec!["node-1", "node-2", "node-3"]);
}

#[test]
scuttlebutt/src/lib.rs (83 changes: 81 additions & 2 deletions)
@@ -240,6 +240,12 @@ impl ScuttleButt {
error!(current_node = ?self.self_node_id, "error while reporting membership change event.")
}
}

+// Perform garbage collection.
+let garbage_collected_nodes = self.failure_detector.garbage_collect();
+for node_id in garbage_collected_nodes.iter() {
+self.cluster_state.remove_node(node_id)
+}
}

pub fn node_state(&self, node_id: &NodeId) -> Option<&NodeState> {
@@ -295,7 +301,7 @@ impl ScuttleButt {

#[cfg(test)]
mod tests {
-use std::ops::RangeInclusive;
+use std::ops::{Add, RangeInclusive};
use std::sync::Arc;
use std::time::Duration;

@@ -308,6 +314,8 @@ mod tests {
use super::*;
use crate::server::ScuttleServer;

+const DEAD_NODE_GRACE_PERIOD: Duration = Duration::from_secs(25);

fn run_scuttlebutt_handshake(initiating_node: &mut ScuttleButt, peer_node: &mut ScuttleButt) {
let syn_message = initiating_node.create_syn_message();
let syn_ack_message = peer_node.process_message(syn_message).unwrap();
@@ -344,7 +352,10 @@ mod tests {
seeds,
address,
Vec::<(&str, &str)>::new(),
-FailureDetectorConfig::default(),
+FailureDetectorConfig {
+dead_node_grace_period: DEAD_NODE_GRACE_PERIOD,
+..Default::default()
+},
)
}

@@ -598,4 +609,72 @@ mod tests {
shutdown_nodes(nodes).await?;
Ok(())
}

+#[tokio::test]
+async fn test_dead_node_garbage_collection() -> anyhow::Result<()> {
+let mut nodes = setup_nodes(60001..=60006).await;

+let (id, node) = nodes.get(1).unwrap();
+assert_eq!(id, "60002");
+wait_for_scuttlebutt_state(
+node.scuttlebutt(),
+5,
+&[
+NodeId::from("localhost:60001"),
+NodeId::from("localhost:60003"),
+NodeId::from("localhost:60004"),
+NodeId::from("localhost:60005"),
+NodeId::from("localhost:60006"),
+],
+)
+.await;

+// Take down node at localhost:60003
+let (id, node) = nodes.remove(2);
+assert_eq!(id, "60003");
+node.shutdown().await.unwrap();

+let (id, node) = nodes.get(1).unwrap();
+assert_eq!(id, "60002");
+wait_for_scuttlebutt_state(
+node.scuttlebutt(),
+4,
+&[
+NodeId::from("localhost:60001"),
+NodeId::from("localhost:60004"),
+NodeId::from("localhost:60005"),
+NodeId::from("localhost:60006"),
+],
+)
+.await;

+// Dead node should still be known to the cluster.
+let dead_node_id = NodeId::from("localhost:60003");
+for (_, node) in nodes.iter() {
+assert!(node
+.scuttlebutt()
+.lock()
+.await
+.node_state(&dead_node_id)
+.is_some());
+}

+// Wait a bit more than `dead_node_grace_period` since all nodes will not
+// notice cluster change at the same time.
+let wait_for = DEAD_NODE_GRACE_PERIOD.add(Duration::from_secs(15));
+time::sleep(wait_for).await;

+// Dead node should no longer be known to the cluster.
+for (_, node) in nodes.iter() {
+assert!(node
+.scuttlebutt()
+.lock()
+.await
+.node_state(&dead_node_id)
+.is_none());
+}

+shutdown_nodes(nodes).await?;
+Ok(())
+}
}
scuttlebutt/src/state.rs (4 changes: 4 additions & 0 deletions)
@@ -130,6 +130,10 @@ impl ClusterState {
self.seed_nodes.iter().map(|node_id| node_id.as_str())
}

+pub(crate) fn remove_node(&mut self, node_id: &NodeId) {
+self.node_states.remove(node_id);
+}

pub fn apply_delta(&mut self, delta: Delta) {
for (node_id, node_delta) in delta.node_deltas {
let mut node_state_map = self
