From 19491c9f0d1704a602ed5a90e4f7fbb3c26a9e4a Mon Sep 17 00:00:00 2001 From: Fico <70634661+fbozic@users.noreply.github.com> Date: Fri, 31 May 2024 17:47:32 +0200 Subject: [PATCH] feat: implement relay and rendezvous peer discovery based on identify (#11) * feat: implement relay and rendezvous peer discovery based on identify, setup kad with boot node --- Cargo.lock | 2 +- examples/chat/Cargo.toml | 3 +- examples/chat/src/main.rs | 3 +- examples/chat/src/network.rs | 237 +++++++---------- examples/chat/src/network/client.rs | 8 +- examples/chat/src/network/discovery.rs | 240 ++++++++++++++++++ examples/chat/src/network/discovery/state.rs | 217 ++++++++++++++++ examples/chat/src/network/events.rs | 148 ++++++----- examples/chat/src/network/events/identify.rs | 53 ++-- examples/chat/src/network/events/kad.rs | 11 + examples/chat/src/network/events/relay.rs | 29 +-- .../chat/src/network/events/rendezvous.rs | 61 +++-- 12 files changed, 705 insertions(+), 307 deletions(-) create mode 100644 examples/chat/src/network/discovery.rs create mode 100644 examples/chat/src/network/discovery/state.rs create mode 100644 examples/chat/src/network/events/kad.rs diff --git a/Cargo.lock b/Cargo.lock index dca26ff..87db1c0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -432,7 +432,7 @@ dependencies = [ [[package]] name = "chat-example" -version = "0.2.0" +version = "0.3.0" dependencies = [ "clap", "eyre", diff --git a/examples/chat/Cargo.toml b/examples/chat/Cargo.toml index a0895df..124f407 100644 --- a/examples/chat/Cargo.toml +++ b/examples/chat/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "chat-example" -version = "0.2.0" +version = "0.3.0" authors = ["Calimero Limited "] edition = "2021" repository = "https://github.com/calimero-network/boot-node" @@ -14,6 +14,7 @@ libp2p = { version = "0.53.2", features = [ "dns", "gossipsub", "identify", + "kad", "macros", "mdns", "noise", diff --git a/examples/chat/src/main.rs b/examples/chat/src/main.rs index f2fdb98..03e2be2 100644 --- a/examples/chat/src/main.rs +++ b/examples/chat/src/main.rs @@ -70,9 +70,8 @@ async fn main() -> eyre::Result<()> { let (network_client, mut network_events) = network::run( keypair.clone(), opt.port, + opt.boot_nodes, libp2p::rendezvous::Namespace::new(opt.rendezvous_namespace)?, - opt.boot_nodes.clone(), - opt.boot_nodes.clone(), ) .await?; diff --git a/examples/chat/src/network.rs b/examples/chat/src/network.rs index f4842e9..e912ad1 100644 --- a/examples/chat/src/network.rs +++ b/examples/chat/src/network.rs @@ -1,30 +1,33 @@ use std::collections::hash_map::{self, HashMap}; -use std::collections::BTreeMap; use std::time::Duration; use libp2p::futures::prelude::*; use libp2p::swarm::{NetworkBehaviour, Swarm, SwarmEvent}; use libp2p::{ - dcutr, gossipsub, identify, identity, mdns, noise, ping, relay, rendezvous, yamux, PeerId, + dcutr, gossipsub, identify, identity, kad, mdns, noise, ping, relay, rendezvous, yamux, PeerId, }; use multiaddr::Multiaddr; use tokio::sync::{mpsc, oneshot}; use tokio::time; -use tracing::{debug, error, info, trace, warn}; +use tracing::{debug, trace, warn}; pub mod client; +pub mod discovery; pub mod events; pub mod types; use client::NetworkClient; const PROTOCOL_VERSION: &str = concat!("/", env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")); +const CALIMERO_KAD_PROTO_NAME: libp2p::StreamProtocol = + libp2p::StreamProtocol::new("/calimero/kad/1.0.0"); #[derive(NetworkBehaviour)] struct Behaviour { dcutr: dcutr::Behaviour, - identify: identify::Behaviour, gossipsub: gossipsub::Behaviour, + identify: identify::Behaviour, + kad: kad::Behaviour, mdns: mdns::tokio::Behaviour, ping: ping::Behaviour, rendezvous: rendezvous::client::Behaviour, @@ -34,34 +37,12 @@ struct Behaviour { pub async fn run( keypair: identity::Keypair, port: u16, + boot_nodes: Vec, rendezvous_namespace: rendezvous::Namespace, - relay_addresses: Vec, - rendezvous_addresses: Vec, ) -> eyre::Result<(NetworkClient, mpsc::Receiver)> { - let mut rendezvous = BTreeMap::new(); - for address in &rendezvous_addresses { - let entry = match peek_peer_id(&address) { - Ok(peer_id) => (peer_id, RendezvousEntry::new(address.clone())), - Err(err) => { - eyre::bail!("Failed to parse rendezvous PeerId: {}", err); - } - }; - rendezvous.insert(entry.0, entry.1); - } - - let mut relays = BTreeMap::new(); - for address in &relay_addresses { - let entry = match peek_peer_id(&address) { - Ok(peer_id) => (peer_id, RelayEntry::new(address.clone())), - Err(err) => { - eyre::bail!("Failed to parse relay PeerId: {}", err); - } - }; - relays.insert(entry.0, entry.1); - } - let (client, event_receiver, event_loop) = - init(keypair, relays, rendezvous_namespace, rendezvous).await?; + init(keypair, boot_nodes, rendezvous_namespace).await?; + tokio::spawn(event_loop.run()); let swarm_listen: Vec = vec![ @@ -73,47 +54,32 @@ pub async fn run( client.listen_on(addr).await?; } - tokio::spawn(run_init_dial( - client.clone(), - rendezvous_addresses, - relay_addresses, - )); - Ok((client, event_receiver)) } -async fn run_init_dial( - client: NetworkClient, - rendezvous_addresses: Vec, - relay_addresses: Vec, -) { - tokio::time::sleep(Duration::from_secs(5)).await; - - info!("Initiating dial to rendezvous and relay addresses."); - - for addr in rendezvous_addresses - .into_iter() - .chain(relay_addresses) - .collect::>() - .into_iter() - { - info!("Dialing address: {:?}", addr); - if let Err(err) = client.dial(addr).await { - error!("Failed to dial rendezvous address: {}", err); - }; - } -} - async fn init( keypair: identity::Keypair, - relays: BTreeMap, + boot_nodes: Vec, rendezvous_namespace: rendezvous::Namespace, - rendezvous: BTreeMap, ) -> eyre::Result<( NetworkClient, mpsc::Receiver, EventLoop, )> { + let bootstrap_peers = { + let mut peers = vec![]; + + for mut addr in boot_nodes { + let Some(multiaddr::Protocol::P2p(peer_id)) = addr.pop() else { + eyre::bail!("Failed to parse peer id from addr {:?}", addr); + }; + + peers.push((peer_id, addr)); + } + + peers + }; + let peer_id = keypair.public().to_peer_id(); let swarm = libp2p::SwarmBuilder::with_existing_identity(keypair.clone()) .with_tokio() @@ -130,6 +96,27 @@ async fn init( identify::Config::new(PROTOCOL_VERSION.to_owned(), keypair.public()) .with_push_listen_addr_updates(true), ), + kad: { + let mut kademlia_config = kad::Config::default(); + kademlia_config.set_protocol_names(vec![CALIMERO_KAD_PROTO_NAME]); + + let mut kademlia = kad::Behaviour::with_config( + peer_id, + kad::store::MemoryStore::new(peer_id), + kademlia_config, + ); + + kademlia.set_mode(Some(kad::Mode::Client)); + + for (peer_id, addr) in bootstrap_peers { + kademlia.add_address(&peer_id, addr); + } + if let Err(err) = kademlia.bootstrap() { + warn!(%err, "Failed to bootstrap Kademlia"); + }; + + kademlia + }, gossipsub: gossipsub::Behaviour::new( gossipsub::MessageAuthenticity::Signed(keypair.clone()), gossipsub::Config::default(), @@ -151,14 +138,7 @@ async fn init( sender: command_sender, }; - let event_loop = EventLoop::new( - swarm, - command_receiver, - event_sender, - relays, - rendezvous_namespace, - rendezvous, - ); + let event_loop = EventLoop::new(swarm, command_receiver, event_sender, rendezvous_namespace); Ok((client, event_receiver, event_loop)) } @@ -167,90 +147,36 @@ pub(crate) struct EventLoop { swarm: Swarm, command_receiver: mpsc::Receiver, event_sender: mpsc::Sender, - relays: BTreeMap, - rendezvous_namespace: rendezvous::Namespace, - rendezvous: BTreeMap, + discovery: discovery::Discovery, pending_dial: HashMap>>>, } -#[derive(Debug)] -pub(crate) struct RelayEntry { - address: Multiaddr, - identify_state: IdentifyState, - reservation_state: RelayReservationState, -} - -impl RelayEntry { - pub(crate) fn new(address: Multiaddr) -> Self { - Self { - address, - identify_state: Default::default(), - reservation_state: Default::default(), - } - } -} - -#[derive(Debug)] -pub(crate) struct RendezvousEntry { - address: Multiaddr, - cookie: Option, - identify_state: IdentifyState, -} - -impl RendezvousEntry { - pub(crate) fn new(address: Multiaddr) -> Self { - Self { - address, - cookie: Default::default(), - identify_state: Default::default(), - } - } -} - -#[derive(Debug, Default, PartialEq)] -pub(crate) enum RelayReservationState { - #[default] - Unknown, - Requested, - Acquired, -} - -#[derive(Debug, Default)] -pub(crate) struct IdentifyState { - sent: bool, - received: bool, -} - -impl IdentifyState { - pub(crate) fn is_exchanged(&self) -> bool { - self.sent && self.received - } -} - impl EventLoop { fn new( swarm: Swarm, command_receiver: mpsc::Receiver, event_sender: mpsc::Sender, - relays: BTreeMap, rendezvous_namespace: rendezvous::Namespace, - rendezvous: BTreeMap, ) -> Self { Self { swarm, command_receiver, event_sender, - relays, - rendezvous, - rendezvous_namespace, + // discovery_state: discovery::DiscoveryState::new(rendezvous_namespace, 0.5), + discovery: discovery::Discovery::new(discovery::DiscoveryConfig::new( + discovery::RendezvousConfig::new( + rendezvous_namespace, + Duration::from_secs(90), + 0.5, + ), + )), pending_dial: Default::default(), } } pub(crate) async fn run(mut self) { - let mut relays_dial_tick = tokio::time::interval(Duration::from_secs(90)); - let mut rendezvous_discover_tick = tokio::time::interval(Duration::from_secs(30)); - let mut rendezvous_dial_tick = tokio::time::interval(Duration::from_secs(90)); + let mut rendezvous_discover_tick = + tokio::time::interval(self.discovery.config.rendezvous.discovery_interval); loop { tokio::select! { @@ -259,9 +185,7 @@ impl EventLoop { let Some(c) = command else { break }; self.handle_command(c).await; } - _ = relays_dial_tick.tick() => self.handle_relays_dial().await, - _ = rendezvous_dial_tick.tick() => self.handle_rendezvous_dial().await, - _ = rendezvous_discover_tick.tick() => self.handle_rendezvous_discover().await, + _ = rendezvous_discover_tick.tick() => self.handle_rendezvous_discoveries().await, } } } @@ -330,28 +254,41 @@ impl EventLoop { let _ = sender.send(Ok(id)); } - Command::PeerInfo { sender } => { - let peers: Vec = self + Command::PeersInfo { sender } => { + let peers = self .swarm .connected_peers() .into_iter() .map(|peer| peer.clone()) - .collect(); + .collect::>(); let count = peers.len(); - let _ = sender.send(PeerInfo { count, peers }); + let discovered_peers = self + .discovery + .state + .get_peers() + .map(|(id, peer)| (id.clone(), peer.clone())) + .collect::>(); + let discovered_count = discovered_peers.len(); + + let _ = sender.send(PeersInfo { + count, + peers, + discovered_count, + discovered_peers, + }); } - Command::MeshPeerCount { topic, sender } => { - let peers: Vec = self + Command::MeshPeersCount { topic, sender } => { + let peers = self .swarm .behaviour_mut() .gossipsub .mesh_peers(&topic) .map(|peer| peer.clone()) - .collect(); + .collect::>(); let count = peers.len(); - let _ = sender.send(MeshPeerInfo { count, peers }); + let _ = sender.send(MeshPeersInfo { count, peers }); } } } @@ -380,25 +317,27 @@ pub(crate) enum Command { data: Vec, sender: oneshot::Sender>, }, - PeerInfo { - sender: oneshot::Sender, + PeersInfo { + sender: oneshot::Sender, }, - MeshPeerCount { + MeshPeersCount { topic: gossipsub::TopicHash, - sender: oneshot::Sender, + sender: oneshot::Sender, }, } #[allow(dead_code)] // Info structs for pretty printing #[derive(Debug)] -pub(crate) struct PeerInfo { +pub(crate) struct PeersInfo { count: usize, peers: Vec, + discovered_count: usize, + discovered_peers: Vec<(PeerId, discovery::state::PeerInfo)>, } #[allow(dead_code)] // Info structs for pretty printing #[derive(Debug)] -pub(crate) struct MeshPeerInfo { +pub(crate) struct MeshPeersInfo { count: usize, peers: Vec, } diff --git a/examples/chat/src/network/client.rs b/examples/chat/src/network/client.rs index 1e4163d..6b50c0f 100644 --- a/examples/chat/src/network/client.rs +++ b/examples/chat/src/network/client.rs @@ -77,22 +77,22 @@ impl NetworkClient { receiver.await.expect("Sender not to be dropped.") } - pub async fn peer_info(&self) -> super::PeerInfo { + pub async fn peer_info(&self) -> super::PeersInfo { let (sender, receiver) = oneshot::channel(); self.sender - .send(Command::PeerInfo { sender }) + .send(Command::PeersInfo { sender }) .await .expect("Command receiver not to be dropped."); receiver.await.expect("Sender not to be dropped.") } - pub async fn mesh_peer_info(&self, topic: gossipsub::TopicHash) -> super::MeshPeerInfo { + pub async fn mesh_peer_info(&self, topic: gossipsub::TopicHash) -> super::MeshPeersInfo { let (sender, receiver) = oneshot::channel(); self.sender - .send(Command::MeshPeerCount { topic, sender }) + .send(Command::MeshPeersCount { topic, sender }) .await .expect("Command receiver not to be dropped."); diff --git a/examples/chat/src/network/discovery.rs b/examples/chat/src/network/discovery.rs new file mode 100644 index 0000000..1f5033b --- /dev/null +++ b/examples/chat/src/network/discovery.rs @@ -0,0 +1,240 @@ +use std::time; + +use eyre::ContextCompat; +use libp2p::{rendezvous, PeerId}; +use tracing::{debug, error}; + +pub(crate) mod state; +use super::EventLoop; + +#[derive(Debug)] +pub struct Discovery { + pub config: DiscoveryConfig, + pub(crate) state: state::DiscoveryState, +} + +impl Discovery { + pub(crate) fn new(config: DiscoveryConfig) -> Self { + Discovery { + state: Default::default(), + config, + } + } +} + +#[derive(Debug)] +pub(crate) struct DiscoveryConfig { + pub rendezvous: RendezvousConfig, +} + +impl DiscoveryConfig { + pub(crate) fn new(rendezvous: RendezvousConfig) -> Self { + DiscoveryConfig { rendezvous } + } +} + +#[derive(Debug)] +pub struct RendezvousConfig { + pub namespace: rendezvous::Namespace, + pub discovery_interval: time::Duration, + pub discovery_rpm: f32, +} + +impl RendezvousConfig { + pub(crate) fn new( + namespace: rendezvous::Namespace, + discovery_interval: time::Duration, + discovery_rpm: f32, + ) -> Self { + RendezvousConfig { + namespace, + discovery_interval, + discovery_rpm, + } + } +} + +impl EventLoop { + // Handles rendezvous discoveries for all rendezvous peers. + // If rendezvous peer is not connected, it will be dialed which will trigger the discovery during identify exchange. + pub(crate) async fn handle_rendezvous_discoveries(&mut self) { + for peer_id in self + .discovery + .state + .get_rendezvous_peer_ids() + .collect::>() + { + let peer_info = match self.discovery.state.get_peer_info(&peer_id) { + Some(info) => info, + None => { + error!(%peer_id, "Failed to lookup peer info"); + continue; + } + }; + + if !self.swarm.is_connected(&peer_id) { + for addr in peer_info.addrs().cloned() { + if let Err(err) = self.swarm.dial(addr) { + error!(%err, "Failed to dial rendezvous peer"); + } + } + } else { + if let Err(err) = self.perform_rendezvous_discovery(&peer_id) { + error!(%err, "Failed to perform rendezvous discover"); + } + } + } + } + + // Performs rendezvous discovery against the remote rendezvous peer if it's time to do so. + // This function expectes that the relay peer is already connected. + pub(crate) fn perform_rendezvous_discovery( + &mut self, + rendezvous_peer: &PeerId, + ) -> eyre::Result<()> { + let peer_info = self + .discovery + .state + .get_peer_info(rendezvous_peer) + .wrap_err("Failed to get peer info {}")?; + + let is_throttled = peer_info.rendezvous().map_or(false, |info| { + info.last_discovery_at().map_or(false, |instant| { + instant.elapsed() + > time::Duration::from_secs_f32( + 60.0 / self.discovery.config.rendezvous.discovery_rpm, + ) + }) + }); + + debug!( + %rendezvous_peer, + ?is_throttled, + "Checking if rendezvous discovery is throttled" + ); + + if !is_throttled { + self.swarm.behaviour_mut().rendezvous.discover( + Some(self.discovery.config.rendezvous.namespace.clone()), + peer_info + .rendezvous() + .and_then(|info| info.cookie()) + .cloned(), + None, + *rendezvous_peer, + ); + } + + Ok(()) + } + + // Broadcasts rendezvous registrations to all rendezvous peers if there are pending address changes. + // If rendezvous peer is not connected, it will be dialed which will trigger the registration during identify exchange. + pub(crate) fn broadcast_rendezvous_registrations(&mut self) -> eyre::Result<()> { + if !self.discovery.state.pending_addr_changes() { + return Ok(()); + } + + for peer_id in self + .discovery + .state + .get_rendezvous_peer_ids() + .collect::>() + { + let peer_info = match self.discovery.state.get_peer_info(&peer_id) { + Some(info) => info, + None => { + error!(%peer_id, "Failed to lookup peer info"); + continue; + } + }; + + if !self.swarm.is_connected(&peer_id) { + for addr in peer_info.addrs().cloned() { + if let Err(err) = self.swarm.dial(addr) { + error!(%err, "Failed to dial relay peer"); + } + } + } else { + if let Err(err) = self.update_rendezvous_registration(&peer_id) { + error!(%err, "Failed to update rendezvous registration"); + } + } + } + + self.discovery.state.clear_pending_addr_changes(); + + Ok(()) + } + + // Updates rendezvous registration on the remote rendezvous peer. + // If there are no external addresses for the node, the registration is considered successful. + // This function expectes that the relay peer is already connected. + pub(crate) fn update_rendezvous_registration(&mut self, peer_id: &PeerId) -> eyre::Result<()> { + if let Err(err) = self.swarm.behaviour_mut().rendezvous.register( + self.discovery.config.rendezvous.namespace.clone(), + *peer_id, + None, + ) { + match err { + libp2p::rendezvous::client::RegisterError::NoExternalAddresses => {} + err => eyre::bail!(err), + } + } + + debug!( + %peer_id, rendezvous_namespace=%(self.discovery.config.rendezvous.namespace), + "Sent register request to rendezvous node" + ); + Ok(()) + } + + // Creates relay reservation if node didn't already request addres relayed address on the relay peer. + // This function expectes that the relay peer is already connected. + pub(crate) fn create_relay_reservation(&mut self, peer_id: &PeerId) -> eyre::Result<()> { + let peer_info = self + .discovery + .state + .get_peer_info(peer_id) + .wrap_err("Failed to get peer info")?; + + let is_relay_reservation_required = match peer_info.relay() { + Some(info) => match info.reservation_status() { + state::RelayReservationStatus::Discovered => true, + state::RelayReservationStatus::Expired => true, + _ => false, + }, + None => true, + }; + debug!( + ?peer_info, + %is_relay_reservation_required, + "Checking if relay reservation is required" + ); + + if !is_relay_reservation_required { + return Ok(()); + } + + let preferred_addr = peer_info + .get_preferred_addr() + .wrap_err("Failed to get preferred addr for relay peer")?; + + let relayed_addr = match preferred_addr + .clone() + .with(multiaddr::Protocol::P2pCircuit) + .with_p2p(self.swarm.local_peer_id().clone()) + { + Ok(addr) => addr, + Err(err) => { + eyre::bail!("Failed to construct relayed addr for relay peer: {:?}", err) + } + }; + self.swarm.listen_on(relayed_addr)?; + self.discovery + .state + .update_relay_reservation_status(&peer_id, state::RelayReservationStatus::Requested); + + Ok(()) + } +} diff --git a/examples/chat/src/network/discovery/state.rs b/examples/chat/src/network/discovery/state.rs new file mode 100644 index 0000000..d4e6ee3 --- /dev/null +++ b/examples/chat/src/network/discovery/state.rs @@ -0,0 +1,217 @@ +use std::collections::{BTreeMap, BTreeSet, HashSet}; +use std::time; + +use libp2p::{rendezvous, Multiaddr, PeerId, StreamProtocol}; + +// The rendezvous protocol name is not public in libp2p, so we have to define it here. +// source: https://github.com/libp2p/rust-libp2p/blob/a8888a7978f08ec9b8762207bf166193bf312b94/protocols/rendezvous/src/lib.rs#L50C12-L50C92 +const RENDEZVOUS_PROTOCOL_NAME: libp2p::StreamProtocol = + libp2p::StreamProtocol::new("/rendezvous/1.0.0"); + +#[derive(Debug)] +pub(crate) struct DiscoveryState { + peers: BTreeMap, + relay_index: BTreeSet, + rendezvous_index: BTreeSet, + pending_addr_changes: bool, +} + +impl Default for DiscoveryState { + fn default() -> Self { + DiscoveryState { + peers: Default::default(), + relay_index: Default::default(), + rendezvous_index: Default::default(), + pending_addr_changes: false, + } + } +} + +impl DiscoveryState { + pub(crate) fn get_peers(&self) -> impl Iterator { + self.peers.iter() + } + + pub(crate) fn add_peer_addr(&mut self, peer_id: PeerId, addr: &Multiaddr) { + self.peers + .entry(peer_id) + .or_default() + .addrs + .insert(addr.clone()); + } + + pub(crate) fn remove_peer(&mut self, peer_id: &PeerId) { + self.peers.remove(peer_id); + self.relay_index.remove(peer_id); + self.rendezvous_index.remove(peer_id); + } + + pub(crate) fn update_peer_protocols(&mut self, peer_id: &PeerId, protocols: &[StreamProtocol]) { + protocols.iter().for_each(|protocol| { + if protocol == &libp2p::relay::HOP_PROTOCOL_NAME { + self.relay_index.insert(*peer_id); + self.peers.entry(*peer_id).or_default().relay = Some(PeerRelayInfo { + reservation_status: Default::default(), + }); + } + if protocol == &RENDEZVOUS_PROTOCOL_NAME { + self.rendezvous_index.insert(*peer_id); + self.peers.entry(*peer_id).or_default().rendezvous = Some(PeerRendezvousInfo { + cookie: None, + last_discovery_at: None, + }); + } + }); + } + + pub(crate) fn update_rendezvous_cookie( + &mut self, + rendezvous_peer: &PeerId, + cookie: rendezvous::Cookie, + ) { + self.peers + .entry(*rendezvous_peer) + .and_modify(|info| info.update_rendezvous_cookie(cookie.clone())) + .or_default() + .rendezvous = Some(PeerRendezvousInfo { + cookie: Some(cookie.clone()), + last_discovery_at: None, + }); + } + + pub(crate) fn update_relay_reservation_status( + &mut self, + relay_peer: &PeerId, + status: RelayReservationStatus, + ) { + self.peers + .entry(*relay_peer) + .and_modify(|info| info.update_relay_reservation_status(status)) + .or_default() + .relay = Some(PeerRelayInfo { + reservation_status: status, + }); + } + + pub(crate) fn get_peer_info(&self, peer_id: &PeerId) -> Option<&PeerInfo> { + self.peers.get(peer_id) + } + + pub(crate) fn get_rendezvous_peer_ids(&self) -> impl Iterator + '_ { + self.rendezvous_index.iter().cloned() + } + + pub(crate) fn is_peer_relay(&self, peer_id: &PeerId) -> bool { + self.relay_index.contains(peer_id) + } + + pub(crate) fn is_peer_rendezvous(&self, peer_id: &PeerId) -> bool { + self.rendezvous_index.contains(peer_id) + } + + pub(crate) fn pending_addr_changes(&self) -> bool { + self.pending_addr_changes + } + + pub(crate) fn set_pending_addr_changes(&mut self) { + self.pending_addr_changes = true; + } + + pub(crate) fn clear_pending_addr_changes(&mut self) { + self.pending_addr_changes = false; + } +} + +#[derive(Clone, Debug, Default)] +pub(crate) struct PeerInfo { + addrs: HashSet, + relay: Option, + rendezvous: Option, +} + +impl PeerInfo { + pub(crate) fn addrs(&self) -> impl Iterator { + self.addrs.iter() + } + + pub(crate) fn get_preferred_addr(&self) -> Option<&Multiaddr> { + let udp_addrs: Vec<&Multiaddr> = self + .addrs + .iter() + .filter(|addr| { + addr.iter() + .any(|p| matches!(p, multiaddr::Protocol::Udp(_))) + }) + .collect(); + + match udp_addrs.len() { + 0 => self.addrs.iter().next(), + _ => Some(udp_addrs[0]), + } + } + + pub(crate) fn relay(&self) -> Option<&PeerRelayInfo> { + self.relay.as_ref() + } + + pub(crate) fn rendezvous(&self) -> Option<&PeerRendezvousInfo> { + self.rendezvous.as_ref() + } + + fn update_rendezvous_cookie(&mut self, cookie: rendezvous::Cookie) { + if let Some(ref mut rendezvous_info) = self.rendezvous { + rendezvous_info.update_cookie(cookie); + } + } + + fn update_relay_reservation_status(&mut self, status: RelayReservationStatus) { + if let Some(ref mut relay_info) = self.relay { + relay_info.update_reservation_status(status); + } + } +} + +#[derive(Clone, Debug, Default)] +pub(crate) struct PeerRelayInfo { + reservation_status: RelayReservationStatus, +} + +impl PeerRelayInfo { + pub(crate) fn reservation_status(&self) -> RelayReservationStatus { + self.reservation_status + } + + fn update_reservation_status(&mut self, status: RelayReservationStatus) { + self.reservation_status = status; + } +} + +#[derive(Clone, Copy, Debug, Default)] +pub(crate) enum RelayReservationStatus { + #[default] + Discovered, + Requested, + Accepted, + Expired, +} + +#[derive(Clone, Debug)] +pub(crate) struct PeerRendezvousInfo { + cookie: Option, + last_discovery_at: Option, +} + +impl PeerRendezvousInfo { + pub(crate) fn cookie(&self) -> Option<&rendezvous::Cookie> { + self.cookie.as_ref() + } + + pub(crate) fn last_discovery_at(&self) -> Option { + self.last_discovery_at + } + + fn update_cookie(&mut self, cookie: rendezvous::Cookie) { + self.cookie = Some(cookie); + self.last_discovery_at = Some(time::Instant::now()); + } +} diff --git a/examples/chat/src/network/events.rs b/examples/chat/src/network/events.rs index 92eb35a..230b62e 100644 --- a/examples/chat/src/network/events.rs +++ b/examples/chat/src/network/events.rs @@ -1,10 +1,11 @@ -use tracing::error; +use tracing::{error, info}; use super::*; mod dcutr; mod gossipsub; mod identify; +mod kad; mod mdns; mod ping; mod relay; @@ -18,41 +19,32 @@ impl EventLoop { pub(super) async fn handle_swarm_event(&mut self, event: SwarmEvent) { match event { SwarmEvent::Behaviour(event) => match event { - BehaviourEvent::Identify(event) => events::EventHandler::handle(self, event).await, + BehaviourEvent::Dcutr(event) => events::EventHandler::handle(self, event).await, BehaviourEvent::Gossipsub(event) => events::EventHandler::handle(self, event).await, + BehaviourEvent::Identify(event) => events::EventHandler::handle(self, event).await, + BehaviourEvent::Kad(event) => events::EventHandler::handle(self, event).await, BehaviourEvent::Mdns(event) => events::EventHandler::handle(self, event).await, + BehaviourEvent::Ping(event) => events::EventHandler::handle(self, event).await, + BehaviourEvent::Relay(event) => events::EventHandler::handle(self, event).await, BehaviourEvent::Rendezvous(event) => { events::EventHandler::handle(self, event).await } - BehaviourEvent::Relay(event) => events::EventHandler::handle(self, event).await, - BehaviourEvent::Ping(event) => events::EventHandler::handle(self, event).await, - BehaviourEvent::Dcutr(event) => events::EventHandler::handle(self, event).await, }, SwarmEvent::NewListenAddr { listener_id, address, } => { let local_peer_id = *self.swarm.local_peer_id(); - let address = match address.with_p2p(local_peer_id) { - Ok(address) => address, - Err(address) => { - warn!( - "Failed to sanitize listen address with p2p proto, address: {:?}, p2p proto: {:?}", - address, local_peer_id - ); - address - } - }; if let Err(err) = self .event_sender .send(types::NetworkEvent::ListeningOn { listener_id, - address, + address: address.with(multiaddr::Protocol::P2p(local_peer_id)), }) .await { error!("Failed to send listening on event: {:?}", err); - }; + } } SwarmEvent::IncomingConnection { .. } => {} SwarmEvent::ConnectionEstablished { @@ -61,6 +53,10 @@ impl EventLoop { debug!(%peer_id, ?endpoint, "Connection established"); match endpoint { libp2p::core::ConnectedPoint::Dialer { .. } => { + self.discovery + .state + .add_peer_addr(peer_id, endpoint.get_remote_address()); + if let Some(sender) = self.pending_dial.remove(&peer_id) { let _ = sender.send(Ok(Some(()))); } @@ -79,79 +75,113 @@ impl EventLoop { "Connection closed: {} {:?} {:?} {} {:?}", peer_id, connection_id, endpoint, num_established, cause ); + if !self.swarm.is_connected(&peer_id) + && !self.discovery.state.is_peer_relay(&peer_id) + && !self.discovery.state.is_peer_rendezvous(&peer_id) + { + self.discovery.state.remove_peer(&peer_id); + } } SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => { - debug!(%error, ?peer_id, "Outgoing connection error"); if let Some(peer_id) = peer_id { if let Some(sender) = self.pending_dial.remove(&peer_id) { let _ = sender.send(Err(eyre::eyre!(error))); } } } - SwarmEvent::IncomingConnectionError { error, .. } => { - debug!(%error, "Incoming connection error") - } + SwarmEvent::IncomingConnectionError { .. } => {} SwarmEvent::Dialing { peer_id: Some(peer_id), .. - } => trace!("Dialing peer: {}", peer_id), + } => debug!("Dialing peer: {}", peer_id), SwarmEvent::ExpiredListenAddr { address, .. } => { - debug!("Expired listen address: {}", address) + trace!("Expired listen address: {}", address) } SwarmEvent::ListenerClosed { addresses, reason, .. - } => { - debug!("Listener closed: {:?} {:?}", addresses, reason.err()) - } - SwarmEvent::ListenerError { error, .. } => debug!(%error, "Listener error"), + } => trace!("Listener closed: {:?} {:?}", addresses, reason.err()), + SwarmEvent::ListenerError { error, .. } => trace!("Listener error: {:?}", error), SwarmEvent::NewExternalAddrCandidate { address } => { - debug!("New external address candidate: {}", address) + trace!("New external address candidate: {}", address) } SwarmEvent::ExternalAddrConfirmed { address } => { - debug!("External address confirmed: {}", address); + info!("External address confirmed: {}", address); + if let Ok(relayed_addr) = RelayedMultiaddr::try_from(&address) { + self.discovery.state.update_relay_reservation_status( + &relayed_addr.relay_peer, + discovery::state::RelayReservationStatus::Accepted, + ); + self.discovery.state.set_pending_addr_changes(); + if let Err(err) = self.broadcast_rendezvous_registrations() { + error!(%err, "Failed to handle rendezvous register"); + }; + } } SwarmEvent::ExternalAddrExpired { address } => { - debug!("External address expired: {}", address) + debug!("External address expired: {}", address); + if let Ok(relayed_addr) = RelayedMultiaddr::try_from(&address) { + self.discovery.state.update_relay_reservation_status( + &relayed_addr.relay_peer_id(), + discovery::state::RelayReservationStatus::Expired, + ); + + self.discovery.state.set_pending_addr_changes(); + if let Err(err) = self.broadcast_rendezvous_registrations() { + error!(%err, "Failed to handle rendezvous register"); + }; + } } SwarmEvent::NewExternalAddrOfPeer { peer_id, address } => { - debug!("New external address of peer: {} {}", peer_id, address) + trace!("New external address of peer: {} {}", peer_id, address) } - _ => {} + unhandled => warn!("Unhandled event: {:?}", unhandled), } } +} - pub(super) async fn handle_rendezvous_discover(&mut self) { - for (peer_id, entry) in self.rendezvous.iter() { - if entry.identify_state.is_exchanged() { - self.swarm.behaviour_mut().rendezvous.discover( - Some(self.rendezvous_namespace.clone()), - entry.cookie.clone(), - None, - *peer_id, - ); +#[derive(Debug)] +pub(crate) struct RelayedMultiaddr { + relay_peer: PeerId, +} + +impl TryFrom<&Multiaddr> for RelayedMultiaddr { + type Error = &'static str; + + fn try_from(value: &Multiaddr) -> Result { + let mut peer_ids = Vec::new(); + + let mut iter = value.iter(); + + while let Some(protocol) = iter.next() { + match protocol { + multiaddr::Protocol::P2pCircuit => { + if peer_ids.is_empty() { + return Err("expected at least one p2p proto before P2pCircuit"); + } + let Some(multiaddr::Protocol::P2p(id)) = iter.next() else { + return Err("expected p2p proto after P2pCircuit"); + }; + peer_ids.push(id); + } + multiaddr::Protocol::P2p(id) => { + peer_ids.push(id); + } + _ => {} } } - } - pub(super) async fn handle_rendezvous_dial(&mut self) { - for (peer_id, entry) in self.rendezvous.iter() { - if self.swarm.is_connected(peer_id) { - continue; - }; - if let Err(err) = self.swarm.dial(entry.address.clone()) { - error!("Failed to dial rendezvous peer: {:?}", err); - }; + if peer_ids.len() < 2 { + return Err("expected at least two p2p protos, one for peer and one for relay"); } + + Ok(Self { + relay_peer: peer_ids.remove(0), + }) } +} - pub(super) async fn handle_relays_dial(&mut self) { - for (peer_id, entry) in self.relays.iter() { - if self.swarm.is_connected(peer_id) { - continue; - }; - if let Err(err) = self.swarm.dial(entry.address.clone()) { - error!("Failed to dial relay peer: {:?}", err); - }; - } +impl RelayedMultiaddr { + fn relay_peer_id(&self) -> &PeerId { + &self.relay_peer } } diff --git a/examples/chat/src/network/events/identify.rs b/examples/chat/src/network/events/identify.rs index d67a7aa..2ee769d 100644 --- a/examples/chat/src/network/events/identify.rs +++ b/examples/chat/src/network/events/identify.rs @@ -2,53 +2,32 @@ use libp2p::identify; use owo_colors::OwoColorize; use tracing::{debug, error}; -use super::{EventHandler, EventLoop, RelayReservationState}; +use super::{EventHandler, EventLoop}; impl EventHandler for EventLoop { async fn handle(&mut self, event: identify::Event) { debug!("{}: {:?}", "identify".yellow(), event); match event { - identify::Event::Received { peer_id, .. } => { - if let Some(entry) = self.relays.get_mut(&peer_id) { - entry.identify_state.received = true; + identify::Event::Received { peer_id, info } => { + self.discovery + .state + .update_peer_protocols(&peer_id, &info.protocols); - if entry.identify_state.is_exchanged() - && matches!(entry.reservation_state, RelayReservationState::Unknown) - { - if let Err(err) = self - .swarm - .listen_on(entry.address.clone().with(multiaddr::Protocol::P2pCircuit)) - { - error!("Failed to listen on relay address: {:?}", err); - }; - entry.reservation_state = RelayReservationState::Requested; - } + if self.discovery.state.is_peer_relay(&peer_id) { + if let Err(err) = self.create_relay_reservation(&peer_id) { + error!(%err, "Failed to handle relay reservation"); + }; } - if let Some(entry) = self.rendezvous.get_mut(&peer_id) { - entry.identify_state.received = true; - } - } - identify::Event::Sent { peer_id } => { - if let Some(entry) = self.relays.get_mut(&peer_id) { - entry.identify_state.sent = true; - - if entry.identify_state.is_exchanged() - && matches!(entry.reservation_state, RelayReservationState::Unknown) - { - if let Err(err) = self - .swarm - .listen_on(entry.address.clone().with(multiaddr::Protocol::P2pCircuit)) - { - error!("Failed to listen on relay address: {:?}", err); - }; - entry.reservation_state = RelayReservationState::Requested; - } - } + if self.discovery.state.is_peer_rendezvous(&peer_id) { + if let Err(err) = self.perform_rendezvous_discovery(&peer_id) { + error!(%err, "Failed to perform rendezvous discovery"); + }; - if let Some(entry) = self.rendezvous.get_mut(&peer_id) { - entry.identify_state.sent = true; + if let Err(err) = self.update_rendezvous_registration(&peer_id) { + error!(%err, "Failed to update registration discovery"); + }; } } _ => {} diff --git a/examples/chat/src/network/events/kad.rs b/examples/chat/src/network/events/kad.rs new file mode 100644 index 0000000..101faa8 --- /dev/null +++ b/examples/chat/src/network/events/kad.rs @@ -0,0 +1,11 @@ +use libp2p::kad; +use owo_colors::OwoColorize; +use tracing::debug; + +use super::{EventHandler, EventLoop}; + +impl EventHandler for EventLoop { + async fn handle(&mut self, event: kad::Event) { + debug!("{}: {:?}", "kad".yellow(), event); + } +} diff --git a/examples/chat/src/network/events/relay.rs b/examples/chat/src/network/events/relay.rs index ccd31ba..545443a 100644 --- a/examples/chat/src/network/events/relay.rs +++ b/examples/chat/src/network/events/relay.rs @@ -1,36 +1,11 @@ use libp2p::relay; use owo_colors::OwoColorize; -use tracing::{debug, error}; +use tracing::debug; -use super::{EventHandler, EventLoop, RelayReservationState}; +use super::{EventHandler, EventLoop}; impl EventHandler for EventLoop { async fn handle(&mut self, event: relay::client::Event) { debug!("{}: {:?}", "relay".yellow(), event); - - match event { - relay::client::Event::ReservationReqAccepted { relay_peer_id, .. } => { - if let Some(entry) = self.relays.get_mut(&relay_peer_id) { - entry.reservation_state = RelayReservationState::Acquired; - - for (peer_id, entry) in self.rendezvous.iter() { - if entry.identify_state.is_exchanged() { - if let Err(err) = self.swarm.behaviour_mut().rendezvous.register( - self.rendezvous_namespace.clone(), - *peer_id, - None, - ) { - error!(%err, "Failed to register at rendezvous"); - } - debug!( - "Registered at rendezvous node {} on the namespace {}", - *peer_id, self.rendezvous_namespace - ); - } - } - } - } - _ => {} - } } } diff --git a/examples/chat/src/network/events/rendezvous.rs b/examples/chat/src/network/events/rendezvous.rs index 32cf0f7..ff683e7 100644 --- a/examples/chat/src/network/events/rendezvous.rs +++ b/examples/chat/src/network/events/rendezvous.rs @@ -14,23 +14,27 @@ impl EventHandler for EventLoop { registrations, cookie, } => { - if let Some(entry) = self.rendezvous.get_mut(&rendezvous_node) { - entry.cookie = Some(cookie); - } + self.discovery + .state + .update_rendezvous_cookie(&rendezvous_node, cookie); for registration in registrations { if registration.record.peer_id() == *self.swarm.local_peer_id() { continue; } - if self.swarm.is_connected(®istration.record.peer_id()) { + let peer_id = registration.record.peer_id(); + if self.swarm.is_connected(&peer_id) { continue; }; + debug!( + %peer_id, + addrs=?(registration.record.addresses()), + "Discovered unconnected peer via rendezvous, attempting to dial it" + ); for address in registration.record.addresses() { - let peer = registration.record.peer_id(); - debug!(%peer, %address, "Discovered peer via rendezvous"); - + debug!(%peer_id, %address, "Dialing peer discovered via rendezvous"); if let Err(err) = self.swarm.dial(address.clone()) { error!("Failed to dial peer: {:?}", err); } @@ -40,29 +44,32 @@ impl EventHandler for EventLoop { rendezvous::client::Event::Registered { rendezvous_node, .. } => { - if let Some(entry) = self.rendezvous.get(&rendezvous_node) { - if entry.cookie.is_none() { - self.swarm.behaviour_mut().rendezvous.discover( - Some(self.rendezvous_namespace.clone()), - entry.cookie.clone(), - None, - rendezvous_node, - ); + if let Some(peer_info) = self.discovery.state.get_peer_info(&rendezvous_node) { + if peer_info + .rendezvous() + .and_then(|info| info.cookie()) + .is_none() + { + debug!(%rendezvous_node, "Discovering peers via rendezvous after registration"); + if let Err(err) = self.perform_rendezvous_discovery(&rendezvous_node) { + error!(%err, "Failed to run rendezvous discovery after registration"); + } } } } - rendezvous::client::Event::Expired { peer } => { - let local_peer_id = *self.swarm.local_peer_id(); - if peer == local_peer_id { - if let Err(err) = self.swarm.behaviour_mut().rendezvous.register( - self.rendezvous_namespace.clone(), - peer, - None, - ) { - error!(%err, "Failed to re-register at rendezvous"); - }; - debug!("Re-registered at rendezvous"); - } + rendezvous::client::Event::DiscoverFailed { + rendezvous_node, + namespace, + error, + } => { + error!(?rendezvous_node, ?namespace, error_code=?error, "Rendezvous discovery failed"); + } + rendezvous::client::Event::RegisterFailed { + rendezvous_node, + namespace, + error, + } => { + error!(?rendezvous_node, ?namespace, error_code=?error, "Rendezvous registration failed"); } _ => {} }