diff --git a/sn_node/tests/msgs_over_gossipsub.rs b/sn_node/tests/msgs_over_gossipsub.rs index 2d2af31c01..8329fc3317 100644 --- a/sn_node/tests/msgs_over_gossipsub.rs +++ b/sn_node/tests/msgs_over_gossipsub.rs @@ -29,28 +29,33 @@ const TEST_CYCLES: u8 = 20; #[tokio::test] async fn msgs_over_gossipsub() -> Result<()> { - let mut addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12000); + let all_nodes_addrs: Vec<_> = (0..NODE_COUNT) + .map(|i| { + ( + i, + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12001 + i as u16), + ) + }) + .collect(); + for c in 0..TEST_CYCLES { let topic = format!("TestTopic-{}", rand::random::()); println!("Testing cicle {}/{TEST_CYCLES} - topic: {topic}", c + 1); println!("============================================================"); - let mut subs_addrs = vec![]; - let mut subs_handles = vec![]; - // get a random subset of NODES_SUBSCRIBED out of NODE_COUNT nodes to subscribe to the topic let mut rng = rand::thread_rng(); - let random_indexes = + let random_subs_nodes: Vec<_> = rand::seq::index::sample(&mut rng, NODE_COUNT.into(), NODES_SUBSCRIBED.into()) - .into_vec(); + .iter() + .map(|i| all_nodes_addrs[i]) + .collect(); - for node_index in random_indexes { + let mut subs_handles = vec![]; + for (node_index, addr) in random_subs_nodes.clone() { // request current node to subscribe to the topic - addr.set_port(12000 + node_index as u16); + println!("Node #{node_index} ({addr}) subscribing to {topic} ..."); node_subscribe_to_topic(addr, topic.clone()).await?; - subs_addrs.push(addr); - - println!("Node {node_index} subscribed to {topic}"); let handle = tokio::spawn(async move { let endpoint = format!("https://{addr}"); @@ -59,16 +64,15 @@ async fn msgs_over_gossipsub() -> Result<()> { .node_events(Request::new(NodeEventsRequest {})) .await?; - println!("Listening to node events..."); let mut count = 0; - let _ = timeout(Duration::from_millis(6000), async { + let _ = timeout(Duration::from_millis(30000), async { let mut stream = response.into_inner(); while let Some(Ok(e)) = stream.next().await { match NodeEvent::from_bytes(&e.event) { Ok(NodeEvent::GossipsubMsg { topic, msg }) => { println!( - "New gossipsub msg received on '{topic}': {}", + "Msg received on node #{node_index} '{topic}': {}", String::from_utf8(msg).unwrap() ); count += 1; @@ -94,7 +98,9 @@ async fn msgs_over_gossipsub() -> Result<()> { tokio::time::sleep(Duration::from_millis(3000)).await; // have all other nodes to publish each a different msg to that same topic - other_nodes_to_publish_on_topic(subs_addrs, topic.clone()).await?; + let mut other_nodes = all_nodes_addrs.clone(); + other_nodes.retain(|node| random_subs_nodes.iter().all(|n| n != node)); + other_nodes_to_publish_on_topic(other_nodes, topic.clone()).await?; for (node_index, addr, handle) in subs_handles.into_iter() { let count = handle.await??; @@ -137,26 +143,22 @@ async fn node_unsubscribe_from_topic(addr: SocketAddr, topic: String) -> Result< } async fn other_nodes_to_publish_on_topic( - filter_addrs: Vec, + nodes: Vec<(u8, SocketAddr)>, topic: String, ) -> Result<()> { - let mut addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12000); - for node_index in 1..NODE_COUNT + 1 { - addr.set_port(12000 + node_index as u16); - if filter_addrs.iter().all(|a| a != &addr) { - let msg = format!("TestMsgOnTopic-{topic}-from-{node_index}"); - - let endpoint = format!("https://{addr}"); - let mut rpc_client = SafeNodeClient::connect(endpoint).await?; - println!("Node {node_index} to publish on {topic} message: {msg}"); - - let _response = rpc_client - .publish_on_topic(Request::new(GossipsubPublishRequest { - topic: topic.clone(), - msg: msg.into(), - })) - .await?; - } + for (node_index, addr) in nodes { + let msg = format!("TestMsgOnTopic-{topic}-from-{node_index}"); + + let endpoint = format!("https://{addr}"); + let mut rpc_client = SafeNodeClient::connect(endpoint).await?; + println!("Node {node_index} to publish on {topic} message: {msg}"); + + let _response = rpc_client + .publish_on_topic(Request::new(GossipsubPublishRequest { + topic: topic.clone(), + msg: msg.into(), + })) + .await?; } Ok(())