diff --git a/crates/papyrus_network/src/bin/network_stress_test/converters.rs b/crates/papyrus_network/src/bin/network_stress_test/converters.rs index 18351e0abc..b0eca85211 100644 --- a/crates/papyrus_network/src/bin/network_stress_test/converters.rs +++ b/crates/papyrus_network/src/bin/network_stress_test/converters.rs @@ -1,31 +1,37 @@ use std::mem::size_of; +use std::str::FromStr; use std::time::{Duration, SystemTime}; -pub const METADATA_SIZE: usize = size_of::() + size_of::() + size_of::(); +use libp2p::PeerId; + +pub const METADATA_SIZE: usize = size_of::() + size_of::() + size_of::() + 38; #[derive(Debug, Clone)] pub struct StressTestMessage { pub id: u32, pub payload: Vec, pub time: SystemTime, + pub peer_id: String, } impl StressTestMessage { - pub fn new(id: u32, payload: Vec) -> Self { - StressTestMessage { id, payload, time: SystemTime::now() } + pub fn new(id: u32, payload: Vec, peer_id: String) -> Self { + StressTestMessage { id, payload, time: SystemTime::now(), peer_id } } } impl From for Vec { fn from(value: StressTestMessage) -> Self { - let StressTestMessage { id, mut payload, time } = value; + let StressTestMessage { id, mut payload, time, peer_id } = value; let id = id.to_be_bytes().to_vec(); let time = time.duration_since(SystemTime::UNIX_EPOCH).unwrap(); let seconds = time.as_secs().to_be_bytes().to_vec(); let nanos = time.subsec_nanos().to_be_bytes().to_vec(); + let peer_id = PeerId::from_str(&peer_id).unwrap().to_bytes(); payload.extend(id); payload.extend(seconds); payload.extend(nanos); + payload.extend(peer_id); payload } } @@ -40,6 +46,7 @@ impl From> for StressTestMessage { let seconds = u64::from_be_bytes(id_and_time[4..12].try_into().unwrap()); let nanos = u32::from_be_bytes(id_and_time[12..16].try_into().unwrap()); let time = SystemTime::UNIX_EPOCH + Duration::new(seconds, nanos); - StressTestMessage { id, payload: value, time } + let peer_id = PeerId::from_bytes(&id_and_time[16..]).unwrap().to_string(); + StressTestMessage { id, payload: value, time, peer_id } } } diff --git a/crates/papyrus_network/src/bin/network_stress_test/main.rs b/crates/papyrus_network/src/bin/network_stress_test/main.rs index fdad868596..4e35f1117b 100644 --- a/crates/papyrus_network/src/bin/network_stress_test/main.rs +++ b/crates/papyrus_network/src/bin/network_stress_test/main.rs @@ -36,7 +36,7 @@ async fn main() { let mut i = 0; tokio::time::sleep(std::time::Duration::from_secs(5)).await; loop { - let message = StressTestMessage::new(i, vec![0; message_size - METADATA_SIZE]); + let message = StressTestMessage::new(i, vec![0; message_size - METADATA_SIZE], peer_id.clone()); network_channels.broadcast_topic_client.broadcast_message(message).await.unwrap(); i += 1; if i == num_messages { @@ -61,7 +61,7 @@ async fn main() { Ok(Some((received_message, _report_callback))) => { let received_message = received_message.unwrap(); output_vector.push(Record { - peer_id: peer_id.clone(), + peer_id: received_message.peer_id, id: received_message.id, start_time: received_message.time, end_time: SystemTime::now(), @@ -71,7 +71,8 @@ async fn main() { .as_micros(), }); i += 1; - if i == num_messages * 4 { + if i == num_messages * 4{ + tokio::time::sleep(std::time::Duration::from_secs(90)).await; break; } } @@ -83,7 +84,6 @@ async fn main() { for record in output_vector { wtr.serialize(record).unwrap(); } - tokio::time::sleep(std::time::Duration::from_secs(120)).await; } } }