From 6943a04a3e3f781343cca18b85825a5e8ae71fad Mon Sep 17 00:00:00 2001 From: Xavier Vello Date: Tue, 12 Apr 2022 04:24:21 +0200 Subject: [PATCH] Add cluster_name property to avoid several clusters merging accidentally (#36) * add cluster_name field to ScuttleButt * ensure cluster name when accepting syn messages * return BadCluster on cluster name mismatch * remove invalid ignore_oversized_payload test Introduced in #4, this test has been outdated since the reception buffer sized has been increased to the full UDP MTU. As is, it sends an empty Syn message before sending another valid Syn message. The assert checks for an SynAck message, that is the response to the first packet instead of the second. Now that peers check the cluster name property of Syn messages, the first packet gets a BadCluster response, which fails the assert. --- scuttlebutt-test/src/lib.rs | 1 + scuttlebutt-test/src/main.rs | 2 + scuttlebutt-test/tests/cli.rs | 1 + scuttlebutt/src/digest.rs | 2 +- scuttlebutt/src/lib.rs | 35 +++++++++++++++-- scuttlebutt/src/message.rs | 57 ++++++++++++++++++++++++--- scuttlebutt/src/server.rs | 74 ++++++++++++++++++++++------------- 7 files changed, 136 insertions(+), 36 deletions(-) diff --git a/scuttlebutt-test/src/lib.rs b/scuttlebutt-test/src/lib.rs index f9ccfe6..3a05802 100644 --- a/scuttlebutt-test/src/lib.rs +++ b/scuttlebutt-test/src/lib.rs @@ -22,6 +22,7 @@ use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize, Debug)] pub struct ApiResponse { + pub cluster_name: String, pub cluster_state: SerializableClusterState, pub live_nodes: Vec, pub dead_nodes: Vec, diff --git a/scuttlebutt-test/src/main.rs b/scuttlebutt-test/src/main.rs index 9ba9ba4..ef8c58a 100644 --- a/scuttlebutt-test/src/main.rs +++ b/scuttlebutt-test/src/main.rs @@ -40,6 +40,7 @@ impl Api { async fn index(&self) -> Json { let scuttlebutt_guard = self.scuttlebutt.lock().await; let response = ApiResponse { + cluster_name: scuttlebutt_guard.cluster_name().to_string(), cluster_state: SerializableClusterState::from(scuttlebutt_guard.cluster_state()), live_nodes: scuttlebutt_guard.live_nodes().cloned().collect::>(), dead_nodes: scuttlebutt_guard.dead_nodes().cloned().collect::>(), @@ -67,6 +68,7 @@ async fn main() -> Result<(), std::io::Error> { NodeId::from(opt.listen_addr.as_str()), &opt.seeds[..], &opt.listen_addr, + "testing".to_string(), Vec::<(&str, &str)>::new(), FailureDetectorConfig::default(), ); diff --git a/scuttlebutt-test/tests/cli.rs b/scuttlebutt-test/tests/cli.rs index 6ab18f5..8661c64 100644 --- a/scuttlebutt-test/tests/cli.rs +++ b/scuttlebutt-test/tests/cli.rs @@ -78,6 +78,7 @@ fn test_multiple_nodes() -> anyhow::Result<()> { .node_states .get("localhost:10003") .is_some()); + assert_eq!(info.cluster_name, "testing"); assert_eq!(info.live_nodes.len(), 4); assert_eq!(info.dead_nodes.len(), 0); diff --git a/scuttlebutt/src/digest.rs b/scuttlebutt/src/digest.rs index 8975992..c672b47 100644 --- a/scuttlebutt/src/digest.rs +++ b/scuttlebutt/src/digest.rs @@ -27,7 +27,7 @@ use crate::{NodeId, Version}; /// /// It is equivalent to a map /// peer -> max version. -#[derive(Debug, Default)] +#[derive(Debug, Default, PartialEq)] pub struct Digest { pub(crate) node_max_version: BTreeMap, } diff --git a/scuttlebutt/src/lib.rs b/scuttlebutt/src/lib.rs index f4102e3..3249e76 100644 --- a/scuttlebutt/src/lib.rs +++ b/scuttlebutt/src/lib.rs @@ -34,7 +34,7 @@ pub use failure_detector::FailureDetectorConfig; use serde::{Deserialize, Serialize}; use tokio::sync::watch; use tokio_stream::wrappers::WatchStream; -use tracing::{debug, error}; +use tracing::{debug, error, warn}; use crate::digest::Digest; use crate::message::ScuttleButtMessage; @@ -120,6 +120,7 @@ pub struct ScuttleButt { mtu: usize, address: String, self_node_id: NodeId, + cluster_name: String, cluster_state: ClusterState, heartbeat: u64, /// The failure detector instance. @@ -135,6 +136,7 @@ impl ScuttleButt { self_node_id: NodeId, seed_ids: HashSet, address: String, + cluster_name: String, initial_key_values: Vec<(impl ToString, impl ToString)>, failure_detector_config: FailureDetectorConfig, ) -> Self { @@ -143,6 +145,7 @@ impl ScuttleButt { mtu: 60_000, address, self_node_id, + cluster_name, cluster_state: ClusterState::with_seed_ids(seed_ids), heartbeat: 0, failure_detector: FailureDetector::new(failure_detector_config), @@ -169,12 +172,26 @@ impl ScuttleButt { pub fn create_syn_message(&mut self) -> ScuttleButtMessage { let digest = self.compute_digest(); - ScuttleButtMessage::Syn { digest } + ScuttleButtMessage::Syn { + cluster_name: self.cluster_name.clone(), + digest, + } } pub fn process_message(&mut self, msg: ScuttleButtMessage) -> Option { match msg { - ScuttleButtMessage::Syn { digest } => { + ScuttleButtMessage::Syn { + cluster_name, + digest, + } => { + if cluster_name != self.cluster_name { + warn!( + cluster_name = %cluster_name, + "rejecting syn message with mismatching cluster name" + ); + return Some(ScuttleButtMessage::BadCluster); + } + let self_digest = self.compute_digest(); let dead_nodes = self.dead_nodes().collect::>(); let delta = self.cluster_state.compute_delta( @@ -202,6 +219,10 @@ impl ScuttleButt { self.cluster_state.apply_delta(delta); None } + ScuttleButtMessage::BadCluster => { + warn!("message rejected by peer: cluster name mismatch"); + None + } } } @@ -275,6 +296,10 @@ impl ScuttleButt { &self.self_node_id } + pub fn cluster_name(&self) -> &str { + &self.cluster_name + } + /// Computes digest. /// /// This method also increments the heartbeat, to force the presence @@ -351,6 +376,7 @@ mod tests { NodeId::from(address.as_str()), seeds, address, + "test-cluster".to_string(), Vec::<(&str, &str)>::new(), FailureDetectorConfig { dead_node_grace_period: DEAD_NODE_GRACE_PERIOD, @@ -417,6 +443,7 @@ mod tests { NodeId::from("node1"), HashSet::new(), "node1".to_string(), + "test-cluster".to_string(), vec![("key1a", "1"), ("key2a", "2")], FailureDetectorConfig::default(), ); @@ -424,6 +451,7 @@ mod tests { NodeId::from("node2"), HashSet::new(), "node2".to_string(), + "test-cluster".to_string(), vec![("key1b", "1"), ("key2b", "2")], FailureDetectorConfig::default(), ); @@ -570,6 +598,7 @@ mod tests { NodeId::new("new_node".to_string(), address.clone()), &["localhost:40001".to_string()], address, + "test-cluster".to_string(), Vec::<(&str, &str)>::new(), FailureDetectorConfig::default(), ); diff --git a/scuttlebutt/src/message.rs b/scuttlebutt/src/message.rs index 4e444e2..0c98fb9 100644 --- a/scuttlebutt/src/message.rs +++ b/scuttlebutt/src/message.rs @@ -31,16 +31,22 @@ use crate::serialize::Serializable; /// between node A and node B. /// The names {Syn, SynAck, Ack} of the different steps are borrowed from /// TCP Handshake. -#[derive(Debug)] +#[derive(Debug, PartialEq)] pub enum ScuttleButtMessage { /// Node A initiates handshakes. - Syn { digest: Digest }, + Syn { + cluster_name: String, + digest: Digest, + }, /// Node B returns a partial update as described /// in the scuttlebutt reconcialiation algorithm, /// and returns its own checksum. SynAck { digest: Digest, delta: Delta }, /// Node A returns a partial update for B. Ack { delta: Delta }, + /// Node B rejects the Syn message because of a + /// cluster name mismatch between the peers. + BadCluster, } #[derive(Copy, Clone)] @@ -49,6 +55,7 @@ enum MessageType { Syn = 0, SynAck = 1u8, Ack = 2u8, + BadCluster = 3u8, } impl MessageType { @@ -57,6 +64,7 @@ impl MessageType { 0 => Some(Self::Syn), 1 => Some(Self::SynAck), 2 => Some(Self::Ack), + 3 => Some(Self::BadCluster), _ => None, } } @@ -68,9 +76,13 @@ impl MessageType { impl Serializable for ScuttleButtMessage { fn serialize(&self, buf: &mut Vec) { match self { - ScuttleButtMessage::Syn { digest } => { + ScuttleButtMessage::Syn { + cluster_name, + digest, + } => { buf.push(MessageType::Syn.to_code()); digest.serialize(buf); + cluster_name.serialize(buf); } ScuttleButtMessage::SynAck { digest, delta } => { buf.push(MessageType::SynAck.to_code()); @@ -81,6 +93,9 @@ impl Serializable for ScuttleButtMessage { buf.push(MessageType::Ack.to_code()); delta.serialize(buf); } + ScuttleButtMessage::BadCluster => { + buf.push(MessageType::BadCluster.to_code()); + } } } @@ -94,7 +109,11 @@ impl Serializable for ScuttleButtMessage { match code { MessageType::Syn => { let digest = Digest::deserialize(buf)?; - Ok(Self::Syn { digest }) + let cluster_name = String::deserialize(buf)?; + Ok(Self::Syn { + cluster_name, + digest, + }) } MessageType::SynAck => { let digest = Digest::deserialize(buf)?; @@ -105,16 +124,44 @@ impl Serializable for ScuttleButtMessage { let delta = Delta::deserialize(buf)?; Ok(Self::Ack { delta }) } + MessageType::BadCluster => Ok(Self::BadCluster), } } fn serialized_len(&self) -> usize { match self { - ScuttleButtMessage::Syn { digest } => 1 + digest.serialized_len(), + ScuttleButtMessage::Syn { + cluster_name, + digest, + } => 1 + cluster_name.serialized_len() + digest.serialized_len(), ScuttleButtMessage::SynAck { digest, delta } => { 1 + digest.serialized_len() + delta.serialized_len() } ScuttleButtMessage::Ack { delta } => 1 + delta.serialized_len(), + ScuttleButtMessage::BadCluster => 1, } } } + +#[cfg(test)] +mod tests { + use crate::serialize::test_serdeser_aux; + use crate::{Digest, ScuttleButtMessage}; + + #[test] + fn test_syn() { + let mut digest = Digest::default(); + digest.add_node("node1".into(), 1); + digest.add_node("node2".into(), 2); + let syn = ScuttleButtMessage::Syn { + cluster_name: "cluster-a".to_string(), + digest, + }; + test_serdeser_aux(&syn, 58); + } + + #[test] + fn test_bad_cluster() { + test_serdeser_aux(&ScuttleButtMessage::BadCluster, 1); + } +} diff --git a/scuttlebutt/src/server.rs b/scuttlebutt/src/server.rs index 377302e..3d04a85 100644 --- a/scuttlebutt/src/server.rs +++ b/scuttlebutt/src/server.rs @@ -63,6 +63,7 @@ impl ScuttleServer { node_id: NodeId, seed_nodes: &[String], address: impl Into, + cluster_name: String, initial_key_values: Vec<(impl ToString, impl ToString)>, failure_detector_config: FailureDetectorConfig, ) -> Self { @@ -72,6 +73,7 @@ impl ScuttleServer { node_id, seed_nodes.iter().cloned().collect(), address.into(), + cluster_name, initial_key_values, failure_detector_config, ); @@ -397,6 +399,7 @@ mod tests { "0.0.0.0:1112".into(), &[], "0.0.0.0:1112", + "test-cluster".to_string(), Vec::<(&str, &str)>::new(), FailureDetectorConfig::default(), ); @@ -407,7 +410,13 @@ mod tests { let msg = ScuttleButtMessage::deserialize(&mut &buf[..len]).unwrap(); match msg { - ScuttleButtMessage::Syn { .. } => (), + ScuttleButtMessage::Syn { + cluster_name, + digest, + } => { + assert_eq!(cluster_name, "test-cluster"); + assert_eq!(digest.node_max_version.len(), 1); + } message => panic!("unexpected message: {:?}", message), } @@ -422,6 +431,7 @@ mod tests { "offline".into(), HashSet::new(), "offline".to_string(), + "test-cluster".to_string(), Vec::<(&str, &str)>::new(), FailureDetectorConfig::default(), ); @@ -430,6 +440,7 @@ mod tests { server_addr.into(), &[], server_addr, + "test-cluster".to_string(), Vec::<(&str, &str)>::new(), FailureDetectorConfig::default(), ); @@ -452,38 +463,39 @@ mod tests { } #[tokio::test] - async fn ignore_broken_payload() { - let server_addr = "0.0.0.0:3331"; + async fn syn_bad_cluster() { + let outsider_addr = "0.0.0.0:2224"; + let socket = UdpSocket::bind(outsider_addr).await.unwrap(); + let mut outsider = ScuttleButt::with_node_id_and_seeds( + outsider_addr.into(), + HashSet::new(), + outsider_addr.into(), + "another-cluster".to_string(), + Vec::<(&str, &str)>::new(), + FailureDetectorConfig::default(), + ); + + let server_addr = "0.0.0.0:2223"; let server = ScuttleServer::spawn( server_addr.into(), &[], server_addr, + "test-cluster".to_string(), Vec::<(&str, &str)>::new(), FailureDetectorConfig::default(), ); - let socket = UdpSocket::bind("0.0.0.0:3332").await.unwrap(); - let mut scuttlebutt = ScuttleButt::with_node_id_and_seeds( - "offline".into(), - HashSet::new(), - "offline".to_string(), - Vec::<(&str, &str)>::new(), - FailureDetectorConfig::default(), - ); - - // Send broken payload. - socket.send_to(b"broken", server_addr).await.unwrap(); - // Confirm nothing broke using a regular payload. - let syn = scuttlebutt.create_syn_message(); - let message = syn.serialize_to_vec(); - socket.send_to(&message, server_addr).await.unwrap(); + let mut buf = Vec::new(); + let syn = outsider.create_syn_message(); + syn.serialize(&mut buf); + socket.send_to(&buf[..], server_addr).await.unwrap(); - let mut buf = [0; UDP_MTU]; + let mut buf = [0; super::UDP_MTU]; let (len, _addr) = timeout(socket.recv_from(&mut buf)).await.unwrap(); let msg = ScuttleButtMessage::deserialize(&mut &buf[..len]).unwrap(); match msg { - ScuttleButtMessage::SynAck { .. } => (), + ScuttleButtMessage::BadCluster => (), message => panic!("unexpected message: {:?}", message), } @@ -491,36 +503,39 @@ mod tests { } #[tokio::test] - async fn ignore_oversized_payload() { - let server_addr = "0.0.0.0:4441"; + async fn ignore_broken_payload() { + let server_addr = "0.0.0.0:3331"; let server = ScuttleServer::spawn( server_addr.into(), &[], server_addr, + "test-cluster".to_string(), Vec::<(&str, &str)>::new(), FailureDetectorConfig::default(), ); - let socket = UdpSocket::bind("0.0.0.0:4442").await.unwrap(); + let socket = UdpSocket::bind("0.0.0.0:3332").await.unwrap(); let mut scuttlebutt = ScuttleButt::with_node_id_and_seeds( "offline".into(), HashSet::new(), "offline".to_string(), + "test-cluster".to_string(), Vec::<(&str, &str)>::new(), FailureDetectorConfig::default(), ); // Send broken payload. - socket.send_to(&[0; UDP_MTU], server_addr).await.unwrap(); + socket.send_to(b"broken", server_addr).await.unwrap(); // Confirm nothing broke using a regular payload. let syn = scuttlebutt.create_syn_message(); - let buf = syn.serialize_to_vec(); - socket.send_to(&buf[..], server_addr).await.unwrap(); + let message = syn.serialize_to_vec(); + socket.send_to(&message, server_addr).await.unwrap(); let mut buf = [0; UDP_MTU]; let (len, _addr) = timeout(socket.recv_from(&mut buf)).await.unwrap(); - match ScuttleButtMessage::deserialize(&mut &buf[..len]).unwrap() { + let msg = ScuttleButtMessage::deserialize(&mut &buf[..len]).unwrap(); + match msg { ScuttleButtMessage::SynAck { .. } => (), message => panic!("unexpected message: {:?}", message), } @@ -537,6 +552,7 @@ mod tests { "0.0.0.0:5552".into(), &[server_addr.into()], "0.0.0.0:5552", + "test-cluster".to_string(), Vec::<(&str, &str)>::new(), FailureDetectorConfig::default(), ); @@ -559,6 +575,7 @@ mod tests { test_addr.into(), HashSet::new(), test_addr.to_string(), + "test-cluster".to_string(), Vec::<(&str, &str)>::new(), FailureDetectorConfig::default(), ); @@ -570,6 +587,7 @@ mod tests { NodeId::from(server_addr), &[], server_addr, + "test-cluster".to_string(), Vec::<(&str, &str)>::new(), FailureDetectorConfig::default(), ); @@ -617,6 +635,7 @@ mod tests { NodeId::from("0.0.0.0:6663"), &[], "0.0.0.0:6663", + "test-cluster".to_string(), Vec::<(&str, &str)>::new(), FailureDetectorConfig::default(), ); @@ -624,6 +643,7 @@ mod tests { NodeId::from("0.0.0.0:6664"), &["0.0.0.0:6663".to_string()], "0.0.0.0:6664", + "test-cluster".to_string(), Vec::<(&str, &str)>::new(), FailureDetectorConfig::default(), );