diff --git a/sn_node/tests/msgs_over_gossipsub.rs b/sn_node/tests/msgs_over_gossipsub.rs index 9875e9bc1e..2d2af31c01 100644 --- a/sn_node/tests/msgs_over_gossipsub.rs +++ b/sn_node/tests/msgs_over_gossipsub.rs @@ -24,70 +24,89 @@ use tokio_stream::StreamExt; use tonic::Request; const NODE_COUNT: u8 = 25; +const NODES_SUBSCRIBED: u8 = NODE_COUNT / 2; // 12 out of 25 nodes will be subscribers +const TEST_CYCLES: u8 = 20; -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn msgs_over_gossipsub() -> Result<()> { let mut addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12000); - - for node_index in 1..NODE_COUNT + 1 { - // request current node to subscribe to a fresh new topic - addr.set_port(12000 + node_index as u16); - let random_num = rand::random::(); - let topic = format!("TestTopic-{node_index}-{random_num}"); - node_subscribe_to_topic(addr, topic.clone()).await?; - - println!("Node {node_index} subscribed to {topic}"); - - let handle = tokio::spawn(async move { - let endpoint = format!("https://{addr}"); - let mut rpc_client = SafeNodeClient::connect(endpoint).await?; - let response = rpc_client - .node_events(Request::new(NodeEventsRequest {})) - .await?; - - println!("Listening to node events..."); - let mut count = 0; - - let _ = timeout(Duration::from_millis(4000), async { - let mut stream = response.into_inner(); - while let Some(Ok(e)) = stream.next().await { - match NodeEvent::from_bytes(&e.event) { - Ok(NodeEvent::GossipsubMsg { topic, msg }) => { - println!( - "New gossipsub msg received on '{topic}': {}", - String::from_utf8(msg).unwrap() - ); - count += 1; - if count == NODE_COUNT - 1 { - break; + for c in 0..TEST_CYCLES { + let topic = format!("TestTopic-{}", rand::random::()); + println!("Testing cicle {}/{TEST_CYCLES} - topic: {topic}", c + 1); + println!("============================================================"); + + let mut subs_addrs = vec![]; + let mut subs_handles = vec![]; + + // get a random subset of NODES_SUBSCRIBED out of NODE_COUNT nodes to subscribe to the topic + let mut rng = rand::thread_rng(); + let random_indexes = + rand::seq::index::sample(&mut rng, NODE_COUNT.into(), NODES_SUBSCRIBED.into()) + .into_vec(); + + for node_index in random_indexes { + // request current node to subscribe to the topic + addr.set_port(12000 + node_index as u16); + node_subscribe_to_topic(addr, topic.clone()).await?; + subs_addrs.push(addr); + + println!("Node {node_index} subscribed to {topic}"); + + let handle = tokio::spawn(async move { + let endpoint = format!("https://{addr}"); + let mut rpc_client = SafeNodeClient::connect(endpoint).await?; + let response = rpc_client + .node_events(Request::new(NodeEventsRequest {})) + .await?; + + println!("Listening to node events..."); + let mut count = 0; + + let _ = timeout(Duration::from_millis(6000), async { + let mut stream = response.into_inner(); + while let Some(Ok(e)) = stream.next().await { + match NodeEvent::from_bytes(&e.event) { + Ok(NodeEvent::GossipsubMsg { topic, msg }) => { + println!( + "New gossipsub msg received on '{topic}': {}", + String::from_utf8(msg).unwrap() + ); + count += 1; + if count == NODE_COUNT - NODES_SUBSCRIBED { + break; + } + } + Ok(_) => { /* ignored */ } + Err(_) => { + println!("Error while parsing received NodeEvent"); } - } - Ok(_) => { /* ignored */ } - Err(_) => { - println!("Error while parsing received NodeEvent"); } } - } - }) - .await; - - Ok::(count) - }); + }) + .await; - tokio::time::sleep(Duration::from_millis(1000)).await; + Ok::(count) + }); - // have all other nodes to publish each a different msg to that same topic - other_nodes_to_publish_on_topic(addr, topic.clone()).await?; + subs_handles.push((node_index, addr, handle)); + } - let count = handle.await??; - println!("Messages received by node {node_index}: {count}"); - assert!( - count > 0, - "No message received by node at index {}", - node_index - ); + tokio::time::sleep(Duration::from_millis(3000)).await; - node_unsubscribe_from_topic(addr, topic).await?; + // have all other nodes to publish each a different msg to that same topic + other_nodes_to_publish_on_topic(subs_addrs, topic.clone()).await?; + + for (node_index, addr, handle) in subs_handles.into_iter() { + let count = handle.await??; + println!("Messages received by node {node_index}: {count}"); + assert_eq!( + count, + NODE_COUNT - NODES_SUBSCRIBED, + "Not enough messages received by node at index {}", + node_index + ); + node_unsubscribe_from_topic(addr, topic.clone()).await?; + } } Ok(()) @@ -117,11 +136,14 @@ async fn node_unsubscribe_from_topic(addr: SocketAddr, topic: String) -> Result< Ok(()) } -async fn other_nodes_to_publish_on_topic(filter_addr: SocketAddr, topic: String) -> Result<()> { +async fn other_nodes_to_publish_on_topic( + filter_addrs: Vec, + topic: String, +) -> Result<()> { let mut addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12000); for node_index in 1..NODE_COUNT + 1 { addr.set_port(12000 + node_index as u16); - if addr != filter_addr { + if filter_addrs.iter().all(|a| a != &addr) { let msg = format!("TestMsgOnTopic-{topic}-from-{node_index}"); let endpoint = format!("https://{addr}");