Skip to content

Commit

Permalink
test(gossipsub): make CI test to be more strict 0-tolerance for misse…
Browse files Browse the repository at this point in the history
…d published messages
  • Loading branch information
bochaco committed Sep 25, 2023
1 parent e82fba2 commit c03a170
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 8 deletions.
24 changes: 22 additions & 2 deletions sn_networking/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,10 +367,30 @@ impl SwarmDriver {
}
SwarmCmd::GossipsubPublish { topic_id, msg } => {
let topic_id = libp2p::gossipsub::IdentTopic::new(topic_id);
self.swarm
let r = self
.swarm
.behaviour_mut()
.gossipsub
.publish(topic_id, msg)?;
.publish(topic_id.clone(), msg);

let connected_peers = self.swarm.connected_peers().count();
let p: Vec<_> = self.swarm.behaviour().gossipsub.all_peers().collect();

if let Err(err) = &r {
info!(
">>>>>SEND ERR {topic_id}: {err:?} == {connected_peers} :{}: {:?}",
p.len(),
p.iter().map(|(_, t)| t).collect::<Vec<_>>()
);
} else {
info!(
">>>>>SEND OK {topic_id}: sent with {connected_peers} peers :{}: {:?}",
p.len(),
p.iter().map(|(_, t)| t).collect::<Vec<_>>()
);
}

r?;
}
}

Expand Down
4 changes: 4 additions & 0 deletions sn_networking/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,10 @@ impl SwarmDriver {
libp2p::gossipsub::Event::Message { message, .. } => {
let topic = message.topic.into_string();
let msg = message.data;
info!(
">>>>> MSG on {topic}: {}",
String::from_utf8(msg.clone()).unwrap()
);
self.send_event(NetworkEvent::GossipsubMsg { topic, msg });
}
other => trace!("Gossipsub Event has been ignored: {other:?}"),
Expand Down
16 changes: 10 additions & 6 deletions sn_node/tests/msgs_over_gossipsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use tonic::Request;

const NODE_COUNT: u8 = 25;

#[tokio::test(flavor = "multi_thread")]
#[tokio::test]
async fn msgs_over_gossipsub() -> Result<()> {
let mut addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12000);

Expand All @@ -47,7 +47,7 @@ async fn msgs_over_gossipsub() -> Result<()> {
println!("Listening to node events...");
let mut count = 0;

let _ = timeout(Duration::from_millis(10000), async {
let _ = timeout(Duration::from_millis(20000), async {
let mut stream = response.into_inner();
while let Some(Ok(e)) = stream.next().await {
match NodeEvent::from_bytes(&e.event) {
Expand All @@ -57,6 +57,9 @@ async fn msgs_over_gossipsub() -> Result<()> {
String::from_utf8(msg).unwrap()
);
count += 1;
if count == NODE_COUNT - 1 {
break;
}
}
Ok(_) => { /* ignored */ }
Err(_) => {
Expand All @@ -70,16 +73,17 @@ async fn msgs_over_gossipsub() -> Result<()> {
Ok::<u8, eyre::Error>(count)
});

tokio::time::sleep(Duration::from_millis(1000)).await;
tokio::time::sleep(Duration::from_millis(10000)).await;

// have all other nodes to publish each a different msg to that same topic
other_nodes_to_publish_on_topic(addr, topic).await?;

let count = handle.await??;
println!("Messages received by node {node_index}: {count}");
assert!(
count > 0,
"No message received by node at index {}",
assert_eq!(
count,
NODE_COUNT - 1,
"Not all messages were received by node {}",
node_index
);
}
Expand Down

0 comments on commit c03a170

Please sign in to comment.