diff --git a/Cargo.lock b/Cargo.lock index ba8637a..dca26ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -432,7 +432,7 @@ dependencies = [ [[package]] name = "chat-example" -version = "0.1.0" +version = "0.2.0" dependencies = [ "clap", "eyre", diff --git a/examples/chat/Cargo.toml b/examples/chat/Cargo.toml index 87a4ed7..a0895df 100644 --- a/examples/chat/Cargo.toml +++ b/examples/chat/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "chat-example" -version = "0.1.0" +version = "0.2.0" authors = ["Calimero Limited "] edition = "2021" repository = "https://github.com/calimero-network/boot-node" @@ -15,9 +15,11 @@ libp2p = { version = "0.53.2", features = [ "gossipsub", "identify", "macros", + "mdns", "noise", "ping", "quic", + "rendezvous", "relay", "tokio", "tcp", diff --git a/examples/chat/README.md b/examples/chat/README.md index bf5a474..d400e6f 100644 --- a/examples/chat/README.md +++ b/examples/chat/README.md @@ -1,44 +1,24 @@ # Chat -This examples show cases how to manually dial (connect to) either local peer or remote peer has a reservation on relay-server. +This examples show cases how to build chat application with DCUtR, mDNS, Relay, Rendezvous and GossipSub protocols. +- Local node gets reservation on the relay. +- Local node advertises relayed address at the Rendezvous node. +- Local nodes discovers other local nodes via mDNS discovery and attempts direct connection. +- Local nodes discovers other remote nodes via Rendezvous discovery and attempts hole punched connectioned. -## Run local only -This examples shows how to run two sessions locally and connect sessions by manually dialing local peer. - -Run first chat session in echo mode. -``` -cargo run -p chat-example -- --mode echo --port 4002 --secret-key-seed 102 --gossip-topic-names calimero-network/examples/chat/v0.0.1 --relay-address /ip4/3.71.239.80/udp/4001/quic-v1/p2p/12D3KooWAgFah4EZtWnMMGMUddGdJpb5cq2NubNCAD2jA5AZgbXF -``` - -Run second chat session in interactive mode with local peer dial. -``` -cargo run -p chat-example -- --mode interactive --port 4003 --secret-key-seed 103 --gossip-topic-names calimero-network/examples/chat/v0.0.1 --dial-peer-addrs /ip4/127.0.0.1/udp/4002/quic-v1/p2p/12D3KooWMpeKAbMK4BTPsQY3rG7XwtdstseHGcq7kffY8LToYYKK --relay-address /ip4/3.71.239.80/udp/4001/quic-v1/p2p/12D3KooWAgFah4EZtWnMMGMUddGdJpb5cq2NubNCAD2jA5AZgbXF -``` - -In the interactive session publish new message manually: -``` -publish calimero-network/examples/chat/v0.0.1 ola -``` - -## Run locally with remote peer dial in -This examples shows how to run two sessions locally and connect sessions manually by dialing private remote peer from each session. For the gossip message to pass from one local session to second local session it needs to go "the long way" around (local -> remote -> local). - -Additional info: -- Remote instance is running in a private subnet behind NAT. -- Remote instance PeerId: `12D3KooWP285Hw3CSTdr9oU6Ezz4hDoi6XS5vfDjjNeTJ1uFMGvp` -- Remote instance address at the relay server: `ip4/3.71.239.80/udp/4001/quic-v1/p2p/12D3KooWAgFah4EZtWnMMGMUddGdJpb5cq2NubNCAD2jA5AZgbXF/p2p-circuit/p2p/12D3KooWP285Hw3CSTdr9oU6Ezz4hDoi6XS5vfDjjNeTJ1uFMGvp` +## Run Run first chat session in interactive mode with remote peer dial. ``` -cargo run -p chat-example -- --mode interactive --port 4002 --secret-key-seed 102 --gossip-topic-names calimero-network/examples/chat/v0.0.1 --dial-peer-addrs /ip4/3.71.239.80/udp/4001/quic-v1/p2p/12D3KooWAgFah4EZtWnMMGMUddGdJpb5cq2NubNCAD2jA5AZgbXF/p2p-circuit/p2p/12D3KooWP285Hw3CSTdr9oU6Ezz4hDoi6XS5vfDjjNeTJ1uFMGvp --relay-address /ip4/3.71.239.80/udp/4001/quic-v1/p2p/12D3KooWAgFah4EZtWnMMGMUddGdJpb5cq2NubNCAD2jA5AZgbXF +cargo run -p chat-example -- --mode interactive --port 4002 --secret-key-seed 102 --gossip-topic-names calimero-network/examples/chat/v0.0.2 --boot-nodes /ip4/35.156.78.13/udp/4001/quic-v1/p2p/12D3KooWRnt7EmBwrNALhAXAgM151MdH7Ka9tvYS91ZUqnqwpjVg ``` Run second chat session in interactive mode with remote peer dial. ``` -cargo run -p chat-example -- --mode interactive --port 4003 --secret-key-seed 103 --gossip-topic-names calimero-network/examples/chat/v0.0.1 --dial-peer-addrs /ip4/3.71.239.80/udp/4001/quic-v1/p2p/12D3KooWAgFah4EZtWnMMGMUddGdJpb5cq2NubNCAD2jA5AZgbXF/p2p-circuit/p2p/12D3KooWP285Hw3CSTdr9oU6Ezz4hDoi6XS5vfDjjNeTJ1uFMGvp --relay-address /ip4/3.71.239.80/udp/4001/quic-v1/p2p/12D3KooWAgFah4EZtWnMMGMUddGdJpb5cq2NubNCAD2jA5AZgbXF +cargo run -p chat-example -- --mode interactive --port 4003 --secret-key-seed 103 --gossip-topic-names calimero-network/examples/chat/v0.0.2 --boot-nodes /ip4/35.156.78.13/udp/4001/quic-v1/p2p/12D3KooWRnt7EmBwrNALhAXAgM151MdH7Ka9tvYS91ZUqnqwpjVg ``` In any interactive session publish new message manually: ``` -publish calimero-network/examples/chat/v0.0.1 ola +publish calimero-network/examples/chat/v0.0.2 ola ``` ## Debugging and known issues diff --git a/examples/chat/src/main.rs b/examples/chat/src/main.rs index 227740f..f2fdb98 100644 --- a/examples/chat/src/main.rs +++ b/examples/chat/src/main.rs @@ -31,7 +31,11 @@ struct Opt { /// The listening address of a relay server to connect to. #[clap(long)] - relay_address: Multiaddr, + boot_nodes: Vec, + + /// The listening address of a relay server to connect to. + #[clap(long, default_value = "/calimero/devnet/examples/chat")] + rendezvous_namespace: String, /// Optional list of peer addresses to dial immediately after network bootstrap. #[clap(long)] @@ -51,9 +55,9 @@ enum Mode { #[tokio::main] async fn main() -> eyre::Result<()> { tracing_subscriber::registry() - // "info,chat_example=debug,{}", + // "info,chat_example=debug,libp2p_mdns=warn,{}", .with(EnvFilter::builder().parse(format!( - "info,chat_example=debug,{}", + "info,libp2p_mdns=warn,{}", std::env::var("RUST_LOG").unwrap_or_default() ))?) .with(tracing_subscriber::fmt::layer()) @@ -63,8 +67,14 @@ async fn main() -> eyre::Result<()> { let keypair = generate_ed25519(opt.secret_key_seed); - let (network_client, mut network_events) = - network::run(keypair.clone(), opt.port, opt.relay_address.clone()).await?; + let (network_client, mut network_events) = network::run( + keypair.clone(), + opt.port, + libp2p::rendezvous::Namespace::new(opt.rendezvous_namespace)?, + opt.boot_nodes.clone(), + opt.boot_nodes.clone(), + ) + .await?; if let Some(peer_addrs) = opt.dial_peer_addrs { for addr in peer_addrs { @@ -128,18 +138,6 @@ async fn handle_network_event( is_echo: bool, ) -> eyre::Result<()> { match event { - network::types::NetworkEvent::IdentifySent { peer_id } => { - debug!("Identify sent to {:?}", peer_id); - } - network::types::NetworkEvent::IdentifyReceived { - peer_id, - observed_addr, - } => { - debug!( - "Identify received from {:?} at {:?}", - peer_id, observed_addr - ); - } network::types::NetworkEvent::Message { message, .. } => { let text = String::from_utf8_lossy(&message.data); println!("{LINE_START} Received message: {:?}", text); @@ -166,9 +164,6 @@ async fn handle_network_event( network::types::NetworkEvent::ListeningOn { address, .. } => { info!("Listening on: {}", address); } - event => { - info!("Unhandled event: {:?}", event); - } } Ok(()) } diff --git a/examples/chat/src/network.rs b/examples/chat/src/network.rs index 62145a9..f4842e9 100644 --- a/examples/chat/src/network.rs +++ b/examples/chat/src/network.rs @@ -1,9 +1,12 @@ use std::collections::hash_map::{self, HashMap}; +use std::collections::BTreeMap; use std::time::Duration; use libp2p::futures::prelude::*; use libp2p::swarm::{NetworkBehaviour, Swarm, SwarmEvent}; -use libp2p::{dcutr, gossipsub, identify, identity, noise, ping, relay, yamux, PeerId}; +use libp2p::{ + dcutr, gossipsub, identify, identity, mdns, noise, ping, relay, rendezvous, yamux, PeerId, +}; use multiaddr::Multiaddr; use tokio::sync::{mpsc, oneshot}; use tokio::time; @@ -22,135 +25,96 @@ struct Behaviour { dcutr: dcutr::Behaviour, identify: identify::Behaviour, gossipsub: gossipsub::Behaviour, + mdns: mdns::tokio::Behaviour, ping: ping::Behaviour, - relay_client: relay::client::Behaviour, + rendezvous: rendezvous::client::Behaviour, + relay: relay::client::Behaviour, } pub async fn run( keypair: identity::Keypair, port: u16, - relay_address: Multiaddr, + rendezvous_namespace: rendezvous::Namespace, + relay_addresses: Vec, + rendezvous_addresses: Vec, ) -> eyre::Result<(NetworkClient, mpsc::Receiver)> { - let (client, mut event_receiver, event_loop) = init(keypair).await?; + let mut rendezvous = BTreeMap::new(); + for address in &rendezvous_addresses { + let entry = match peek_peer_id(&address) { + Ok(peer_id) => (peer_id, RendezvousEntry::new(address.clone())), + Err(err) => { + eyre::bail!("Failed to parse rendezvous PeerId: {}", err); + } + }; + rendezvous.insert(entry.0, entry.1); + } + let mut relays = BTreeMap::new(); + for address in &relay_addresses { + let entry = match peek_peer_id(&address) { + Ok(peer_id) => (peer_id, RelayEntry::new(address.clone())), + Err(err) => { + eyre::bail!("Failed to parse relay PeerId: {}", err); + } + }; + relays.insert(entry.0, entry.1); + } + + let (client, event_receiver, event_loop) = + init(keypair, relays, rendezvous_namespace, rendezvous).await?; tokio::spawn(event_loop.run()); let swarm_listen: Vec = vec![ format!("/ip4/0.0.0.0/udp/{}/quic-v1", port).parse()?, format!("/ip4/0.0.0.0/tcp/{}", port).parse()?, ]; - for addr in swarm_listen { - client.listen_on(addr.clone()).await?; - } - // Reference: https://github.com/libp2p/rust-libp2p/blob/60fd566a955a33c42a6ab6eefc1f0fedef9f8b83/examples/dcutr/src/main.rs#L118 - loop { - tokio::select! { - Some(event) = event_receiver.recv() => { - match event { - types::NetworkEvent::ListeningOn { address, .. } => { - info!("Listening on: {}", address); - } - _ => { - error!("Recieved unexpected network event: {:?}", event) - } - } - } - _ = tokio::time::sleep(Duration::from_secs(1)) => { - // Likely listening on all interfaces now, thus continuing by breaking the loop. - break; - } - } + for addr in swarm_listen { + client.listen_on(addr).await?; } - // Connect to the relay server. Not for the reservation or relayed connection, but to (a) learn - // our local public address and (b) enable a freshly started relay to learn its public address. - client.dial(relay_address.clone()).await?; - - let mut learned_observed_addr = false; - let mut told_relay_observed_addr = false; - let relay_peer_id = match relay_address.iter().find_map(|protocol| { - if let libp2p::multiaddr::Protocol::P2p(peer_id) = protocol { - Some(peer_id) - } else { - None - } - }) { - Some(peer_id) => peer_id, - None => eyre::bail!("Failed to get PeerId from relay address"), - }; - - loop { - match event_receiver.recv().await.unwrap() { - types::NetworkEvent::IdentifySent { peer_id } => { - if peer_id == relay_peer_id { - info!("Told relay its public address"); - told_relay_observed_addr = true; - } - } - types::NetworkEvent::IdentifyReceived { - peer_id, - observed_addr, - } => { - if peer_id == relay_peer_id { - info!("Relay told us our observed address: {}", observed_addr); - learned_observed_addr = true; - } - } - event => info!("unexpected: {event:?}"), - }; + tokio::spawn(run_init_dial( + client.clone(), + rendezvous_addresses, + relay_addresses, + )); - if learned_observed_addr && told_relay_observed_addr { - break; - } - } - - // Create reservation on relay server and wait for it to be accepted ... - client - .listen_on(relay_address.with(multiaddr::Protocol::P2pCircuit)) - .await?; + Ok((client, event_receiver)) +} - loop { - match event_receiver.recv().await.unwrap() { - types::NetworkEvent::RelayReservationAccepted => { - info!("Relay accepted our reservation"); - break; - } - event => info!("unexpected: {event:?}"), +async fn run_init_dial( + client: NetworkClient, + rendezvous_addresses: Vec, + relay_addresses: Vec, +) { + tokio::time::sleep(Duration::from_secs(5)).await; + + info!("Initiating dial to rendezvous and relay addresses."); + + for addr in rendezvous_addresses + .into_iter() + .chain(relay_addresses) + .collect::>() + .into_iter() + { + info!("Dialing address: {:?}", addr); + if let Err(err) = client.dial(addr).await { + error!("Failed to dial rendezvous address: {}", err); }; } - - // ... and now wait until we are listening on the "relayed" interfaces - // Reference: https://github.com/libp2p/rust-libp2p/blob/60fd566a955a33c42a6ab6eefc1f0fedef9f8b83/examples/dcutr/src/main.rs#L118 - loop { - tokio::select! { - Some(event) = event_receiver.recv() => { - match event { - types::NetworkEvent::ListeningOn { address, .. } => { - info!("Listening on: {}", address); - } - _ => { - error!("Recieved unexpected network event: {:?}", event) - } - } - } - _ = tokio::time::sleep(Duration::from_secs(1)) => { - // Likely listening on all interfaces now, thus continuing by breaking the loop. - break; - } - } - } - - Ok((client, event_receiver)) } async fn init( keypair: identity::Keypair, + relays: BTreeMap, + rendezvous_namespace: rendezvous::Namespace, + rendezvous: BTreeMap, ) -> eyre::Result<( NetworkClient, mpsc::Receiver, EventLoop, )> { + let peer_id = keypair.public().to_peer_id(); let swarm = libp2p::SwarmBuilder::with_existing_identity(keypair.clone()) .with_tokio() .with_tcp( @@ -161,7 +125,7 @@ async fn init( .with_quic() .with_relay_client(noise::Config::new, yamux::Config::default)? .with_behaviour(|keypair, relay_behaviour| Behaviour { - dcutr: dcutr::Behaviour::new(keypair.public().to_peer_id()), + dcutr: dcutr::Behaviour::new(peer_id.clone()), identify: identify::Behaviour::new( identify::Config::new(PROTOCOL_VERSION.to_owned(), keypair.public()) .with_push_listen_addr_updates(true), @@ -171,12 +135,13 @@ async fn init( gossipsub::Config::default(), ) .expect("Valid gossipsub config."), + mdns: mdns::Behaviour::new(mdns::Config::default(), peer_id.clone()) + .expect("Valid mdns config."), ping: ping::Behaviour::default(), - relay_client: relay_behaviour, + rendezvous: rendezvous::client::Behaviour::new(keypair.clone()), + relay: relay_behaviour, })? - .with_swarm_config(|cfg| { - cfg.with_idle_connection_timeout(time::Duration::from_secs(u64::MAX)) - }) + .with_swarm_config(|cfg| cfg.with_idle_connection_timeout(time::Duration::from_secs(30))) .build(); let (command_sender, command_receiver) = mpsc::channel(32); @@ -186,7 +151,14 @@ async fn init( sender: command_sender, }; - let event_loop = EventLoop::new(swarm, command_receiver, event_sender); + let event_loop = EventLoop::new( + swarm, + command_receiver, + event_sender, + relays, + rendezvous_namespace, + rendezvous, + ); Ok((client, event_receiver, event_loop)) } @@ -195,24 +167,91 @@ pub(crate) struct EventLoop { swarm: Swarm, command_receiver: mpsc::Receiver, event_sender: mpsc::Sender, + relays: BTreeMap, + rendezvous_namespace: rendezvous::Namespace, + rendezvous: BTreeMap, pending_dial: HashMap>>>, } +#[derive(Debug)] +pub(crate) struct RelayEntry { + address: Multiaddr, + identify_state: IdentifyState, + reservation_state: RelayReservationState, +} + +impl RelayEntry { + pub(crate) fn new(address: Multiaddr) -> Self { + Self { + address, + identify_state: Default::default(), + reservation_state: Default::default(), + } + } +} + +#[derive(Debug)] +pub(crate) struct RendezvousEntry { + address: Multiaddr, + cookie: Option, + identify_state: IdentifyState, +} + +impl RendezvousEntry { + pub(crate) fn new(address: Multiaddr) -> Self { + Self { + address, + cookie: Default::default(), + identify_state: Default::default(), + } + } +} + +#[derive(Debug, Default, PartialEq)] +pub(crate) enum RelayReservationState { + #[default] + Unknown, + Requested, + Acquired, +} + +#[derive(Debug, Default)] +pub(crate) struct IdentifyState { + sent: bool, + received: bool, +} + +impl IdentifyState { + pub(crate) fn is_exchanged(&self) -> bool { + self.sent && self.received + } +} + impl EventLoop { fn new( swarm: Swarm, command_receiver: mpsc::Receiver, event_sender: mpsc::Sender, + relays: BTreeMap, + rendezvous_namespace: rendezvous::Namespace, + rendezvous: BTreeMap, ) -> Self { Self { swarm, command_receiver, event_sender, + relays, + rendezvous, + rendezvous_namespace, pending_dial: Default::default(), } } pub(crate) async fn run(mut self) { + let mut relays_dial_tick = tokio::time::interval(Duration::from_secs(90)); + let mut rendezvous_discover_tick = tokio::time::interval(Duration::from_secs(30)); + let mut rendezvous_dial_tick = tokio::time::interval(Duration::from_secs(90)); + loop { tokio::select! { event = self.swarm.next() => self.handle_swarm_event(event.expect("Swarm stream to be infinite.")).await, @@ -220,6 +259,9 @@ impl EventLoop { let Some(c) = command else { break }; self.handle_command(c).await; } + _ = relays_dial_tick.tick() => self.handle_relays_dial().await, + _ = rendezvous_dial_tick.tick() => self.handle_rendezvous_dial().await, + _ = rendezvous_discover_tick.tick() => self.handle_rendezvous_discover().await, } } } @@ -233,15 +275,15 @@ impl EventLoop { }; } Command::Dial { peer_addr, sender } => { - let addr_meta = match MultiaddrMeta::try_from(&peer_addr) { - Ok(meta) => meta, + let peer_id = match peek_peer_id(&peer_addr) { + Ok(peer_id) => peer_id, Err(e) => { let _ = sender.send(Err(eyre::eyre!(e))); return; } }; - match self.pending_dial.entry(*addr_meta.peer_id()) { + match self.pending_dial.entry(peer_id) { hash_map::Entry::Occupied(_) => { let _ = sender.send(Ok(None)); } @@ -273,6 +315,21 @@ impl EventLoop { let _ = sender.send(Ok(topic)); } + Command::Publish { + topic, + data, + sender, + } => { + let id = match self.swarm.behaviour_mut().gossipsub.publish(topic, data) { + Ok(id) => id, + Err(err) => { + let _ = sender.send(Err(eyre::eyre!(err))); + return; + } + }; + + let _ = sender.send(Ok(id)); + } Command::PeerInfo { sender } => { let peers: Vec = self .swarm @@ -296,21 +353,6 @@ impl EventLoop { let _ = sender.send(MeshPeerInfo { count, peers }); } - Command::Publish { - topic, - data, - sender, - } => { - let id = match self.swarm.behaviour_mut().gossipsub.publish(topic, data) { - Ok(id) => id, - Err(err) => { - let _ = sender.send(Err(eyre::eyre!(err))); - return; - } - }; - - let _ = sender.send(Ok(id)); - } } } } @@ -333,6 +375,11 @@ pub(crate) enum Command { topic: gossipsub::IdentTopic, sender: oneshot::Sender>, }, + Publish { + topic: gossipsub::TopicHash, + data: Vec, + sender: oneshot::Sender>, + }, PeerInfo { sender: oneshot::Sender, }, @@ -340,11 +387,6 @@ pub(crate) enum Command { topic: gossipsub::TopicHash, sender: oneshot::Sender, }, - Publish { - topic: gossipsub::TopicHash, - data: Vec, - sender: oneshot::Sender>, - }, } #[allow(dead_code)] // Info structs for pretty printing @@ -361,139 +403,12 @@ pub(crate) struct MeshPeerInfo { peers: Vec, } -#[derive(Debug)] -pub(crate) struct MultiaddrMeta { - peer_id: PeerId, - relay_peer_ids: Vec, -} - -impl TryFrom<&Multiaddr> for MultiaddrMeta { - type Error = &'static str; - - fn try_from(value: &Multiaddr) -> Result { - let mut peer_ids = Vec::new(); - - let mut iter = value.iter(); - while let Some(protocol) = iter.next() { - match protocol { - multiaddr::Protocol::P2pCircuit => { - if peer_ids.is_empty() { - return Err("expected at least one p2p proto before P2pCircuit"); - } - let Some(multiaddr::Protocol::P2p(id)) = iter.next() else { - return Err("expected p2p proto after P2pCircuit"); - }; - peer_ids.push(id); - } - multiaddr::Protocol::P2p(id) => { - peer_ids.push(id); - } - _ => {} - } - } - - if let Some(peer_id) = peer_ids.pop() { - Ok(Self { - peer_id, - relay_peer_ids: peer_ids, - }) - } else { - Err("expected at least one p2p proto") - } - } -} - -impl MultiaddrMeta { - fn peer_id(&self) -> &PeerId { - &self.peer_id - } - - fn is_relayed(&self) -> bool { - !self.relay_peer_ids.is_empty() - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_valid_multiaddr() { - let addr_str = "/ip4/3.71.239.80/udp/4001/quic-v1/p2p/12D3KooWAgFah4EZtWnMMGMUddGdJpb5cq2NubNCAD2jA5AZgbXF/p2p-circuit/p2p/12D3KooWP285Hw3CSTdr9oU6Ezz4hDoi6XS5vfDjjNeTJ1uFMGvp/p2p-circuit/p2p/12D3KooWMpeKAbMK4BTPsQY3rG7XwtdstseHGcq7kffY8LToYYKK"; - let multiaddr: Multiaddr = addr_str.parse().expect("valid multiaddr"); - - let meta = MultiaddrMeta::try_from(&multiaddr).expect("valid MultiaddrMeta"); - let expected_peer_id: PeerId = "12D3KooWMpeKAbMK4BTPsQY3rG7XwtdstseHGcq7kffY8LToYYKK" - .parse() - .expect("valid peer id"); - let relay_peer_ids: Vec = vec![ - "12D3KooWAgFah4EZtWnMMGMUddGdJpb5cq2NubNCAD2jA5AZgbXF" - .parse() - .expect("valid peer id"), - "12D3KooWP285Hw3CSTdr9oU6Ezz4hDoi6XS5vfDjjNeTJ1uFMGvp" - .parse() - .expect("valid peer id"), - ]; - - assert_eq!(meta.peer_id, expected_peer_id); - assert_eq!(meta.relay_peer_ids, relay_peer_ids); - assert!(meta.is_relayed()); - } - - #[test] - fn test_no_p2p_proto() { - let addr_str = "/ip4/3.71.239.80/udp/4001/quic-v1"; - let multiaddr: Multiaddr = addr_str.parse().expect("valid multiaddr"); - - let result = MultiaddrMeta::try_from(&multiaddr); - assert!(result.is_err()); - assert_eq!(result.err(), Some("expected at least one p2p proto")); - } - - #[test] - fn test_p2p_circuit_without_previous_p2p() { - let addr_str = "/ip4/3.71.239.80/udp/4001/quic-v1/p2p-circuit"; - let multiaddr: Multiaddr = addr_str.parse().expect("valid multiaddr"); - - let result = MultiaddrMeta::try_from(&multiaddr); - assert!(result.is_err()); - assert_eq!( - result.err(), - Some("expected at least one p2p proto before P2pCircuit") - ); - } - - #[test] - fn test_single_p2p_no_circuit() { - let addr_str = "/ip4/3.71.239.80/udp/4001/quic-v1/p2p/12D3KooWAgFah4EZtWnMMGMUddGdJpb5cq2NubNCAD2jA5AZgbXF"; - let multiaddr: Multiaddr = addr_str.parse().expect("valid multiaddr"); - - let meta = MultiaddrMeta::try_from(&multiaddr).expect("valid MultiaddrMeta"); - let expected_peer_id: PeerId = "12D3KooWAgFah4EZtWnMMGMUddGdJpb5cq2NubNCAD2jA5AZgbXF" - .parse() - .expect("valid peer id"); - - assert_eq!(meta.peer_id, expected_peer_id); - assert!(meta.relay_peer_ids.is_empty()); - assert!(!meta.is_relayed()); - } - - #[test] - fn test_p2p_circuit_with_single_p2p() { - let addr_str = "/ip4/3.71.239.80/udp/4001/quic-v1/p2p/12D3KooWAgFah4EZtWnMMGMUddGdJpb5cq2NubNCAD2jA5AZgbXF/p2p-circuit/p2p/12D3KooWP285Hw3CSTdr9oU6Ezz4hDoi6XS5vfDjjNeTJ1uFMGvp"; - let multiaddr: Multiaddr = addr_str.parse().expect("valid multiaddr"); - - let meta = MultiaddrMeta::try_from(&multiaddr).expect("valid MultiaddrMeta"); - let expected_peer_id: PeerId = "12D3KooWP285Hw3CSTdr9oU6Ezz4hDoi6XS5vfDjjNeTJ1uFMGvp" - .parse() - .expect("valid peer id"); - let relay_peer_ids: Vec = - vec!["12D3KooWAgFah4EZtWnMMGMUddGdJpb5cq2NubNCAD2jA5AZgbXF" - .parse() - .expect("valid peer id")]; - - assert_eq!(meta.peer_id, expected_peer_id); - assert_eq!(meta.relay_peer_ids, relay_peer_ids); - assert!(meta.is_relayed()); +pub(crate) fn peek_peer_id(address: &Multiaddr) -> eyre::Result { + match address.iter().last() { + Some(proto) => match proto { + multiaddr::Protocol::P2p(peer_id) => Ok(peer_id), + proto => Err(eyre::eyre!("expected p2p proto, got: {}", proto)), + }, + None => Err(eyre::eyre!("expected at least one protocol")), } } diff --git a/examples/chat/src/network/client.rs b/examples/chat/src/network/client.rs index cfbf9b4..1e4163d 100644 --- a/examples/chat/src/network/client.rs +++ b/examples/chat/src/network/client.rs @@ -20,50 +20,39 @@ impl NetworkClient { receiver.await.expect("Sender not to be dropped.") } - pub async fn subscribe( - &self, - topic: gossipsub::IdentTopic, - ) -> eyre::Result { + pub async fn dial(&self, peer_addr: Multiaddr) -> eyre::Result> { let (sender, receiver) = oneshot::channel(); self.sender - .send(Command::Subscribe { topic, sender }) + .send(Command::Dial { peer_addr, sender }) .await .expect("Command receiver not to be dropped."); receiver.await.expect("Sender not to be dropped.") } - pub async fn unsubscribe( + pub async fn subscribe( &self, topic: gossipsub::IdentTopic, ) -> eyre::Result { let (sender, receiver) = oneshot::channel(); self.sender - .send(Command::Unsubscribe { topic, sender }) - .await - .expect("Command receiver not to be dropped."); - - receiver.await.expect("Sender not to be dropped.") - } - - pub async fn peer_info(&self) -> super::PeerInfo { - let (sender, receiver) = oneshot::channel(); - - self.sender - .send(Command::PeerInfo { sender }) + .send(Command::Subscribe { topic, sender }) .await .expect("Command receiver not to be dropped."); receiver.await.expect("Sender not to be dropped.") } - pub async fn mesh_peer_info(&self, topic: gossipsub::TopicHash) -> super::MeshPeerInfo { + pub async fn unsubscribe( + &self, + topic: gossipsub::IdentTopic, + ) -> eyre::Result { let (sender, receiver) = oneshot::channel(); self.sender - .send(Command::MeshPeerCount { topic, sender }) + .send(Command::Unsubscribe { topic, sender }) .await .expect("Command receiver not to be dropped."); @@ -88,12 +77,22 @@ impl NetworkClient { receiver.await.expect("Sender not to be dropped.") } + pub async fn peer_info(&self) -> super::PeerInfo { + let (sender, receiver) = oneshot::channel(); - pub async fn dial(&self, peer_addr: Multiaddr) -> eyre::Result> { + self.sender + .send(Command::PeerInfo { sender }) + .await + .expect("Command receiver not to be dropped."); + + receiver.await.expect("Sender not to be dropped.") + } + + pub async fn mesh_peer_info(&self, topic: gossipsub::TopicHash) -> super::MeshPeerInfo { let (sender, receiver) = oneshot::channel(); self.sender - .send(Command::Dial { peer_addr, sender }) + .send(Command::MeshPeerCount { topic, sender }) .await .expect("Command receiver not to be dropped."); diff --git a/examples/chat/src/network/events.rs b/examples/chat/src/network/events.rs index 89cc8e8..92eb35a 100644 --- a/examples/chat/src/network/events.rs +++ b/examples/chat/src/network/events.rs @@ -5,8 +5,10 @@ use super::*; mod dcutr; mod gossipsub; mod identify; +mod mdns; mod ping; -mod relay_client; +mod relay; +mod rendezvous; pub trait EventHandler { async fn handle(&mut self, event: E); @@ -18,9 +20,11 @@ impl EventLoop { SwarmEvent::Behaviour(event) => match event { BehaviourEvent::Identify(event) => events::EventHandler::handle(self, event).await, BehaviourEvent::Gossipsub(event) => events::EventHandler::handle(self, event).await, - BehaviourEvent::RelayClient(event) => { + BehaviourEvent::Mdns(event) => events::EventHandler::handle(self, event).await, + BehaviourEvent::Rendezvous(event) => { events::EventHandler::handle(self, event).await } + BehaviourEvent::Relay(event) => events::EventHandler::handle(self, event).await, BehaviourEvent::Ping(event) => events::EventHandler::handle(self, event).await, BehaviourEvent::Dcutr(event) => events::EventHandler::handle(self, event).await, }, @@ -29,16 +33,26 @@ impl EventLoop { address, } => { let local_peer_id = *self.swarm.local_peer_id(); + let address = match address.with_p2p(local_peer_id) { + Ok(address) => address, + Err(address) => { + warn!( + "Failed to sanitize listen address with p2p proto, address: {:?}, p2p proto: {:?}", + address, local_peer_id + ); + address + } + }; if let Err(err) = self .event_sender .send(types::NetworkEvent::ListeningOn { listener_id, - address: address.with(multiaddr::Protocol::P2p(local_peer_id)), + address, }) .await { error!("Failed to send listening on event: {:?}", err); - } + }; } SwarmEvent::IncomingConnection { .. } => {} SwarmEvent::ConnectionEstablished { @@ -46,19 +60,7 @@ impl EventLoop { } => { debug!(%peer_id, ?endpoint, "Connection established"); match endpoint { - libp2p::core::ConnectedPoint::Dialer { address, .. } => { - let addr_meta = match MultiaddrMeta::try_from(&address) { - Ok(meta) => meta, - Err(e) => { - error!(%e, "Failed to parse dialer address meta for established connection"); - return; - } - }; - - if addr_meta.is_relayed() { - debug!("Connection established via relay"); - } - + libp2p::core::ConnectedPoint::Dialer { .. } => { if let Some(sender) = self.pending_dial.remove(&peer_id) { let _ = sender.send(Ok(Some(()))); } @@ -94,22 +96,62 @@ impl EventLoop { .. } => trace!("Dialing peer: {}", peer_id), SwarmEvent::ExpiredListenAddr { address, .. } => { - trace!("Expired listen address: {}", address) + debug!("Expired listen address: {}", address) } SwarmEvent::ListenerClosed { addresses, reason, .. - } => trace!("Listener closed: {:?} {:?}", addresses, reason.err()), - SwarmEvent::ListenerError { error, .. } => trace!(%error, "Listener error"), + } => { + debug!("Listener closed: {:?} {:?}", addresses, reason.err()) + } + SwarmEvent::ListenerError { error, .. } => debug!(%error, "Listener error"), SwarmEvent::NewExternalAddrCandidate { address } => { - trace!("New external address candidate: {}", address) + debug!("New external address candidate: {}", address) } SwarmEvent::ExternalAddrConfirmed { address } => { - trace!("External address confirmed: {}", address) + debug!("External address confirmed: {}", address); } SwarmEvent::ExternalAddrExpired { address } => { - trace!("External address expired: {}", address) + debug!("External address expired: {}", address) + } + SwarmEvent::NewExternalAddrOfPeer { peer_id, address } => { + debug!("New external address of peer: {} {}", peer_id, address) } - unhandled => warn!("Unhandled event: {:?}", unhandled), + _ => {} + } + } + + pub(super) async fn handle_rendezvous_discover(&mut self) { + for (peer_id, entry) in self.rendezvous.iter() { + if entry.identify_state.is_exchanged() { + self.swarm.behaviour_mut().rendezvous.discover( + Some(self.rendezvous_namespace.clone()), + entry.cookie.clone(), + None, + *peer_id, + ); + } + } + } + + pub(super) async fn handle_rendezvous_dial(&mut self) { + for (peer_id, entry) in self.rendezvous.iter() { + if self.swarm.is_connected(peer_id) { + continue; + }; + if let Err(err) = self.swarm.dial(entry.address.clone()) { + error!("Failed to dial rendezvous peer: {:?}", err); + }; + } + } + + pub(super) async fn handle_relays_dial(&mut self) { + for (peer_id, entry) in self.relays.iter() { + if self.swarm.is_connected(peer_id) { + continue; + }; + if let Err(err) = self.swarm.dial(entry.address.clone()) { + error!("Failed to dial relay peer: {:?}", err); + }; } } } diff --git a/examples/chat/src/network/events/identify.rs b/examples/chat/src/network/events/identify.rs index b51ebbd..d67a7aa 100644 --- a/examples/chat/src/network/events/identify.rs +++ b/examples/chat/src/network/events/identify.rs @@ -2,35 +2,53 @@ use libp2p::identify; use owo_colors::OwoColorize; use tracing::{debug, error}; -use super::{types, EventHandler, EventLoop}; +use super::{EventHandler, EventLoop, RelayReservationState}; impl EventHandler for EventLoop { async fn handle(&mut self, event: identify::Event) { debug!("{}: {:?}", "identify".yellow(), event); match event { - identify::Event::Received { - peer_id, - info: identify::Info { observed_addr, .. }, - } => { - if let Err(err) = self - .event_sender - .send(types::NetworkEvent::IdentifyReceived { - peer_id, - observed_addr, - }) - .await - { - error!("Failed to send message event: {:?}", err); + identify::Event::Received { peer_id, .. } => { + if let Some(entry) = self.relays.get_mut(&peer_id) { + entry.identify_state.received = true; + + if entry.identify_state.is_exchanged() + && matches!(entry.reservation_state, RelayReservationState::Unknown) + { + if let Err(err) = self + .swarm + .listen_on(entry.address.clone().with(multiaddr::Protocol::P2pCircuit)) + { + error!("Failed to listen on relay address: {:?}", err); + }; + entry.reservation_state = RelayReservationState::Requested; + } + } + + if let Some(entry) = self.rendezvous.get_mut(&peer_id) { + entry.identify_state.received = true; } } identify::Event::Sent { peer_id } => { - if let Err(err) = self - .event_sender - .send(types::NetworkEvent::IdentifySent { peer_id }) - .await - { - error!("Failed to send message event: {:?}", err); + if let Some(entry) = self.relays.get_mut(&peer_id) { + entry.identify_state.sent = true; + + if entry.identify_state.is_exchanged() + && matches!(entry.reservation_state, RelayReservationState::Unknown) + { + if let Err(err) = self + .swarm + .listen_on(entry.address.clone().with(multiaddr::Protocol::P2pCircuit)) + { + error!("Failed to listen on relay address: {:?}", err); + }; + entry.reservation_state = RelayReservationState::Requested; + } + } + + if let Some(entry) = self.rendezvous.get_mut(&peer_id) { + entry.identify_state.sent = true; } } _ => {} diff --git a/examples/chat/src/network/events/mdns.rs b/examples/chat/src/network/events/mdns.rs new file mode 100644 index 0000000..b8fcf7a --- /dev/null +++ b/examples/chat/src/network/events/mdns.rs @@ -0,0 +1,24 @@ +use libp2p::mdns; +use owo_colors::OwoColorize; +use tracing::{debug, error}; + +use super::{EventHandler, EventLoop}; + +impl EventHandler for EventLoop { + async fn handle(&mut self, event: mdns::Event) { + debug!("{}: {:?}", "mdns".yellow(), event); + + match event { + mdns::Event::Discovered(peers) => { + for (peer_id, addr) in peers { + debug!(%peer_id, %addr, "Discovered peer via mdns"); + + if let Err(err) = self.swarm.dial(addr) { + error!("Failed to dial peer: {:?}", err); + } + } + } + _ => {} + } + } +} diff --git a/examples/chat/src/network/events/relay.rs b/examples/chat/src/network/events/relay.rs new file mode 100644 index 0000000..ccd31ba --- /dev/null +++ b/examples/chat/src/network/events/relay.rs @@ -0,0 +1,36 @@ +use libp2p::relay; +use owo_colors::OwoColorize; +use tracing::{debug, error}; + +use super::{EventHandler, EventLoop, RelayReservationState}; + +impl EventHandler for EventLoop { + async fn handle(&mut self, event: relay::client::Event) { + debug!("{}: {:?}", "relay".yellow(), event); + + match event { + relay::client::Event::ReservationReqAccepted { relay_peer_id, .. } => { + if let Some(entry) = self.relays.get_mut(&relay_peer_id) { + entry.reservation_state = RelayReservationState::Acquired; + + for (peer_id, entry) in self.rendezvous.iter() { + if entry.identify_state.is_exchanged() { + if let Err(err) = self.swarm.behaviour_mut().rendezvous.register( + self.rendezvous_namespace.clone(), + *peer_id, + None, + ) { + error!(%err, "Failed to register at rendezvous"); + } + debug!( + "Registered at rendezvous node {} on the namespace {}", + *peer_id, self.rendezvous_namespace + ); + } + } + } + } + _ => {} + } + } +} diff --git a/examples/chat/src/network/events/relay_client.rs b/examples/chat/src/network/events/relay_client.rs deleted file mode 100644 index ed35f3a..0000000 --- a/examples/chat/src/network/events/relay_client.rs +++ /dev/null @@ -1,24 +0,0 @@ -use libp2p::relay; -use owo_colors::OwoColorize; -use tracing::{debug, error}; - -use super::{types, EventHandler, EventLoop}; - -impl EventHandler for EventLoop { - async fn handle(&mut self, event: relay::client::Event) { - debug!("{}: {:?}", "relay_client".yellow(), event); - - match event { - relay::client::Event::ReservationReqAccepted { .. } => { - if let Err(err) = self - .event_sender - .send(types::NetworkEvent::RelayReservationAccepted) - .await - { - error!("Failed to send message event: {:?}", err); - } - } - _ => {} - } - } -} diff --git a/examples/chat/src/network/events/rendezvous.rs b/examples/chat/src/network/events/rendezvous.rs new file mode 100644 index 0000000..32cf0f7 --- /dev/null +++ b/examples/chat/src/network/events/rendezvous.rs @@ -0,0 +1,70 @@ +use libp2p::rendezvous; +use owo_colors::OwoColorize; +use tracing::{debug, error}; + +use super::{EventHandler, EventLoop}; + +impl EventHandler for EventLoop { + async fn handle(&mut self, event: rendezvous::client::Event) { + debug!("{}: {:?}", "rendezvous".yellow(), event); + + match event { + rendezvous::client::Event::Discovered { + rendezvous_node, + registrations, + cookie, + } => { + if let Some(entry) = self.rendezvous.get_mut(&rendezvous_node) { + entry.cookie = Some(cookie); + } + + for registration in registrations { + if registration.record.peer_id() == *self.swarm.local_peer_id() { + continue; + } + + if self.swarm.is_connected(®istration.record.peer_id()) { + continue; + }; + + for address in registration.record.addresses() { + let peer = registration.record.peer_id(); + debug!(%peer, %address, "Discovered peer via rendezvous"); + + if let Err(err) = self.swarm.dial(address.clone()) { + error!("Failed to dial peer: {:?}", err); + } + } + } + } + rendezvous::client::Event::Registered { + rendezvous_node, .. + } => { + if let Some(entry) = self.rendezvous.get(&rendezvous_node) { + if entry.cookie.is_none() { + self.swarm.behaviour_mut().rendezvous.discover( + Some(self.rendezvous_namespace.clone()), + entry.cookie.clone(), + None, + rendezvous_node, + ); + } + } + } + rendezvous::client::Event::Expired { peer } => { + let local_peer_id = *self.swarm.local_peer_id(); + if peer == local_peer_id { + if let Err(err) = self.swarm.behaviour_mut().rendezvous.register( + self.rendezvous_namespace.clone(), + peer, + None, + ) { + error!(%err, "Failed to re-register at rendezvous"); + }; + debug!("Re-registered at rendezvous"); + } + } + _ => {} + } + } +} diff --git a/examples/chat/src/network/types.rs b/examples/chat/src/network/types.rs index 1f5f099..3602f6e 100644 --- a/examples/chat/src/network/types.rs +++ b/examples/chat/src/network/types.rs @@ -16,12 +16,4 @@ pub enum NetworkEvent { id: MessageId, message: Message, }, - IdentifySent { - peer_id: PeerId, - }, - IdentifyReceived { - peer_id: PeerId, - observed_addr: libp2p::Multiaddr, - }, - RelayReservationAccepted, }