Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: add a p2p test for testing the networking layer #450

Merged
merged 9 commits into from
Nov 8, 2024
28 changes: 27 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ testcontainers = { version = "0.20.1" }

# Symbiotic
symbiotic-rs = { version = "0.1.0" }
dashmap = "6.1.0"
bincode2 = "2.0.1"

[profile.dev.package.backtrace]
opt-level = 3
Expand Down
1 change: 0 additions & 1 deletion blueprint-manager/src/sdk/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ pub async fn generate_node_input<KBE: KeyValueStoreBackend>(
libp2p_key,
ecdsa_key,
config.bootnodes.clone(),
config.bind_ip,
config.bind_port,
network_ids.clone(),
);
Expand Down
Submodule forge-std updated 1 files
+1 −1 package.json
2 changes: 1 addition & 1 deletion blueprints/incredible-squaring/contracts/lib/forge-std
Submodule forge-std updated 1 files
+1 −1 package.json
2 changes: 2 additions & 0 deletions sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ gadget-blueprint-proc-macro-core = { workspace = true, default-features = false

# Benchmarking deps
sysinfo = { workspace = true }
dashmap = { workspace = true }
lazy_static = "1.5.0"
bincode2 = { workspace = true }


[target.'cfg(not(target_family = "wasm"))'.dependencies.libp2p]
Expand Down
18 changes: 11 additions & 7 deletions sdk/src/network/channels.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::network::{deserialize, IdentifierInfo, Network, ProtocolMessage};
use crate::network::{deserialize, IdentifierInfo, Network, NetworkMultiplexer, ProtocolMessage};
use futures::StreamExt;
use gadget_io::tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use round_based::{Incoming, MessageDestination, MessageType, MsgId, Outgoing, PartyIndex};
Expand Down Expand Up @@ -98,7 +98,8 @@ pub fn create_job_manager_to_async_protocol_channel_split<
let (tx_to_outbound_1, mut rx_to_outbound_1) = futures::channel::mpsc::unbounded::<C1>();
let (tx_to_outbound_2, mut rx_to_outbound_2) =
gadget_io::tokio::sync::mpsc::unbounded_channel::<C2>();
let network_clone = network.clone();
let multiplexed_network = NetworkMultiplexer::new(network);
let network = multiplexed_network.multiplex(identifier_info);
let user_id_mapping_clone = user_id_mapping.clone();
let my_user_id = user_id_mapping
.iter()
Expand All @@ -113,11 +114,12 @@ pub fn create_job_manager_to_async_protocol_channel_split<

// Take the messages the async protocol sends to the outbound channel and send them to the gadget
let _ = gadget_io::tokio::task::spawn(async move {
let network = &network;
let channel_1_task = async move {
while let Some(msg) = rx_to_outbound_1.next().await {
if let Err(err) = wrap_message_and_forward_to_network::<_, C1, C2, (), _>(
msg,
&network,
network,
&user_id_mapping,
my_user_id,
identifier_info,
Expand All @@ -134,7 +136,7 @@ pub fn create_job_manager_to_async_protocol_channel_split<
while let Some(msg) = rx_to_outbound_2.recv().await {
if let Err(err) = wrap_message_and_forward_to_network::<_, C1, C2, (), _>(
msg,
&network_clone,
network,
&user_id_mapping_clone,
my_user_id,
identifier_info,
Expand Down Expand Up @@ -505,16 +507,18 @@ pub fn create_job_manager_to_async_protocol_channel_split_io<

let (tx_to_outbound_1, mut rx_to_outbound_1) = futures::channel::mpsc::unbounded::<O>();
let (tx_to_outbound_2, mut rx_to_outbound_2) = futures::channel::mpsc::unbounded::<C2>();
let network_clone = network.clone();
let multiplexed_network = NetworkMultiplexer::new(network);
let network = multiplexed_network.multiplex(identifier_info);
let user_id_mapping_clone = user_id_mapping.clone();

// Take the messages from the async protocol and send them to the gadget
let _ = gadget_io::tokio::task::spawn(async move {
let network = &network;
let channel_1_task = async move {
while let Some(msg) = rx_to_outbound_1.next().await {
if let Err(err) = wrap_message_and_forward_to_network::<_, O::Inner, C2, (), _>(
msg,
&network,
network,
&user_id_mapping,
my_user_id,
identifier_info,
Expand All @@ -533,7 +537,7 @@ pub fn create_job_manager_to_async_protocol_channel_split_io<
while let Some(msg) = rx_to_outbound_2.next().await {
if let Err(err) = wrap_message_and_forward_to_network::<_, O::Inner, C2, (), _>(
msg,
&network_clone,
network,
&user_id_mapping_clone,
my_user_id,
identifier_info,
Expand Down
35 changes: 32 additions & 3 deletions sdk/src/network/handlers/connections.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#![allow(unused_results, clippy::used_underscore_binding)]

use crate::network::gossip::{MyBehaviourRequest, NetworkService};
use crate::{error, trace};
use crate::{error, trace, warn};
use itertools::Itertools;
use libp2p::PeerId;
use sp_core::{keccak_256, Pair};

Expand All @@ -12,7 +13,7 @@ impl NetworkService<'_> {
peer_id: PeerId,
num_established: u32,
) {
trace!("Connection established");
crate::debug!("Connection established");
if num_established == 1 {
let my_peer_id = self.swarm.local_peer_id();
let msg = my_peer_id.to_bytes();
Expand Down Expand Up @@ -86,6 +87,34 @@ impl NetworkService<'_> {
_peer_id: Option<PeerId>,
error: libp2p::swarm::DialError,
) {
error!("Outgoing connection error: {error}");
if let libp2p::swarm::DialError::Transport(addrs) = error {
let read = self.ecdsa_peer_id_to_libp2p_id.read().await;
for (addr, err) in addrs {
if let Some(peer_id) = get_peer_id_from_multiaddr(&addr) {
if !read.values().contains(&peer_id) {
warn!(
"Outgoing connection error to peer: {peer_id} at {addr}: {err}",
peer_id = peer_id,
addr = addr,
err = err
);
}
}
}
} else {
error!("Outgoing connection error to peer: {error}");
}
}
}

fn get_peer_id_from_multiaddr(addr: &libp2p::Multiaddr) -> Option<PeerId> {
addr.iter()
.find_map(|proto| {
if let libp2p::multiaddr::Protocol::P2p(peer_id) = proto {
Some(Some(peer_id))
} else {
None
}
})
.flatten()
}
Loading
Loading