From c03a170f1631b6d0a54c03bcf9b065e8e4e5bec8 Mon Sep 17 00:00:00 2001 From: bochaco Date: Thu, 21 Sep 2023 18:38:28 -0300 Subject: [PATCH] test(gossipsub): make CI test to be more strict 0-tolerance for missed published messages --- sn_networking/src/cmd.rs | 24 ++++++++++++++++++++++-- sn_networking/src/event.rs | 4 ++++ sn_node/tests/msgs_over_gossipsub.rs | 16 ++++++++++------ 3 files changed, 36 insertions(+), 8 deletions(-) diff --git a/sn_networking/src/cmd.rs b/sn_networking/src/cmd.rs index b2aaf10011..efeb1361a3 100644 --- a/sn_networking/src/cmd.rs +++ b/sn_networking/src/cmd.rs @@ -367,10 +367,30 @@ impl SwarmDriver { } SwarmCmd::GossipsubPublish { topic_id, msg } => { let topic_id = libp2p::gossipsub::IdentTopic::new(topic_id); - self.swarm + let r = self + .swarm .behaviour_mut() .gossipsub - .publish(topic_id, msg)?; + .publish(topic_id.clone(), msg); + + let connected_peers = self.swarm.connected_peers().count(); + let p: Vec<_> = self.swarm.behaviour().gossipsub.all_peers().collect(); + + if let Err(err) = &r { + info!( + ">>>>>SEND ERR {topic_id}: {err:?} == {connected_peers} :{}: {:?}", + p.len(), + p.iter().map(|(_, t)| t).collect::>() + ); + } else { + info!( + ">>>>>SEND OK {topic_id}: sent with {connected_peers} peers :{}: {:?}", + p.len(), + p.iter().map(|(_, t)| t).collect::>() + ); + } + + r?; } } diff --git a/sn_networking/src/event.rs b/sn_networking/src/event.rs index 3364a9b46e..efa575e08b 100644 --- a/sn_networking/src/event.rs +++ b/sn_networking/src/event.rs @@ -335,6 +335,10 @@ impl SwarmDriver { libp2p::gossipsub::Event::Message { message, .. } => { let topic = message.topic.into_string(); let msg = message.data; + info!( + ">>>>> MSG on {topic}: {}", + String::from_utf8(msg.clone()).unwrap() + ); self.send_event(NetworkEvent::GossipsubMsg { topic, msg }); } other => trace!("Gossipsub Event has been ignored: {other:?}"), diff --git a/sn_node/tests/msgs_over_gossipsub.rs b/sn_node/tests/msgs_over_gossipsub.rs index f13c60ab16..c233cee207 100644 --- a/sn_node/tests/msgs_over_gossipsub.rs +++ b/sn_node/tests/msgs_over_gossipsub.rs @@ -25,7 +25,7 @@ use tonic::Request; const NODE_COUNT: u8 = 25; -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn msgs_over_gossipsub() -> Result<()> { let mut addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12000); @@ -47,7 +47,7 @@ async fn msgs_over_gossipsub() -> Result<()> { println!("Listening to node events..."); let mut count = 0; - let _ = timeout(Duration::from_millis(10000), async { + let _ = timeout(Duration::from_millis(20000), async { let mut stream = response.into_inner(); while let Some(Ok(e)) = stream.next().await { match NodeEvent::from_bytes(&e.event) { @@ -57,6 +57,9 @@ async fn msgs_over_gossipsub() -> Result<()> { String::from_utf8(msg).unwrap() ); count += 1; + if count == NODE_COUNT - 1 { + break; + } } Ok(_) => { /* ignored */ } Err(_) => { @@ -70,16 +73,17 @@ async fn msgs_over_gossipsub() -> Result<()> { Ok::(count) }); - tokio::time::sleep(Duration::from_millis(1000)).await; + tokio::time::sleep(Duration::from_millis(10000)).await; // have all other nodes to publish each a different msg to that same topic other_nodes_to_publish_on_topic(addr, topic).await?; let count = handle.await??; println!("Messages received by node {node_index}: {count}"); - assert!( - count > 0, - "No message received by node at index {}", + assert_eq!( + count, + NODE_COUNT - 1, + "Not all messages were received by node {}", node_index ); }