diff --git a/scuttlebutt/src/digest.rs b/scuttlebutt/src/digest.rs index 8975992..c672b47 100644 --- a/scuttlebutt/src/digest.rs +++ b/scuttlebutt/src/digest.rs @@ -27,7 +27,7 @@ use crate::{NodeId, Version}; /// /// It is equivalent to a map /// peer -> max version. -#[derive(Debug, Default)] +#[derive(Debug, Default, PartialEq)] pub struct Digest { pub(crate) node_max_version: BTreeMap, } diff --git a/scuttlebutt/src/lib.rs b/scuttlebutt/src/lib.rs index 6b8aae7..f08e60a 100644 --- a/scuttlebutt/src/lib.rs +++ b/scuttlebutt/src/lib.rs @@ -187,9 +187,9 @@ impl ScuttleButt { if cluster_name != self.cluster_name { warn!( cluster_name = ?cluster_name, - "ignoring syn message with mismatching cluster name" + "rejecting syn message with mismatching cluster name" ); - return None; + return Some(ScuttleButtMessage::BadCluster); } let self_digest = self.compute_digest(); @@ -219,6 +219,10 @@ impl ScuttleButt { self.cluster_state.apply_delta(delta); None } + ScuttleButtMessage::BadCluster => { + warn!("message rejected by peer: cluster name mismatch"); + return None; + } } } diff --git a/scuttlebutt/src/message.rs b/scuttlebutt/src/message.rs index e4504f4..bf3a35f 100644 --- a/scuttlebutt/src/message.rs +++ b/scuttlebutt/src/message.rs @@ -31,7 +31,7 @@ use crate::serialize::Serializable; /// between node A and node B. /// The names {Syn, SynAck, Ack} of the different steps are borrowed from /// TCP Handshake. -#[derive(Debug)] +#[derive(Debug, PartialEq)] pub enum ScuttleButtMessage { /// Node A initiates handshakes. Syn { @@ -44,6 +44,9 @@ pub enum ScuttleButtMessage { SynAck { digest: Digest, delta: Delta }, /// Node A returns a partial update for B. Ack { delta: Delta }, + /// Node B rejects the Syn message because of a + /// cluster name mismatch between the peers. + BadCluster, } #[derive(Copy, Clone)] @@ -52,6 +55,7 @@ enum MessageType { Syn = 0, SynAck = 1u8, Ack = 2u8, + BadCluster = 3u8, } impl MessageType { @@ -60,6 +64,7 @@ impl MessageType { 0 => Some(Self::Syn), 1 => Some(Self::SynAck), 2 => Some(Self::Ack), + 3 => Some(Self::BadCluster), _ => None, } } @@ -88,6 +93,9 @@ impl Serializable for ScuttleButtMessage { buf.push(MessageType::Ack.to_code()); delta.serialize(buf); } + ScuttleButtMessage::BadCluster => { + buf.push(MessageType::BadCluster.to_code()); + } } } @@ -116,6 +124,7 @@ impl Serializable for ScuttleButtMessage { let delta = Delta::deserialize(buf)?; Ok(Self::Ack { delta }) } + MessageType::BadCluster => Ok(Self::BadCluster), } } @@ -129,6 +138,30 @@ impl Serializable for ScuttleButtMessage { 1 + digest.serialized_len() + delta.serialized_len() } ScuttleButtMessage::Ack { delta } => 1 + delta.serialized_len(), + ScuttleButtMessage::BadCluster => 1, } } } + +#[cfg(test)] +mod tests { + use crate::{Digest, ScuttleButtMessage}; + use crate::serialize::test_serdeser_aux; + + #[test] + fn test_syn() { + let mut digest = Digest::default(); + digest.add_node("node1".into(), 1); + digest.add_node("node2".into(), 2); + let syn = ScuttleButtMessage::Syn { + cluster_name: "cluster-a".to_string(), + digest + }; + test_serdeser_aux(&syn, 58); + } + + #[test] + fn test_bad_cluster() { + test_serdeser_aux(&ScuttleButtMessage::BadCluster, 1); + } +} diff --git a/scuttlebutt/src/server.rs b/scuttlebutt/src/server.rs index a29c188..94903d8 100644 --- a/scuttlebutt/src/server.rs +++ b/scuttlebutt/src/server.rs @@ -463,18 +463,19 @@ mod tests { } #[tokio::test] - async fn ignore_mismatched_cluster_name() { - let server_addr = "0.0.0.0:2223"; - let socket = UdpSocket::bind("0.0.0.0:2224").await.unwrap(); + async fn syn_bad_cluster() { + let outsider_addr = "0.0.0.0:2224"; + let socket = UdpSocket::bind(outsider_addr).await.unwrap(); let mut outsider = ScuttleButt::with_node_id_and_seeds( - "offline".into(), + outsider_addr.into(), HashSet::new(), - "offline".to_string(), + outsider_addr.into(), "another-cluster".to_string(), Vec::<(&str, &str)>::new(), FailureDetectorConfig::default(), ); + let server_addr = "0.0.0.0:2223"; let server = ScuttleServer::spawn( server_addr.into(), &[], @@ -489,11 +490,14 @@ mod tests { syn.serialize(&mut buf); socket.send_to(&buf[..], server_addr).await.unwrap(); - // server will drop the message, we expect the recv to timeout let mut buf = [0; super::UDP_MTU]; - let resp = - tokio::time::timeout(Duration::from_millis(100), socket.recv_from(&mut buf)).await; - assert!(resp.is_err(), "unexpected response from peer"); + let (len, _addr) = timeout(socket.recv_from(&mut buf)).await.unwrap(); + + let msg = ScuttleButtMessage::deserialize(&mut &buf[..len]).unwrap(); + match msg { + ScuttleButtMessage::BadCluster => (), + message => panic!("unexpected message: {:?}", message), + } server.shutdown().await.unwrap(); }