diff --git a/sn_node/tests/msgs_over_gossipsub.rs b/sn_node/tests/msgs_over_gossipsub.rs index 2d2af31c01..b02c363c66 100644 --- a/sn_node/tests/msgs_over_gossipsub.rs +++ b/sn_node/tests/msgs_over_gossipsub.rs @@ -29,28 +29,28 @@ const TEST_CYCLES: u8 = 20; #[tokio::test] async fn msgs_over_gossipsub() -> Result<()> { - let mut addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12000); + let all_nodes_addrs: Vec = (0..NODE_COUNT as u16) + .map(|i| SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12001 + i)) + .collect(); + for c in 0..TEST_CYCLES { let topic = format!("TestTopic-{}", rand::random::()); println!("Testing cicle {}/{TEST_CYCLES} - topic: {topic}", c + 1); println!("============================================================"); - let mut subs_addrs = vec![]; - let mut subs_handles = vec![]; - // get a random subset of NODES_SUBSCRIBED out of NODE_COUNT nodes to subscribe to the topic let mut rng = rand::thread_rng(); - let random_indexes = + let random_subs_nodes: Vec<_> = rand::seq::index::sample(&mut rng, NODE_COUNT.into(), NODES_SUBSCRIBED.into()) - .into_vec(); + .iter() + .map(|i| (i, all_nodes_addrs[i])) + .collect(); - for node_index in random_indexes { + let mut subs_handles = vec![]; + for (node_index, addr) in random_subs_nodes.clone() { // request current node to subscribe to the topic - addr.set_port(12000 + node_index as u16); + println!("Node #{node_index} ({addr}) subscribing to {topic} ..."); node_subscribe_to_topic(addr, topic.clone()).await?; - subs_addrs.push(addr); - - println!("Node {node_index} subscribed to {topic}"); let handle = tokio::spawn(async move { let endpoint = format!("https://{addr}"); @@ -62,7 +62,7 @@ async fn msgs_over_gossipsub() -> Result<()> { println!("Listening to node events..."); let mut count = 0; - let _ = timeout(Duration::from_millis(6000), async { + let _ = timeout(Duration::from_millis(10000), async { let mut stream = response.into_inner(); while let Some(Ok(e)) = stream.next().await { match NodeEvent::from_bytes(&e.event) { @@ -94,7 +94,7 @@ async fn msgs_over_gossipsub() -> Result<()> { tokio::time::sleep(Duration::from_millis(3000)).await; // have all other nodes to publish each a different msg to that same topic - other_nodes_to_publish_on_topic(subs_addrs, topic.clone()).await?; + other_nodes_to_publish_on_topic(random_subs_nodes, topic.clone()).await?; for (node_index, addr, handle) in subs_handles.into_iter() { let count = handle.await??; @@ -137,26 +137,22 @@ async fn node_unsubscribe_from_topic(addr: SocketAddr, topic: String) -> Result< } async fn other_nodes_to_publish_on_topic( - filter_addrs: Vec, + subs_nodes: Vec<(usize, SocketAddr)>, topic: String, ) -> Result<()> { - let mut addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12000); - for node_index in 1..NODE_COUNT + 1 { - addr.set_port(12000 + node_index as u16); - if filter_addrs.iter().all(|a| a != &addr) { - let msg = format!("TestMsgOnTopic-{topic}-from-{node_index}"); - - let endpoint = format!("https://{addr}"); - let mut rpc_client = SafeNodeClient::connect(endpoint).await?; - println!("Node {node_index} to publish on {topic} message: {msg}"); - - let _response = rpc_client - .publish_on_topic(Request::new(GossipsubPublishRequest { - topic: topic.clone(), - msg: msg.into(), - })) - .await?; - } + for (node_index, addr) in subs_nodes { + let msg = format!("TestMsgOnTopic-{topic}-from-{node_index}"); + + let endpoint = format!("https://{addr}"); + let mut rpc_client = SafeNodeClient::connect(endpoint).await?; + println!("Node {node_index} to publish on {topic} message: {msg}"); + + let _response = rpc_client + .publish_on_topic(Request::new(GossipsubPublishRequest { + topic: topic.clone(), + msg: msg.into(), + })) + .await?; } Ok(())