Skip to content

Commit

Permalink
test(gossipsub): make CI test to be more strict 0-tolerance for misse…
Browse files Browse the repository at this point in the history
…d published messages
  • Loading branch information
bochaco committed Sep 25, 2023
1 parent d730d29 commit e88074d
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 6 deletions.
24 changes: 22 additions & 2 deletions sn_networking/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,10 +367,30 @@ impl SwarmDriver {
}
SwarmCmd::GossipsubPublish { topic_id, msg } => {
let topic_id = libp2p::gossipsub::IdentTopic::new(topic_id);
self.swarm
let r = self
.swarm
.behaviour_mut()
.gossipsub
.publish(topic_id, msg)?;
.publish(topic_id.clone(), msg);

let connected_peers = self.swarm.connected_peers().count();
let p: Vec<_> = self.swarm.behaviour().gossipsub.all_peers().collect();

if let Err(err) = &r {
info!(
">>>>>SEND ERR {topic_id}: {err:?} == {connected_peers} :{}: {:?}",
p.len(),
p.iter().map(|(_, t)| t).collect::<Vec<_>>()
);
} else {
info!(
">>>>>SEND OK {topic_id}: sent with {connected_peers} peers :{}: {:?}",
p.len(),
p.iter().map(|(_, t)| t).collect::<Vec<_>>()
);
}

r?;
}
}

Expand Down
4 changes: 4 additions & 0 deletions sn_networking/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,10 @@ impl SwarmDriver {
libp2p::gossipsub::Event::Message { message, .. } => {
let topic = message.topic.into_string();
let msg = message.data;
info!(
">>>>> MSG on {topic}: {}",
String::from_utf8(msg.clone()).unwrap()
);
self.send_event(NetworkEvent::GossipsubMsg { topic, msg });
}
other => trace!("Gossipsub Event has been ignored: {other:?}"),
Expand Down
12 changes: 8 additions & 4 deletions sn_node/tests/msgs_over_gossipsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@ use tonic::Request;

const NODE_COUNT: u8 = 25;

#[tokio::test(flavor = "multi_thread")]
#[tokio::test]
async fn msgs_over_gossipsub() -> Result<()> {
let mut addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12000);

for node_index in 1..NODE_COUNT + 1 {
// request current node to subscribe to a fresh new topic
addr.set_port(12000 + node_index as u16);
let topic = format!("TestTopic-{node_index}");
let random_num = rand::random::<u64>();
let topic = format!("TestTopic-{node_index}-{random_num}");
node_subscribe_to_topic(addr, topic.clone()).await?;

println!("Node {node_index} subscribed to {topic}");
Expand All @@ -57,6 +58,9 @@ async fn msgs_over_gossipsub() -> Result<()> {
String::from_utf8(msg).unwrap()
);
count += 1;
if count == NODE_COUNT - 1 {
break;
}
}
Ok(_) => { /* ignored */ }
Err(_) => {
Expand All @@ -78,8 +82,8 @@ async fn msgs_over_gossipsub() -> Result<()> {
let count = handle.await??;
println!("Messages received by node {node_index}: {count}");
assert!(
count > 0,
"No message received by node at index {}",
count >= NODE_COUNT - 2,
"Not all messages were received by node {}",
node_index
);
}
Expand Down

0 comments on commit e88074d

Please sign in to comment.