Skip to content

Commit

Permalink
ensure cluster name when accepting syn messages
Browse files Browse the repository at this point in the history
  • Loading branch information
xvello committed Apr 7, 2022
1 parent 902d730 commit 35ee08a
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 11 deletions.
20 changes: 17 additions & 3 deletions scuttlebutt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub use failure_detector::FailureDetectorConfig;
use serde::{Deserialize, Serialize};
use tokio::sync::watch;
use tokio_stream::wrappers::WatchStream;
use tracing::{debug, error};
use tracing::{debug, error, warn};

use crate::digest::Digest;
use crate::message::ScuttleButtMessage;
Expand Down Expand Up @@ -172,12 +172,26 @@ impl ScuttleButt {

pub fn create_syn_message(&mut self) -> ScuttleButtMessage {
let digest = self.compute_digest();
ScuttleButtMessage::Syn { digest }
ScuttleButtMessage::Syn {
cluster_name: self.cluster_name.clone(),
digest,
}
}

pub fn process_message(&mut self, msg: ScuttleButtMessage) -> Option<ScuttleButtMessage> {
match msg {
ScuttleButtMessage::Syn { digest } => {
ScuttleButtMessage::Syn {
cluster_name,
digest,
} => {
if cluster_name != self.cluster_name {
warn!(
cluster_name = ?cluster_name,
"ignoring syn message with mismatching cluster name"
);
return None;
}

let self_digest = self.compute_digest();
let dead_nodes = self.dead_nodes().collect::<HashSet<_>>();
let delta = self.cluster_state.compute_delta(
Expand Down
22 changes: 18 additions & 4 deletions scuttlebutt/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ use crate::serialize::Serializable;
#[derive(Debug)]
pub enum ScuttleButtMessage {
/// Node A initiates handshakes.
Syn { digest: Digest },
Syn {
cluster_name: String,
digest: Digest,
},
/// Node B returns a partial update as described
/// in the scuttlebutt reconcialiation algorithm,
/// and returns its own checksum.
Expand Down Expand Up @@ -68,9 +71,13 @@ impl MessageType {
impl Serializable for ScuttleButtMessage {
fn serialize(&self, buf: &mut Vec<u8>) {
match self {
ScuttleButtMessage::Syn { digest } => {
ScuttleButtMessage::Syn {
cluster_name,
digest,
} => {
buf.push(MessageType::Syn.to_code());
digest.serialize(buf);
cluster_name.serialize(buf);
}
ScuttleButtMessage::SynAck { digest, delta } => {
buf.push(MessageType::SynAck.to_code());
Expand All @@ -94,7 +101,11 @@ impl Serializable for ScuttleButtMessage {
match code {
MessageType::Syn => {
let digest = Digest::deserialize(buf)?;
Ok(Self::Syn { digest })
let cluster_name = String::deserialize(buf)?;
Ok(Self::Syn {
cluster_name,
digest,
})
}
MessageType::SynAck => {
let digest = Digest::deserialize(buf)?;
Expand All @@ -110,7 +121,10 @@ impl Serializable for ScuttleButtMessage {

fn serialized_len(&self) -> usize {
match self {
ScuttleButtMessage::Syn { digest } => 1 + digest.serialized_len(),
ScuttleButtMessage::Syn {
cluster_name,
digest,
} => 1 + cluster_name.serialized_len() + digest.serialized_len(),
ScuttleButtMessage::SynAck { digest, delta } => {
1 + digest.serialized_len() + delta.serialized_len()
}
Expand Down
48 changes: 44 additions & 4 deletions scuttlebutt/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,7 @@ impl ScuttleServer {

/// Call a function with mutable access to the [`ScuttleButt`].
pub async fn with_scuttlebutt<F, T>(&self, mut fun: F) -> T
where
F: FnMut(&mut ScuttleButt) -> T,
{
where F: FnMut(&mut ScuttleButt) -> T {
let mut scuttlebutt = self.scuttlebutt.lock().await;
fun(&mut scuttlebutt)
}
Expand Down Expand Up @@ -412,7 +410,13 @@ mod tests {

let msg = ScuttleButtMessage::deserialize(&mut &buf[..len]).unwrap();
match msg {
ScuttleButtMessage::Syn { .. } => (),
ScuttleButtMessage::Syn {
cluster_name,
digest,
} => {
assert_eq!(cluster_name, "test-cluster");
assert_eq!(digest.node_max_version.len(), 1);
}
message => panic!("unexpected message: {:?}", message),
}

Expand Down Expand Up @@ -458,6 +462,42 @@ mod tests {
server.shutdown().await.unwrap();
}

#[tokio::test]
async fn ignore_mismatched_cluster_name() {
let server_addr = "0.0.0.0:2221";
let socket = UdpSocket::bind("0.0.0.0:2222").await.unwrap();
let mut outsider = ScuttleButt::with_node_id_and_seeds(
"offline".into(),
HashSet::new(),
"offline".to_string(),
"another-cluster".to_string(),
Vec::<(&str, &str)>::new(),
FailureDetectorConfig::default(),
);

let server = ScuttleServer::spawn(
server_addr.into(),
&[],
server_addr,
"test-cluster".to_string(),
Vec::<(&str, &str)>::new(),
FailureDetectorConfig::default(),
);

let mut buf = Vec::new();
let syn = outsider.create_syn_message();
syn.serialize(&mut buf);
socket.send_to(&buf[..], server_addr).await.unwrap();

// server will drop the message, we expect the recv to timeout
let mut buf = [0; super::UDP_MTU];
let resp =
tokio::time::timeout(Duration::from_millis(100), socket.recv_from(&mut buf)).await;
assert!(resp.is_err(), "unexpected response from peer");

server.shutdown().await.unwrap();
}

#[tokio::test]
async fn ignore_broken_payload() {
let server_addr = "0.0.0.0:3331";
Expand Down

0 comments on commit 35ee08a

Please sign in to comment.