From 097f05dbade84f01a5f8a8630702c8377e4bcb51 Mon Sep 17 00:00:00 2001 From: bochaco Date: Wed, 4 Oct 2023 12:30:32 -0300 Subject: [PATCH] test(gossipsub): make CI test to be more strict, 0-tolerance for missed published messages --- sn_node/tests/msgs_over_gossipsub.rs | 129 +++++++++++++++------------ 1 file changed, 74 insertions(+), 55 deletions(-) diff --git a/sn_node/tests/msgs_over_gossipsub.rs b/sn_node/tests/msgs_over_gossipsub.rs index 9875e9bc1e..60db47ad0d 100644 --- a/sn_node/tests/msgs_over_gossipsub.rs +++ b/sn_node/tests/msgs_over_gossipsub.rs @@ -15,6 +15,7 @@ use common::safenode_proto::{ use sn_node::NodeEvent; use eyre::Result; +use rand::Rng; use std::{ net::{IpAddr, Ipv4Addr, SocketAddr}, time::Duration, @@ -24,70 +25,85 @@ use tokio_stream::StreamExt; use tonic::Request; const NODE_COUNT: u8 = 25; +const NODES_SUBSCRIBED: u8 = NODE_COUNT / 2; // 12 out of 25 nodes will be subscribers +const TEST_CYCLES: u8 = 100; -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn msgs_over_gossipsub() -> Result<()> { let mut addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12000); - - for node_index in 1..NODE_COUNT + 1 { - // request current node to subscribe to a fresh new topic - addr.set_port(12000 + node_index as u16); - let random_num = rand::random::(); - let topic = format!("TestTopic-{node_index}-{random_num}"); - node_subscribe_to_topic(addr, topic.clone()).await?; - - println!("Node {node_index} subscribed to {topic}"); - - let handle = tokio::spawn(async move { - let endpoint = format!("https://{addr}"); - let mut rpc_client = SafeNodeClient::connect(endpoint).await?; - let response = rpc_client - .node_events(Request::new(NodeEventsRequest {})) - .await?; - - println!("Listening to node events..."); - let mut count = 0; - - let _ = timeout(Duration::from_millis(4000), async { - let mut stream = response.into_inner(); - while let Some(Ok(e)) = stream.next().await { - match NodeEvent::from_bytes(&e.event) { - Ok(NodeEvent::GossipsubMsg { topic, msg }) => { - println!( - "New gossipsub msg received on '{topic}': {}", - String::from_utf8(msg).unwrap() - ); - count += 1; - if count == NODE_COUNT - 1 { - break; + for c in 0..TEST_CYCLES { + let topic = format!("TestTopic-{}", rand::random::()); + println!("Testing cicle {}/{TEST_CYCLES} - topic: {topic}", c + 1); + println!("============================================================"); + + let mut subs_addrs = vec![]; + let mut subs_handles = vec![]; + + // let's get a random subset of NODES_SUBSCRIBED number of nodes to subscribe to the topic + let random_index = rand::thread_rng().gen_range(1..NODE_COUNT - NODES_SUBSCRIBED); + for node_index in random_index..random_index + NODES_SUBSCRIBED { + // request current node to subscribe to the topic + addr.set_port(12000 + node_index as u16); + node_subscribe_to_topic(addr, topic.clone()).await?; + subs_addrs.push(addr); + + println!("Node {node_index} subscribed to {topic}"); + + let handle = tokio::spawn(async move { + let endpoint = format!("https://{addr}"); + let mut rpc_client = SafeNodeClient::connect(endpoint).await?; + let response = rpc_client + .node_events(Request::new(NodeEventsRequest {})) + .await?; + + println!("Listening to node events..."); + let mut count = 0; + + let _ = timeout(Duration::from_millis(3000), async { + let mut stream = response.into_inner(); + while let Some(Ok(e)) = stream.next().await { + match NodeEvent::from_bytes(&e.event) { + Ok(NodeEvent::GossipsubMsg { topic, msg }) => { + println!( + "New gossipsub msg received on '{topic}': {}", + String::from_utf8(msg).unwrap() + ); + count += 1; + if count == NODE_COUNT - NODES_SUBSCRIBED { + break; + } + } + Ok(_) => { /* ignored */ } + Err(_) => { + println!("Error while parsing received NodeEvent"); } - } - Ok(_) => { /* ignored */ } - Err(_) => { - println!("Error while parsing received NodeEvent"); } } - } - }) - .await; + }) + .await; + + Ok::(count) + }); - Ok::(count) - }); + subs_handles.push((node_index, addr, handle)); + } tokio::time::sleep(Duration::from_millis(1000)).await; // have all other nodes to publish each a different msg to that same topic - other_nodes_to_publish_on_topic(addr, topic.clone()).await?; - - let count = handle.await??; - println!("Messages received by node {node_index}: {count}"); - assert!( - count > 0, - "No message received by node at index {}", - node_index - ); - - node_unsubscribe_from_topic(addr, topic).await?; + other_nodes_to_publish_on_topic(subs_addrs, topic.clone()).await?; + + for (node_index, addr, handle) in subs_handles.into_iter() { + let count = handle.await??; + println!("Messages received by node {node_index}: {count}"); + assert_eq!( + count, + NODE_COUNT - NODES_SUBSCRIBED, + "Not enough messages received by node at index {}", + node_index + ); + node_unsubscribe_from_topic(addr, topic.clone()).await?; + } } Ok(()) @@ -117,11 +133,14 @@ async fn node_unsubscribe_from_topic(addr: SocketAddr, topic: String) -> Result< Ok(()) } -async fn other_nodes_to_publish_on_topic(filter_addr: SocketAddr, topic: String) -> Result<()> { +async fn other_nodes_to_publish_on_topic( + filter_addrs: Vec, + topic: String, +) -> Result<()> { let mut addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12000); for node_index in 1..NODE_COUNT + 1 { addr.set_port(12000 + node_index as u16); - if addr != filter_addr { + if filter_addrs.iter().all(|a| a != &addr) { let msg = format!("TestMsgOnTopic-{topic}-from-{node_index}"); let endpoint = format!("https://{addr}");