Skip to content

Commit

Permalink
feat: validate gossipsub messages
Browse files Browse the repository at this point in the history
  • Loading branch information
stringhandler committed Oct 31, 2024
1 parent 1414f8e commit b5c0a1e
Showing 1 changed file with 40 additions and 14 deletions.
54 changes: 40 additions & 14 deletions src/server/p2p/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use libp2p::{
autonat::{self, NatStatus},
dcutr,
futures::StreamExt,
gossipsub::{self, IdentTopic, Message, PublishError},
gossipsub::{self, IdentTopic, Message, MessageAcceptance, MessageId, PublishError},
identify::{self, Info},
identity::Keypair,
kad::{self, store::MemoryStore, Event},
Expand Down Expand Up @@ -372,6 +372,7 @@ where S: ShareChain
// .max_messages_per_rpc(Some(1000))
// We get a lot of messages, so
//.duplicate_cache_time(Duration::from_secs(1))
.validate_messages()
.build()
.map_err(|msg| io::Error::new(io::ErrorKind::Other, msg))?;
let gossipsub = gossipsub::Behaviour::new(
Expand Down Expand Up @@ -579,33 +580,34 @@ where S: ShareChain

/// Main method to handle any message comes from gossipsub.
#[allow(clippy::too_many_lines)]
async fn handle_new_gossipsub_message(&mut self, message: Message) {
async fn handle_new_gossipsub_message(&mut self, message: Message) -> Result<MessageAcceptance, Error> {
debug!(target: MESSAGE_LOGGING_LOG_TARGET, "New gossipsub message: {message:?}");
let peer = message.source;
if let Some(peer) = peer {
let topic = message.topic.to_string();

match topic {
topic if topic == Self::network_topic(PEER_INFO_TOPIC) => match messages::PeerInfo::try_from(message) {
Ok(payload) => {
debug!(target: MESSAGE_LOGGING_LOG_TARGET, "[PEERINFO_TOPIC] New peer info: {peer:?} -> {payload:?}");
debug!(target: LOG_TARGET, squad = &self.config.squad; "[NETWORK] New peer info: {peer:?} -> {payload:?}");
if payload.version != PROTOCOL_VERSION {
trace!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} has an outdated version, skipping", peer);
return;
return Ok(MessageAcceptance::Reject);
}
if payload.squad != self.config.squad.as_string() {
debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} is not in the same squad, skipping. Our squad: {}, their squad:{}", peer, self.config.squad, payload.squad);
return;
return Ok(MessageAcceptance::Ignore);
}
if !self.config.is_seed_peer {
if self.add_peer(payload, peer.clone()).await {
self.swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer);
}
}
return Ok(MessageAcceptance::Accept);
},
Err(error) => {
debug!(target: LOG_TARGET, squad = &self.config.squad; "Can't deserialize peer info payload: {:?}", error);
return Ok(MessageAcceptance::Reject);
},
},
topic if topic == Self::squad_topic(&self.config.squad, PEER_INFO_TOPIC) => {
Expand All @@ -616,16 +618,18 @@ where S: ShareChain
debug!(target: LOG_TARGET, squad = &self.config.squad; "[squad] New peer info: {peer:?} -> {payload:?}");
if payload.version != PROTOCOL_VERSION {
debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} has an outdated version, skipping", peer);
return;
return Ok(MessageAcceptance::Reject);
}
if !self.config.is_seed_peer {
if self.add_peer(payload, peer.clone()).await {
self.swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer);
}
}
return Ok(MessageAcceptance::Accept);
},
Err(error) => {
debug!(target: LOG_TARGET, squad = &self.config.squad; "Can't deserialize peer info payload: {:?}", error);
return Ok(MessageAcceptance::Reject);
},
}
},
Expand All @@ -642,7 +646,7 @@ where S: ShareChain
Ok(payload) => {
if payload.version != PROTOCOL_VERSION {
debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} has an outdated version, skipping", peer);
return;
return Ok(MessageAcceptance::Reject);
}
let payload = Arc::new(payload);
debug!(target: MESSAGE_LOGGING_LOG_TARGET, "[SQUAD_NEW_BLOCK_TOPIC] New block from gossip: {peer:?} -> {payload:?}");
Expand All @@ -655,7 +659,7 @@ where S: ShareChain
// verify payload
if payload.new_blocks.is_empty() {
warn!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent notify new tip with no blocks.", peer);
return;
return Ok(MessageAcceptance::Reject);
}

debug!(target: LOG_TARGET, squad = &self.config.squad; "🆕 New block from broadcast: {:?}", &payload.new_blocks.iter().map(|b| b.0.to_string()).join(","));
Expand All @@ -672,7 +676,7 @@ where S: ShareChain
our_tip.saturating_sub(4)
{
debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent a block that is not better than ours, skipping", peer);
return;
return Ok(MessageAcceptance::Ignore);
}

let mut missing_blocks = vec![];
Expand All @@ -685,10 +689,11 @@ where S: ShareChain
if !missing_blocks.is_empty() {
if self.start_time.elapsed() < STARTUP_CATCH_UP_TIME {
warn!(target: LOG_TARGET, squad = &self.config.squad; "Still in startup catch up time, skipping block until we have synced");
return;
return Ok(MessageAcceptance::Accept);
}
self.sync_share_chain(algo, peer, missing_blocks, true).await;
}
return Ok(MessageAcceptance::Accept);
},
Err(error) => {
// TODO: elevate to error
Expand All @@ -699,18 +704,21 @@ where S: ShareChain
format!("Node sent a block that could not be deserialized: {:?}", error),
)
.await;
return Ok(MessageAcceptance::Reject);
},
}
},
_ => {
debug!(target: MESSAGE_LOGGING_LOG_TARGET, "Unknown topic {topic:?}!");

warn!(target: LOG_TARGET, squad = &self.config.squad; "Unknown topic {topic:?}!");
return Ok(MessageAcceptance::Reject);
},
}
} else {
warn!(target: LOG_TARGET, squad = &self.config.squad; "No peer found for message");
}
Ok(MessageAcceptance::Reject)
}

async fn add_peer(&mut self, payload: PeerInfo, peer: PeerId) -> bool {
Expand Down Expand Up @@ -1005,10 +1013,28 @@ where S: ShareChain
ServerNetworkBehaviourEvent::Gossipsub(event) => match event {
gossipsub::Event::Message {
message,
message_id: _message_id,
propagation_source: _propagation_source,
} => {
self.handle_new_gossipsub_message(message).await;
message_id,
propagation_source,
} => match self.handle_new_gossipsub_message(message).await {
Ok(res) => {
let _ = self.swarm.behaviour_mut().gossipsub.report_message_validation_result(
&message_id,
&propagation_source,
res,
).inspect_err(|e| {
error!(target: LOG_TARGET, squad = &self.config.squad; "Failed to report message validation result: {e:?}");
});
},
Err(error) => {
error!(target: LOG_TARGET, squad = &self.config.squad; "Failed to handle gossipsub message: {error:?}");
let _ = self.swarm.behaviour_mut().gossipsub.report_message_validation_result(
&message_id,
&propagation_source,
MessageAcceptance::Reject,
).inspect_err(|e| {
error!(target: LOG_TARGET, squad = &self.config.squad; "Failed to report message validation result: {e:?}");
});
},
},
gossipsub::Event::Subscribed { .. } => {},
gossipsub::Event::Unsubscribed { .. } => {},
Expand Down

0 comments on commit b5c0a1e

Please sign in to comment.