diff --git a/sn_networking/src/cmd.rs b/sn_networking/src/cmd.rs index b2aaf10011..efeb1361a3 100644 --- a/sn_networking/src/cmd.rs +++ b/sn_networking/src/cmd.rs @@ -367,10 +367,30 @@ impl SwarmDriver { } SwarmCmd::GossipsubPublish { topic_id, msg } => { let topic_id = libp2p::gossipsub::IdentTopic::new(topic_id); - self.swarm + let r = self + .swarm .behaviour_mut() .gossipsub - .publish(topic_id, msg)?; + .publish(topic_id.clone(), msg); + + let connected_peers = self.swarm.connected_peers().count(); + let p: Vec<_> = self.swarm.behaviour().gossipsub.all_peers().collect(); + + if let Err(err) = &r { + info!( + ">>>>>SEND ERR {topic_id}: {err:?} == {connected_peers} :{}: {:?}", + p.len(), + p.iter().map(|(_, t)| t).collect::>() + ); + } else { + info!( + ">>>>>SEND OK {topic_id}: sent with {connected_peers} peers :{}: {:?}", + p.len(), + p.iter().map(|(_, t)| t).collect::>() + ); + } + + r?; } } diff --git a/sn_networking/src/event.rs b/sn_networking/src/event.rs index 3364a9b46e..efa575e08b 100644 --- a/sn_networking/src/event.rs +++ b/sn_networking/src/event.rs @@ -335,6 +335,10 @@ impl SwarmDriver { libp2p::gossipsub::Event::Message { message, .. } => { let topic = message.topic.into_string(); let msg = message.data; + info!( + ">>>>> MSG on {topic}: {}", + String::from_utf8(msg.clone()).unwrap() + ); self.send_event(NetworkEvent::GossipsubMsg { topic, msg }); } other => trace!("Gossipsub Event has been ignored: {other:?}"), diff --git a/sn_node/tests/msgs_over_gossipsub.rs b/sn_node/tests/msgs_over_gossipsub.rs index f13c60ab16..fdef0bf235 100644 --- a/sn_node/tests/msgs_over_gossipsub.rs +++ b/sn_node/tests/msgs_over_gossipsub.rs @@ -25,14 +25,15 @@ use tonic::Request; const NODE_COUNT: u8 = 25; -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn msgs_over_gossipsub() -> Result<()> { let mut addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12000); for node_index in 1..NODE_COUNT + 1 { // request current node to subscribe to a fresh new topic addr.set_port(12000 + node_index as u16); - let topic = format!("TestTopic-{node_index}"); + let random_num = rand::random::(); + let topic = format!("TestTopic-{node_index}-{random_num}"); node_subscribe_to_topic(addr, topic.clone()).await?; println!("Node {node_index} subscribed to {topic}"); @@ -57,6 +58,9 @@ async fn msgs_over_gossipsub() -> Result<()> { String::from_utf8(msg).unwrap() ); count += 1; + if count == NODE_COUNT - 1 { + break; + } } Ok(_) => { /* ignored */ } Err(_) => { @@ -78,8 +82,8 @@ async fn msgs_over_gossipsub() -> Result<()> { let count = handle.await??; println!("Messages received by node {node_index}: {count}"); assert!( - count > 0, - "No message received by node at index {}", + count >= NODE_COUNT - 2, + "Not all messages were received by node {}", node_index ); }