Skip to content

Commit

Permalink
refactor(test): minor refactoring to gossipsub test
Browse files Browse the repository at this point in the history
  • Loading branch information
bochaco committed Oct 4, 2023
1 parent fba46f0 commit 0ce621a
Showing 1 changed file with 35 additions and 33 deletions.
68 changes: 35 additions & 33 deletions sn_node/tests/msgs_over_gossipsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,28 +29,33 @@ const TEST_CYCLES: u8 = 20;

#[tokio::test]
async fn msgs_over_gossipsub() -> Result<()> {
let mut addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12000);
let all_nodes_addrs: Vec<_> = (0..NODE_COUNT)
.map(|i| {
(
i,
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12001 + i as u16),
)
})
.collect();

for c in 0..TEST_CYCLES {
let topic = format!("TestTopic-{}", rand::random::<u64>());
println!("Testing cicle {}/{TEST_CYCLES} - topic: {topic}", c + 1);
println!("============================================================");

let mut subs_addrs = vec![];
let mut subs_handles = vec![];

// get a random subset of NODES_SUBSCRIBED out of NODE_COUNT nodes to subscribe to the topic
let mut rng = rand::thread_rng();
let random_indexes =
let random_subs_nodes: Vec<_> =
rand::seq::index::sample(&mut rng, NODE_COUNT.into(), NODES_SUBSCRIBED.into())
.into_vec();
.iter()
.map(|i| all_nodes_addrs[i])
.collect();

for node_index in random_indexes {
let mut subs_handles = vec![];
for (node_index, addr) in random_subs_nodes.clone() {
// request current node to subscribe to the topic
addr.set_port(12000 + node_index as u16);
println!("Node #{node_index} ({addr}) subscribing to {topic} ...");
node_subscribe_to_topic(addr, topic.clone()).await?;
subs_addrs.push(addr);

println!("Node {node_index} subscribed to {topic}");

let handle = tokio::spawn(async move {
let endpoint = format!("https://{addr}");
Expand All @@ -59,16 +64,15 @@ async fn msgs_over_gossipsub() -> Result<()> {
.node_events(Request::new(NodeEventsRequest {}))
.await?;

println!("Listening to node events...");
let mut count = 0;

let _ = timeout(Duration::from_millis(6000), async {
let _ = timeout(Duration::from_millis(30000), async {
let mut stream = response.into_inner();
while let Some(Ok(e)) = stream.next().await {
match NodeEvent::from_bytes(&e.event) {
Ok(NodeEvent::GossipsubMsg { topic, msg }) => {
println!(
"New gossipsub msg received on '{topic}': {}",
"Msg received on node #{node_index} '{topic}': {}",
String::from_utf8(msg).unwrap()
);
count += 1;
Expand All @@ -94,7 +98,9 @@ async fn msgs_over_gossipsub() -> Result<()> {
tokio::time::sleep(Duration::from_millis(3000)).await;

// have all other nodes to publish each a different msg to that same topic
other_nodes_to_publish_on_topic(subs_addrs, topic.clone()).await?;
let mut other_nodes = all_nodes_addrs.clone();
other_nodes.retain(|node| random_subs_nodes.iter().all(|n| n != node));
other_nodes_to_publish_on_topic(other_nodes, topic.clone()).await?;

for (node_index, addr, handle) in subs_handles.into_iter() {
let count = handle.await??;
Expand Down Expand Up @@ -137,26 +143,22 @@ async fn node_unsubscribe_from_topic(addr: SocketAddr, topic: String) -> Result<
}

async fn other_nodes_to_publish_on_topic(
filter_addrs: Vec<SocketAddr>,
nodes: Vec<(u8, SocketAddr)>,
topic: String,
) -> Result<()> {
let mut addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12000);
for node_index in 1..NODE_COUNT + 1 {
addr.set_port(12000 + node_index as u16);
if filter_addrs.iter().all(|a| a != &addr) {
let msg = format!("TestMsgOnTopic-{topic}-from-{node_index}");

let endpoint = format!("https://{addr}");
let mut rpc_client = SafeNodeClient::connect(endpoint).await?;
println!("Node {node_index} to publish on {topic} message: {msg}");

let _response = rpc_client
.publish_on_topic(Request::new(GossipsubPublishRequest {
topic: topic.clone(),
msg: msg.into(),
}))
.await?;
}
for (node_index, addr) in nodes {
let msg = format!("TestMsgOnTopic-{topic}-from-{node_index}");

let endpoint = format!("https://{addr}");
let mut rpc_client = SafeNodeClient::connect(endpoint).await?;
println!("Node {node_index} to publish on {topic} message: {msg}");

let _response = rpc_client
.publish_on_topic(Request::new(GossipsubPublishRequest {
topic: topic.clone(),
msg: msg.into(),
}))
.await?;
}

Ok(())
Expand Down

0 comments on commit 0ce621a

Please sign in to comment.