diff --git a/scuttlebutt-test/src/lib.rs b/scuttlebutt-test/src/lib.rs index f9ccfe6..3a05802 100644 --- a/scuttlebutt-test/src/lib.rs +++ b/scuttlebutt-test/src/lib.rs @@ -22,6 +22,7 @@ use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize, Debug)] pub struct ApiResponse { + pub cluster_name: String, pub cluster_state: SerializableClusterState, pub live_nodes: Vec, pub dead_nodes: Vec, diff --git a/scuttlebutt-test/src/main.rs b/scuttlebutt-test/src/main.rs index 9ba9ba4..ef8c58a 100644 --- a/scuttlebutt-test/src/main.rs +++ b/scuttlebutt-test/src/main.rs @@ -40,6 +40,7 @@ impl Api { async fn index(&self) -> Json { let scuttlebutt_guard = self.scuttlebutt.lock().await; let response = ApiResponse { + cluster_name: scuttlebutt_guard.cluster_name().to_string(), cluster_state: SerializableClusterState::from(scuttlebutt_guard.cluster_state()), live_nodes: scuttlebutt_guard.live_nodes().cloned().collect::>(), dead_nodes: scuttlebutt_guard.dead_nodes().cloned().collect::>(), @@ -67,6 +68,7 @@ async fn main() -> Result<(), std::io::Error> { NodeId::from(opt.listen_addr.as_str()), &opt.seeds[..], &opt.listen_addr, + "testing".to_string(), Vec::<(&str, &str)>::new(), FailureDetectorConfig::default(), ); diff --git a/scuttlebutt-test/tests/cli.rs b/scuttlebutt-test/tests/cli.rs index 6ab18f5..8661c64 100644 --- a/scuttlebutt-test/tests/cli.rs +++ b/scuttlebutt-test/tests/cli.rs @@ -78,6 +78,7 @@ fn test_multiple_nodes() -> anyhow::Result<()> { .node_states .get("localhost:10003") .is_some()); + assert_eq!(info.cluster_name, "testing"); assert_eq!(info.live_nodes.len(), 4); assert_eq!(info.dead_nodes.len(), 0); diff --git a/scuttlebutt/src/digest.rs b/scuttlebutt/src/digest.rs index 8975992..c672b47 100644 --- a/scuttlebutt/src/digest.rs +++ b/scuttlebutt/src/digest.rs @@ -27,7 +27,7 @@ use crate::{NodeId, Version}; /// /// It is equivalent to a map /// peer -> max version. -#[derive(Debug, Default)] +#[derive(Debug, Default, PartialEq)] pub struct Digest { pub(crate) node_max_version: BTreeMap, } diff --git a/scuttlebutt/src/lib.rs b/scuttlebutt/src/lib.rs index f4102e3..3249e76 100644 --- a/scuttlebutt/src/lib.rs +++ b/scuttlebutt/src/lib.rs @@ -34,7 +34,7 @@ pub use failure_detector::FailureDetectorConfig; use serde::{Deserialize, Serialize}; use tokio::sync::watch; use tokio_stream::wrappers::WatchStream; -use tracing::{debug, error}; +use tracing::{debug, error, warn}; use crate::digest::Digest; use crate::message::ScuttleButtMessage; @@ -120,6 +120,7 @@ pub struct ScuttleButt { mtu: usize, address: String, self_node_id: NodeId, + cluster_name: String, cluster_state: ClusterState, heartbeat: u64, /// The failure detector instance. @@ -135,6 +136,7 @@ impl ScuttleButt { self_node_id: NodeId, seed_ids: HashSet, address: String, + cluster_name: String, initial_key_values: Vec<(impl ToString, impl ToString)>, failure_detector_config: FailureDetectorConfig, ) -> Self { @@ -143,6 +145,7 @@ impl ScuttleButt { mtu: 60_000, address, self_node_id, + cluster_name, cluster_state: ClusterState::with_seed_ids(seed_ids), heartbeat: 0, failure_detector: FailureDetector::new(failure_detector_config), @@ -169,12 +172,26 @@ impl ScuttleButt { pub fn create_syn_message(&mut self) -> ScuttleButtMessage { let digest = self.compute_digest(); - ScuttleButtMessage::Syn { digest } + ScuttleButtMessage::Syn { + cluster_name: self.cluster_name.clone(), + digest, + } } pub fn process_message(&mut self, msg: ScuttleButtMessage) -> Option { match msg { - ScuttleButtMessage::Syn { digest } => { + ScuttleButtMessage::Syn { + cluster_name, + digest, + } => { + if cluster_name != self.cluster_name { + warn!( + cluster_name = %cluster_name, + "rejecting syn message with mismatching cluster name" + ); + return Some(ScuttleButtMessage::BadCluster); + } + let self_digest = self.compute_digest(); let dead_nodes = self.dead_nodes().collect::>(); let delta = self.cluster_state.compute_delta( @@ -202,6 +219,10 @@ impl ScuttleButt { self.cluster_state.apply_delta(delta); None } + ScuttleButtMessage::BadCluster => { + warn!("message rejected by peer: cluster name mismatch"); + None + } } } @@ -275,6 +296,10 @@ impl ScuttleButt { &self.self_node_id } + pub fn cluster_name(&self) -> &str { + &self.cluster_name + } + /// Computes digest. /// /// This method also increments the heartbeat, to force the presence @@ -351,6 +376,7 @@ mod tests { NodeId::from(address.as_str()), seeds, address, + "test-cluster".to_string(), Vec::<(&str, &str)>::new(), FailureDetectorConfig { dead_node_grace_period: DEAD_NODE_GRACE_PERIOD, @@ -417,6 +443,7 @@ mod tests { NodeId::from("node1"), HashSet::new(), "node1".to_string(), + "test-cluster".to_string(), vec![("key1a", "1"), ("key2a", "2")], FailureDetectorConfig::default(), ); @@ -424,6 +451,7 @@ mod tests { NodeId::from("node2"), HashSet::new(), "node2".to_string(), + "test-cluster".to_string(), vec![("key1b", "1"), ("key2b", "2")], FailureDetectorConfig::default(), ); @@ -570,6 +598,7 @@ mod tests { NodeId::new("new_node".to_string(), address.clone()), &["localhost:40001".to_string()], address, + "test-cluster".to_string(), Vec::<(&str, &str)>::new(), FailureDetectorConfig::default(), ); diff --git a/scuttlebutt/src/message.rs b/scuttlebutt/src/message.rs index 4e444e2..0c98fb9 100644 --- a/scuttlebutt/src/message.rs +++ b/scuttlebutt/src/message.rs @@ -31,16 +31,22 @@ use crate::serialize::Serializable; /// between node A and node B. /// The names {Syn, SynAck, Ack} of the different steps are borrowed from /// TCP Handshake. -#[derive(Debug)] +#[derive(Debug, PartialEq)] pub enum ScuttleButtMessage { /// Node A initiates handshakes. - Syn { digest: Digest }, + Syn { + cluster_name: String, + digest: Digest, + }, /// Node B returns a partial update as described /// in the scuttlebutt reconcialiation algorithm, /// and returns its own checksum. SynAck { digest: Digest, delta: Delta }, /// Node A returns a partial update for B. Ack { delta: Delta }, + /// Node B rejects the Syn message because of a + /// cluster name mismatch between the peers. + BadCluster, } #[derive(Copy, Clone)] @@ -49,6 +55,7 @@ enum MessageType { Syn = 0, SynAck = 1u8, Ack = 2u8, + BadCluster = 3u8, } impl MessageType { @@ -57,6 +64,7 @@ impl MessageType { 0 => Some(Self::Syn), 1 => Some(Self::SynAck), 2 => Some(Self::Ack), + 3 => Some(Self::BadCluster), _ => None, } } @@ -68,9 +76,13 @@ impl MessageType { impl Serializable for ScuttleButtMessage { fn serialize(&self, buf: &mut Vec) { match self { - ScuttleButtMessage::Syn { digest } => { + ScuttleButtMessage::Syn { + cluster_name, + digest, + } => { buf.push(MessageType::Syn.to_code()); digest.serialize(buf); + cluster_name.serialize(buf); } ScuttleButtMessage::SynAck { digest, delta } => { buf.push(MessageType::SynAck.to_code()); @@ -81,6 +93,9 @@ impl Serializable for ScuttleButtMessage { buf.push(MessageType::Ack.to_code()); delta.serialize(buf); } + ScuttleButtMessage::BadCluster => { + buf.push(MessageType::BadCluster.to_code()); + } } } @@ -94,7 +109,11 @@ impl Serializable for ScuttleButtMessage { match code { MessageType::Syn => { let digest = Digest::deserialize(buf)?; - Ok(Self::Syn { digest }) + let cluster_name = String::deserialize(buf)?; + Ok(Self::Syn { + cluster_name, + digest, + }) } MessageType::SynAck => { let digest = Digest::deserialize(buf)?; @@ -105,16 +124,44 @@ impl Serializable for ScuttleButtMessage { let delta = Delta::deserialize(buf)?; Ok(Self::Ack { delta }) } + MessageType::BadCluster => Ok(Self::BadCluster), } } fn serialized_len(&self) -> usize { match self { - ScuttleButtMessage::Syn { digest } => 1 + digest.serialized_len(), + ScuttleButtMessage::Syn { + cluster_name, + digest, + } => 1 + cluster_name.serialized_len() + digest.serialized_len(), ScuttleButtMessage::SynAck { digest, delta } => { 1 + digest.serialized_len() + delta.serialized_len() } ScuttleButtMessage::Ack { delta } => 1 + delta.serialized_len(), + ScuttleButtMessage::BadCluster => 1, } } } + +#[cfg(test)] +mod tests { + use crate::serialize::test_serdeser_aux; + use crate::{Digest, ScuttleButtMessage}; + + #[test] + fn test_syn() { + let mut digest = Digest::default(); + digest.add_node("node1".into(), 1); + digest.add_node("node2".into(), 2); + let syn = ScuttleButtMessage::Syn { + cluster_name: "cluster-a".to_string(), + digest, + }; + test_serdeser_aux(&syn, 58); + } + + #[test] + fn test_bad_cluster() { + test_serdeser_aux(&ScuttleButtMessage::BadCluster, 1); + } +} diff --git a/scuttlebutt/src/server.rs b/scuttlebutt/src/server.rs index 377302e..3d04a85 100644 --- a/scuttlebutt/src/server.rs +++ b/scuttlebutt/src/server.rs @@ -63,6 +63,7 @@ impl ScuttleServer { node_id: NodeId, seed_nodes: &[String], address: impl Into, + cluster_name: String, initial_key_values: Vec<(impl ToString, impl ToString)>, failure_detector_config: FailureDetectorConfig, ) -> Self { @@ -72,6 +73,7 @@ impl ScuttleServer { node_id, seed_nodes.iter().cloned().collect(), address.into(), + cluster_name, initial_key_values, failure_detector_config, ); @@ -397,6 +399,7 @@ mod tests { "0.0.0.0:1112".into(), &[], "0.0.0.0:1112", + "test-cluster".to_string(), Vec::<(&str, &str)>::new(), FailureDetectorConfig::default(), ); @@ -407,7 +410,13 @@ mod tests { let msg = ScuttleButtMessage::deserialize(&mut &buf[..len]).unwrap(); match msg { - ScuttleButtMessage::Syn { .. } => (), + ScuttleButtMessage::Syn { + cluster_name, + digest, + } => { + assert_eq!(cluster_name, "test-cluster"); + assert_eq!(digest.node_max_version.len(), 1); + } message => panic!("unexpected message: {:?}", message), } @@ -422,6 +431,7 @@ mod tests { "offline".into(), HashSet::new(), "offline".to_string(), + "test-cluster".to_string(), Vec::<(&str, &str)>::new(), FailureDetectorConfig::default(), ); @@ -430,6 +440,7 @@ mod tests { server_addr.into(), &[], server_addr, + "test-cluster".to_string(), Vec::<(&str, &str)>::new(), FailureDetectorConfig::default(), ); @@ -452,38 +463,39 @@ mod tests { } #[tokio::test] - async fn ignore_broken_payload() { - let server_addr = "0.0.0.0:3331"; + async fn syn_bad_cluster() { + let outsider_addr = "0.0.0.0:2224"; + let socket = UdpSocket::bind(outsider_addr).await.unwrap(); + let mut outsider = ScuttleButt::with_node_id_and_seeds( + outsider_addr.into(), + HashSet::new(), + outsider_addr.into(), + "another-cluster".to_string(), + Vec::<(&str, &str)>::new(), + FailureDetectorConfig::default(), + ); + + let server_addr = "0.0.0.0:2223"; let server = ScuttleServer::spawn( server_addr.into(), &[], server_addr, + "test-cluster".to_string(), Vec::<(&str, &str)>::new(), FailureDetectorConfig::default(), ); - let socket = UdpSocket::bind("0.0.0.0:3332").await.unwrap(); - let mut scuttlebutt = ScuttleButt::with_node_id_and_seeds( - "offline".into(), - HashSet::new(), - "offline".to_string(), - Vec::<(&str, &str)>::new(), - FailureDetectorConfig::default(), - ); - - // Send broken payload. - socket.send_to(b"broken", server_addr).await.unwrap(); - // Confirm nothing broke using a regular payload. - let syn = scuttlebutt.create_syn_message(); - let message = syn.serialize_to_vec(); - socket.send_to(&message, server_addr).await.unwrap(); + let mut buf = Vec::new(); + let syn = outsider.create_syn_message(); + syn.serialize(&mut buf); + socket.send_to(&buf[..], server_addr).await.unwrap(); - let mut buf = [0; UDP_MTU]; + let mut buf = [0; super::UDP_MTU]; let (len, _addr) = timeout(socket.recv_from(&mut buf)).await.unwrap(); let msg = ScuttleButtMessage::deserialize(&mut &buf[..len]).unwrap(); match msg { - ScuttleButtMessage::SynAck { .. } => (), + ScuttleButtMessage::BadCluster => (), message => panic!("unexpected message: {:?}", message), } @@ -491,36 +503,39 @@ mod tests { } #[tokio::test] - async fn ignore_oversized_payload() { - let server_addr = "0.0.0.0:4441"; + async fn ignore_broken_payload() { + let server_addr = "0.0.0.0:3331"; let server = ScuttleServer::spawn( server_addr.into(), &[], server_addr, + "test-cluster".to_string(), Vec::<(&str, &str)>::new(), FailureDetectorConfig::default(), ); - let socket = UdpSocket::bind("0.0.0.0:4442").await.unwrap(); + let socket = UdpSocket::bind("0.0.0.0:3332").await.unwrap(); let mut scuttlebutt = ScuttleButt::with_node_id_and_seeds( "offline".into(), HashSet::new(), "offline".to_string(), + "test-cluster".to_string(), Vec::<(&str, &str)>::new(), FailureDetectorConfig::default(), ); // Send broken payload. - socket.send_to(&[0; UDP_MTU], server_addr).await.unwrap(); + socket.send_to(b"broken", server_addr).await.unwrap(); // Confirm nothing broke using a regular payload. let syn = scuttlebutt.create_syn_message(); - let buf = syn.serialize_to_vec(); - socket.send_to(&buf[..], server_addr).await.unwrap(); + let message = syn.serialize_to_vec(); + socket.send_to(&message, server_addr).await.unwrap(); let mut buf = [0; UDP_MTU]; let (len, _addr) = timeout(socket.recv_from(&mut buf)).await.unwrap(); - match ScuttleButtMessage::deserialize(&mut &buf[..len]).unwrap() { + let msg = ScuttleButtMessage::deserialize(&mut &buf[..len]).unwrap(); + match msg { ScuttleButtMessage::SynAck { .. } => (), message => panic!("unexpected message: {:?}", message), } @@ -537,6 +552,7 @@ mod tests { "0.0.0.0:5552".into(), &[server_addr.into()], "0.0.0.0:5552", + "test-cluster".to_string(), Vec::<(&str, &str)>::new(), FailureDetectorConfig::default(), ); @@ -559,6 +575,7 @@ mod tests { test_addr.into(), HashSet::new(), test_addr.to_string(), + "test-cluster".to_string(), Vec::<(&str, &str)>::new(), FailureDetectorConfig::default(), ); @@ -570,6 +587,7 @@ mod tests { NodeId::from(server_addr), &[], server_addr, + "test-cluster".to_string(), Vec::<(&str, &str)>::new(), FailureDetectorConfig::default(), ); @@ -617,6 +635,7 @@ mod tests { NodeId::from("0.0.0.0:6663"), &[], "0.0.0.0:6663", + "test-cluster".to_string(), Vec::<(&str, &str)>::new(), FailureDetectorConfig::default(), ); @@ -624,6 +643,7 @@ mod tests { NodeId::from("0.0.0.0:6664"), &["0.0.0.0:6663".to_string()], "0.0.0.0:6664", + "test-cluster".to_string(), Vec::<(&str, &str)>::new(), FailureDetectorConfig::default(), );