Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(gossipsub): make CI test stricter — zero tolerance for missed published messages #763

Merged
merged 3 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sn_client/src/file_apis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ fn encrypt_large(
fn package_small(file: SmallFile) -> Result<Chunk> {
let chunk = to_chunk(file.bytes());
if chunk.value().len() >= self_encryption::MIN_ENCRYPTABLE_BYTES {
return Err(Error::SmallFilePaddingNeeded(chunk.value().len()))?;
return Err(Error::SmallFilePaddingNeeded(chunk.value().len()).into());
}
Ok(chunk)
}
4 changes: 1 addition & 3 deletions sn_node/src/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,7 @@ impl Node {
}
};

let keys_to_replicate = replicate_to
.entry(target_peer)
.or_insert(Default::default());
let keys_to_replicate = replicate_to.entry(target_peer).or_default();
keys_to_replicate.push(key.clone());
}
}
Expand Down
1 change: 0 additions & 1 deletion sn_node/tests/data_with_churn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,6 @@ async fn data_availability_during_churn() -> Result<()> {
let client = client.clone();
let net_addr = net_addr.clone();
let cash_notes = cash_notes.clone();
let churn_period = churn_period;

let failures = failures.clone();
let wallet_dir = paying_wallet_dir.to_path_buf().clone();
Expand Down
170 changes: 97 additions & 73 deletions sn_node/tests/msgs_over_gossipsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,70 +24,95 @@ use tokio_stream::StreamExt;
use tonic::Request;

const NODE_COUNT: u8 = 25;
const NODES_SUBSCRIBED: u8 = NODE_COUNT / 2; // 12 out of 25 nodes will be subscribers
const TEST_CYCLES: u8 = 20;

#[tokio::test(flavor = "multi_thread")]
#[tokio::test]
async fn msgs_over_gossipsub() -> Result<()> {
let mut addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12000);

for node_index in 1..NODE_COUNT + 1 {
// request current node to subscribe to a fresh new topic
addr.set_port(12000 + node_index as u16);
let random_num = rand::random::<u64>();
let topic = format!("TestTopic-{node_index}-{random_num}");
node_subscribe_to_topic(addr, topic.clone()).await?;

println!("Node {node_index} subscribed to {topic}");

let handle = tokio::spawn(async move {
let endpoint = format!("https://{addr}");
let mut rpc_client = SafeNodeClient::connect(endpoint).await?;
let response = rpc_client
.node_events(Request::new(NodeEventsRequest {}))
.await?;

println!("Listening to node events...");
let mut count = 0;

let _ = timeout(Duration::from_millis(4000), async {
let mut stream = response.into_inner();
while let Some(Ok(e)) = stream.next().await {
match NodeEvent::from_bytes(&e.event) {
Ok(NodeEvent::GossipsubMsg { topic, msg }) => {
println!(
"New gossipsub msg received on '{topic}': {}",
String::from_utf8(msg).unwrap()
);
count += 1;
if count == NODE_COUNT - 1 {
break;
let all_nodes_addrs: Vec<_> = (0..NODE_COUNT)
.map(|i| {
(
i,
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12001 + i as u16),
)
})
.collect();

for c in 0..TEST_CYCLES {
let topic = format!("TestTopic-{}", rand::random::<u64>());
println!("Testing cicle {}/{TEST_CYCLES} - topic: {topic}", c + 1);
println!("============================================================");

// get a random subset of NODES_SUBSCRIBED out of NODE_COUNT nodes to subscribe to the topic
let mut rng = rand::thread_rng();
let random_subs_nodes: Vec<_> =
rand::seq::index::sample(&mut rng, NODE_COUNT.into(), NODES_SUBSCRIBED.into())
.iter()
.map(|i| all_nodes_addrs[i])
.collect();

let mut subs_handles = vec![];
for (node_index, addr) in random_subs_nodes.clone() {
// request current node to subscribe to the topic
println!("Node #{node_index} ({addr}) subscribing to {topic} ...");
node_subscribe_to_topic(addr, topic.clone()).await?;

let handle = tokio::spawn(async move {
let endpoint = format!("https://{addr}");
let mut rpc_client = SafeNodeClient::connect(endpoint).await?;
let response = rpc_client
.node_events(Request::new(NodeEventsRequest {}))
.await?;

let mut count = 0;

let _ = timeout(Duration::from_millis(30000), async {
let mut stream = response.into_inner();
while let Some(Ok(e)) = stream.next().await {
match NodeEvent::from_bytes(&e.event) {
Ok(NodeEvent::GossipsubMsg { topic, msg }) => {
println!(
"Msg received on node #{node_index} '{topic}': {}",
String::from_utf8(msg).unwrap()
);
count += 1;
if count == NODE_COUNT - NODES_SUBSCRIBED {
break;
}
}
Ok(_) => { /* ignored */ }
Err(_) => {
println!("Error while parsing received NodeEvent");
}
}
Ok(_) => { /* ignored */ }
Err(_) => {
println!("Error while parsing received NodeEvent");
}
}
}
})
.await;
})
.await;

Ok::<u8, eyre::Error>(count)
});
Ok::<u8, eyre::Error>(count)
});

tokio::time::sleep(Duration::from_millis(1000)).await;

// have all other nodes to publish each a different msg to that same topic
other_nodes_to_publish_on_topic(addr, topic.clone()).await?;
subs_handles.push((node_index, addr, handle));
}

let count = handle.await??;
println!("Messages received by node {node_index}: {count}");
assert!(
count > 0,
"No message received by node at index {}",
node_index
);
tokio::time::sleep(Duration::from_millis(3000)).await;

node_unsubscribe_from_topic(addr, topic).await?;
// have all other nodes to publish each a different msg to that same topic
let mut other_nodes = all_nodes_addrs.clone();
other_nodes.retain(|node| random_subs_nodes.iter().all(|n| n != node));
other_nodes_to_publish_on_topic(other_nodes, topic.clone()).await?;

for (node_index, addr, handle) in subs_handles.into_iter() {
let count = handle.await??;
println!("Messages received by node {node_index}: {count}");
assert_eq!(
count,
NODE_COUNT - NODES_SUBSCRIBED,
"Not enough messages received by node at index {}",
node_index
);
node_unsubscribe_from_topic(addr, topic.clone()).await?;
}
}

Ok(())
Expand Down Expand Up @@ -117,24 +142,23 @@ async fn node_unsubscribe_from_topic(addr: SocketAddr, topic: String) -> Result<
Ok(())
}

async fn other_nodes_to_publish_on_topic(filter_addr: SocketAddr, topic: String) -> Result<()> {
let mut addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12000);
for node_index in 1..NODE_COUNT + 1 {
addr.set_port(12000 + node_index as u16);
if addr != filter_addr {
let msg = format!("TestMsgOnTopic-{topic}-from-{node_index}");

let endpoint = format!("https://{addr}");
let mut rpc_client = SafeNodeClient::connect(endpoint).await?;
println!("Node {node_index} to publish on {topic} message: {msg}");

let _response = rpc_client
.publish_on_topic(Request::new(GossipsubPublishRequest {
topic: topic.clone(),
msg: msg.into(),
}))
.await?;
}
async fn other_nodes_to_publish_on_topic(
nodes: Vec<(u8, SocketAddr)>,
topic: String,
) -> Result<()> {
for (node_index, addr) in nodes {
let msg = format!("TestMsgOnTopic-{topic}-from-{node_index}");

let endpoint = format!("https://{addr}");
let mut rpc_client = SafeNodeClient::connect(endpoint).await?;
println!("Node {node_index} to publish on {topic} message: {msg}");

let _response = rpc_client
.publish_on_topic(Request::new(GossipsubPublishRequest {
topic: topic.clone(),
msg: msg.into(),
}))
.await?;
}

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion sn_transfers/src/transfers/transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl Transfer {
let u = CashNoteRedemption::new(derivation_index, parent_spend_addr);
cashnote_redemptions_map
.entry(recipient)
.or_insert_with(Vec::new)
.or_default()
.push(u);
}

Expand Down
Loading