From 8e2bef7f42f68d6a1afba9947c61d0ff1763c2e2 Mon Sep 17 00:00:00 2001 From: Filip Bozic <70634661+fbozic@users.noreply.github.com> Date: Wed, 29 May 2024 19:26:33 +0200 Subject: [PATCH] feat: implement dynamic relay and rendezvous peer discovery, setup kad with boot node --- examples/chat/Cargo.toml | 1 + examples/chat/src/main.rs | 3 +- examples/chat/src/network.rs | 198 +++++---------- examples/chat/src/network/client.rs | 4 +- examples/chat/src/network/discovery.rs | 181 ++++++++++++++ examples/chat/src/network/discovery/model.rs | 229 ++++++++++++++++++ examples/chat/src/network/events.rs | 67 ++--- examples/chat/src/network/events/identify.rs | 58 ++--- examples/chat/src/network/events/kad.rs | 11 + examples/chat/src/network/events/relay.rs | 26 +- .../chat/src/network/events/rendezvous.rs | 55 +++-- 11 files changed, 568 insertions(+), 265 deletions(-) create mode 100644 examples/chat/src/network/discovery.rs create mode 100644 examples/chat/src/network/discovery/model.rs create mode 100644 examples/chat/src/network/events/kad.rs diff --git a/examples/chat/Cargo.toml b/examples/chat/Cargo.toml index a0895df..067e81b 100644 --- a/examples/chat/Cargo.toml +++ b/examples/chat/Cargo.toml @@ -14,6 +14,7 @@ libp2p = { version = "0.53.2", features = [ "dns", "gossipsub", "identify", + "kad", "macros", "mdns", "noise", diff --git a/examples/chat/src/main.rs b/examples/chat/src/main.rs index f2fdb98..9ceb893 100644 --- a/examples/chat/src/main.rs +++ b/examples/chat/src/main.rs @@ -70,9 +70,8 @@ async fn main() -> eyre::Result<()> { let (network_client, mut network_events) = network::run( keypair.clone(), opt.port, - libp2p::rendezvous::Namespace::new(opt.rendezvous_namespace)?, - opt.boot_nodes.clone(), opt.boot_nodes.clone(), + libp2p::rendezvous::Namespace::new(opt.rendezvous_namespace)?, ) .await?; diff --git a/examples/chat/src/network.rs b/examples/chat/src/network.rs index f4842e9..d437af7 100644 --- a/examples/chat/src/network.rs +++ b/examples/chat/src/network.rs @@ -1,30 +1,33 @@ use std::collections::hash_map::{self, HashMap}; -use std::collections::BTreeMap; use std::time::Duration; use libp2p::futures::prelude::*; use libp2p::swarm::{NetworkBehaviour, Swarm, SwarmEvent}; use libp2p::{ - dcutr, gossipsub, identify, identity, mdns, noise, ping, relay, rendezvous, yamux, PeerId, + dcutr, gossipsub, identify, identity, kad, mdns, noise, ping, relay, rendezvous, yamux, PeerId, }; use multiaddr::Multiaddr; use tokio::sync::{mpsc, oneshot}; use tokio::time; -use tracing::{debug, error, info, trace, warn}; +use tracing::{debug, info, trace, warn}; pub mod client; +pub mod discovery; pub mod events; pub mod types; use client::NetworkClient; const PROTOCOL_VERSION: &str = concat!("/", env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")); +const CALIMERO_KAD_PROTO_NAME: libp2p::StreamProtocol = + libp2p::StreamProtocol::new("/calimero/kad/1.0.0"); #[derive(NetworkBehaviour)] struct Behaviour { dcutr: dcutr::Behaviour, - identify: identify::Behaviour, gossipsub: gossipsub::Behaviour, + identify: identify::Behaviour, + kad: kad::Behaviour, mdns: mdns::tokio::Behaviour, ping: ping::Behaviour, rendezvous: rendezvous::client::Behaviour, @@ -34,34 +37,12 @@ struct Behaviour { pub async fn run( keypair: identity::Keypair, port: u16, + boot_nodes: Vec, rendezvous_namespace: rendezvous::Namespace, - relay_addresses: Vec, - rendezvous_addresses: Vec, ) -> eyre::Result<(NetworkClient, mpsc::Receiver)> { - let mut rendezvous = BTreeMap::new(); - for address in &rendezvous_addresses { - let entry = match peek_peer_id(&address) { - Ok(peer_id) => (peer_id, RendezvousEntry::new(address.clone())), - Err(err) => { - eyre::bail!("Failed to parse rendezvous PeerId: {}", err); - } - }; - rendezvous.insert(entry.0, entry.1); - } - - let mut relays = BTreeMap::new(); - for address in &relay_addresses { - let entry = match peek_peer_id(&address) { - Ok(peer_id) => (peer_id, RelayEntry::new(address.clone())), - Err(err) => { - eyre::bail!("Failed to parse relay PeerId: {}", err); - } - }; - relays.insert(entry.0, entry.1); - } - let (client, event_receiver, event_loop) = - init(keypair, relays, rendezvous_namespace, rendezvous).await?; + init(keypair, boot_nodes, rendezvous_namespace).await?; + tokio::spawn(event_loop.run()); let swarm_listen: Vec = vec![ @@ -73,47 +54,32 @@ pub async fn run( client.listen_on(addr).await?; } - tokio::spawn(run_init_dial( - client.clone(), - rendezvous_addresses, - relay_addresses, - )); - Ok((client, event_receiver)) } -async fn run_init_dial( - client: NetworkClient, - rendezvous_addresses: Vec, - relay_addresses: Vec, -) { - tokio::time::sleep(Duration::from_secs(5)).await; - - info!("Initiating dial to rendezvous and relay addresses."); - - for addr in rendezvous_addresses - .into_iter() - .chain(relay_addresses) - .collect::>() - .into_iter() - { - info!("Dialing address: {:?}", addr); - if let Err(err) = client.dial(addr).await { - error!("Failed to dial rendezvous address: {}", err); - }; - } -} - async fn init( keypair: identity::Keypair, - relays: BTreeMap, + boot_nodes: Vec, rendezvous_namespace: rendezvous::Namespace, - rendezvous: BTreeMap, ) -> eyre::Result<( NetworkClient, mpsc::Receiver, EventLoop, )> { + let bootstrap_peers = { + let mut peers = vec![]; + + for mut addr in boot_nodes { + let Some(multiaddr::Protocol::P2p(peer_id)) = addr.pop() else { + eyre::bail!("Failed to parse peer id from addr {:?}", addr); + }; + + peers.push((peer_id, addr)); + } + + peers + }; + let peer_id = keypair.public().to_peer_id(); let swarm = libp2p::SwarmBuilder::with_existing_identity(keypair.clone()) .with_tokio() @@ -130,6 +96,28 @@ async fn init( identify::Config::new(PROTOCOL_VERSION.to_owned(), keypair.public()) .with_push_listen_addr_updates(true), ), + kad: { + let mut kademlia_config = kad::Config::default(); + kademlia_config + .set_protocol_names(std::iter::once(CALIMERO_KAD_PROTO_NAME).collect()); + + let mut kademlia = kad::Behaviour::with_config( + peer_id, + kad::store::MemoryStore::new(peer_id), + kademlia_config, + ); + + kademlia.set_mode(Some(kad::Mode::Client)); + + for (peer_id, addr) in bootstrap_peers { + kademlia.add_address(&peer_id, addr); + } + if let Err(err) = kademlia.bootstrap() { + warn!(%err, "Failed to bootstrap Kademlia"); + }; + + kademlia + }, gossipsub: gossipsub::Behaviour::new( gossipsub::MessageAuthenticity::Signed(keypair.clone()), gossipsub::Config::default(), @@ -151,14 +139,7 @@ async fn init( sender: command_sender, }; - let event_loop = EventLoop::new( - swarm, - command_receiver, - event_sender, - relays, - rendezvous_namespace, - rendezvous, - ); + let event_loop = EventLoop::new(swarm, command_receiver, event_sender, rendezvous_namespace); Ok((client, event_receiver, event_loop)) } @@ -167,90 +148,30 @@ pub(crate) struct EventLoop { swarm: Swarm, command_receiver: mpsc::Receiver, event_sender: mpsc::Sender, - relays: BTreeMap, + network_state: discovery::model::DiscoveryState, rendezvous_namespace: rendezvous::Namespace, - rendezvous: BTreeMap, pending_dial: HashMap>>>, } -#[derive(Debug)] -pub(crate) struct RelayEntry { - address: Multiaddr, - identify_state: IdentifyState, - reservation_state: RelayReservationState, -} - -impl RelayEntry { - pub(crate) fn new(address: Multiaddr) -> Self { - Self { - address, - identify_state: Default::default(), - reservation_state: Default::default(), - } - } -} - -#[derive(Debug)] -pub(crate) struct RendezvousEntry { - address: Multiaddr, - cookie: Option, - identify_state: IdentifyState, -} - -impl RendezvousEntry { - pub(crate) fn new(address: Multiaddr) -> Self { - Self { - address, - cookie: Default::default(), - identify_state: Default::default(), - } - } -} - -#[derive(Debug, Default, PartialEq)] -pub(crate) enum RelayReservationState { - #[default] - Unknown, - Requested, - Acquired, -} - -#[derive(Debug, Default)] -pub(crate) struct IdentifyState { - sent: bool, - received: bool, -} - -impl IdentifyState { - pub(crate) fn is_exchanged(&self) -> bool { - self.sent && self.received - } -} - impl EventLoop { fn new( swarm: Swarm, command_receiver: mpsc::Receiver, event_sender: mpsc::Sender, - relays: BTreeMap, rendezvous_namespace: rendezvous::Namespace, - rendezvous: BTreeMap, ) -> Self { Self { swarm, command_receiver, event_sender, - relays, - rendezvous, + network_state: discovery::model::DiscoveryState::new(), rendezvous_namespace, pending_dial: Default::default(), } } pub(crate) async fn run(mut self) { - let mut relays_dial_tick = tokio::time::interval(Duration::from_secs(90)); - let mut rendezvous_discover_tick = tokio::time::interval(Duration::from_secs(30)); - let mut rendezvous_dial_tick = tokio::time::interval(Duration::from_secs(90)); + let mut rendezvous_discover_tick = tokio::time::interval(Duration::from_secs(90)); loop { tokio::select! { @@ -259,9 +180,8 @@ impl EventLoop { let Some(c) = command else { break }; self.handle_command(c).await; } - _ = relays_dial_tick.tick() => self.handle_relays_dial().await, - _ = rendezvous_dial_tick.tick() => self.handle_rendezvous_dial().await, - _ = rendezvous_discover_tick.tick() => self.handle_rendezvous_discover().await, + // _ = relay_dial_tick.tick() => self.handle_relay_reservations().await, + _ = rendezvous_discover_tick.tick() => self.handle_rendezvous_discoveries().await, } } } @@ -339,7 +259,7 @@ impl EventLoop { .collect(); let count = peers.len(); - let _ = sender.send(PeerInfo { count, peers }); + let _ = sender.send(PeersInfo { count, peers }); } Command::MeshPeerCount { topic, sender } => { let peers: Vec = self @@ -351,7 +271,7 @@ impl EventLoop { .collect(); let count = peers.len(); - let _ = sender.send(MeshPeerInfo { count, peers }); + let _ = sender.send(MeshPeersInfo { count, peers }); } } } @@ -381,24 +301,24 @@ pub(crate) enum Command { sender: oneshot::Sender>, }, PeerInfo { - sender: oneshot::Sender, + sender: oneshot::Sender, }, MeshPeerCount { topic: gossipsub::TopicHash, - sender: oneshot::Sender, + sender: oneshot::Sender, }, } #[allow(dead_code)] // Info structs for pretty printing #[derive(Debug)] -pub(crate) struct PeerInfo { +pub(crate) struct PeersInfo { count: usize, peers: Vec, } #[allow(dead_code)] // Info structs for pretty printing #[derive(Debug)] -pub(crate) struct MeshPeerInfo { +pub(crate) struct MeshPeersInfo { count: usize, peers: Vec, } diff --git a/examples/chat/src/network/client.rs b/examples/chat/src/network/client.rs index 1e4163d..26b863e 100644 --- a/examples/chat/src/network/client.rs +++ b/examples/chat/src/network/client.rs @@ -77,7 +77,7 @@ impl NetworkClient { receiver.await.expect("Sender not to be dropped.") } - pub async fn peer_info(&self) -> super::PeerInfo { + pub async fn peer_info(&self) -> super::PeersInfo { let (sender, receiver) = oneshot::channel(); self.sender @@ -88,7 +88,7 @@ impl NetworkClient { receiver.await.expect("Sender not to be dropped.") } - pub async fn mesh_peer_info(&self, topic: gossipsub::TopicHash) -> super::MeshPeerInfo { + pub async fn mesh_peer_info(&self, topic: gossipsub::TopicHash) -> super::MeshPeersInfo { let (sender, receiver) = oneshot::channel(); self.sender diff --git a/examples/chat/src/network/discovery.rs b/examples/chat/src/network/discovery.rs new file mode 100644 index 0000000..9483771 --- /dev/null +++ b/examples/chat/src/network/discovery.rs @@ -0,0 +1,181 @@ +use std::collections::HashSet; + +use eyre::ContextCompat; +use libp2p::PeerId; +use multiaddr::Protocol; +use tracing::{debug, error, info}; + +pub(crate) mod model; +use super::EventLoop; + +impl EventLoop { + // Handles rendezvous discoveries for all rendezvous peers. + // If rendezvous peer is not connected, it will be dialed which will trigger the registration on during identify exchange. + pub(crate) async fn handle_rendezvous_discoveries(&mut self) { + for peer_id in self + .network_state + .get_rendezvous_peer_ids() + .collect::>() + { + let peer_info = match self.network_state.get_peer_info(&peer_id) { + Some(info) => info, + None => { + error!(%peer_id, "Failed to lookup peer info"); + continue; + } + }; + + if !self.swarm.is_connected(&peer_id) { + for addr in peer_info.addrs().cloned() { + if let Err(err) = self.swarm.dial(addr) { + error!(%err, "Failed to dial relay peer"); + } + } + } else { + if let Err(err) = self.perform_rendezvous_discovery(&peer_id) { + error!(%err, "Failed to handle rendezvous discover"); + } + } + } + } + + // Performs rendezvous discovery against the remote rendezvous peer if it's time to do so. + // This function expectes that the relay peer is already connected. + pub(crate) fn perform_rendezvous_discovery( + &mut self, + rendezvous_peer: &PeerId, + ) -> eyre::Result<()> { + let peer_info = self + .network_state + .get_peer_info(rendezvous_peer) + .wrap_err("Failed to get peer info {}")?; + + if peer_info.is_rendezvous_discovery_time() { + self.swarm.behaviour_mut().rendezvous.discover( + Some(self.rendezvous_namespace.clone()), + peer_info.rendezvous_cookie().cloned(), + None, + *rendezvous_peer, + ); + } + + Ok(()) + } + + // Broadcasts rendezvous registrations to all rendezvous peers if there are pending address changes. + // If rendezvous peer is not connected, it will be dialed which will trigger the registration on during identify exchange. + pub(crate) fn broadcast_rendezvous_registrations(&mut self) -> eyre::Result<()> { + if !self.network_state.pending_addr_changes() { + return Ok(()); + } + + for peer_id in self + .network_state + .get_rendezvous_peer_ids() + .collect::>() + { + let peer_info = match self.network_state.get_peer_info(&peer_id) { + Some(info) => info, + None => { + error!(%peer_id, "Failed to lookup peer info"); + continue; + } + }; + + if !self.swarm.is_connected(&peer_id) { + for addr in peer_info.addrs().cloned() { + if let Err(err) = self.swarm.dial(addr) { + error!(%err, "Failed to dial relay peer"); + } + } + } else { + if let Err(err) = self.update_rendezvous_registration(&peer_id) { + error!(%err, "Failed to handle rendezvous discover"); + } + } + } + + self.network_state.clear_pending_addr_changes(); + + Ok(()) + } + + // Updates rendezvous registration on the remote rendezvous peer. + // If there are no external addresses for the node, the registration is considered successful. + // This function expectes that the relay peer is already connected. + pub(crate) fn update_rendezvous_registration( + &mut self, + rendezvous_peer: &PeerId, + ) -> eyre::Result<()> { + if let Err(err) = self.swarm.behaviour_mut().rendezvous.register( + self.rendezvous_namespace.clone(), + *rendezvous_peer, + None, + ) { + match err { + libp2p::rendezvous::client::RegisterError::NoExternalAddresses => {} + err => eyre::bail!(err), + } + } + + debug!( + %rendezvous_peer, rendezvous_namespace=%(self.rendezvous_namespace), + "Sent register request to rendezvous node" + ); + Ok(()) + } + + // Creates relay reservation if node doesn't have a relayed address on the relay peer. + // This function expectes that the relay peer is already connected. + pub(crate) fn create_relay_reservation(&mut self, relay_peer: &PeerId) -> eyre::Result<()> { + let peer_info = self + .network_state + .get_peer_info(relay_peer) + .wrap_err("Failed to get peer info")?; + + let external_addrs = self + .swarm + .external_addresses() + .filter(|addr| addr.iter().any(|p| matches!(p, Protocol::P2pCircuit))) + .map(|addr| addr.clone()) + .collect::>(); + + let preferred_addr = peer_info + .get_preferred_addr() + .wrap_err("Failed to get preferred addr for relay peer")?; + + let relayed_addr = match preferred_addr + .clone() + .with(multiaddr::Protocol::P2pCircuit) + .with_p2p(self.swarm.local_peer_id().clone()) + { + Ok(addr) => addr, + Err(err) => { + eyre::bail!("Failed to construct relayed addr for relay peer: {:?}", err) + } + }; + let is_relay_reservation_required = !(matches!( + peer_info.relay_reservation_status(), + Some(model::RelayReservationStatus::Requested) + ) || external_addrs.contains(&relayed_addr)); + + debug!( + ?peer_info, + ?external_addrs, + %is_relay_reservation_required, + "Checking if relay reservation is required" + ); + + if !is_relay_reservation_required { + return Ok(()); + } + + self.swarm.listen_on(relayed_addr)?; + self.network_state.update_relay_reservation_status( + &relay_peer, + model::RelayReservationStatus::Requested, + )?; + + Ok(()) + } +} diff --git a/examples/chat/src/network/discovery/model.rs b/examples/chat/src/network/discovery/model.rs new file mode 100644 index 0000000..b9f245b --- /dev/null +++ b/examples/chat/src/network/discovery/model.rs @@ -0,0 +1,229 @@ +use std::collections::{BTreeMap, BTreeSet, HashSet}; +use std::time::Duration; + +use libp2p::{rendezvous, PeerId, StreamProtocol}; +use multiaddr::Multiaddr; +use tokio::time; + +// The rendezvous protocol name is not public in libp2p, so we have to define it here. +// source: https://github.com/libp2p/rust-libp2p/blob/a8888a7978f08ec9b8762207bf166193bf312b94/protocols/rendezvous/src/lib.rs#L50C12-L50C92 +const RENDEZVOUS_PROTOCOL_NAME: libp2p::StreamProtocol = + libp2p::StreamProtocol::new("/rendezvous/1.0.0"); + +#[derive(Debug)] +pub(crate) struct DiscoveryState { + peers: BTreeMap, + relay_index: BTreeSet, + rendezvous_index: BTreeSet, + pending_addr_changes: bool, +} + +impl DiscoveryState { + pub(crate) fn new() -> Self { + DiscoveryState { + peers: Default::default(), + relay_index: Default::default(), + rendezvous_index: Default::default(), + pending_addr_changes: false, + } + } + + pub(crate) fn add_peer_addr(&mut self, peer_id: PeerId, addr: &Multiaddr) { + self.peers + .entry(peer_id) + .or_insert_with(|| PeerInfo { + addrs: Default::default(), + relay: None, + rendezvous: None, + }) + .addrs + .insert(addr.clone()); + } + + pub(crate) fn remove_peer(&mut self, peer_id: &PeerId) { + self.peers.remove(peer_id); + self.relay_index.remove(peer_id); + self.rendezvous_index.remove(peer_id); + } + + pub(crate) fn is_peer_of_interest(&self, peer_id: &PeerId) -> bool { + self.relay_index.contains(peer_id) || self.rendezvous_index.contains(peer_id) + } + + pub(crate) fn update_peer_protocols( + &mut self, + peer_id: &PeerId, + protocols: Vec, + ) -> eyre::Result<()> { + protocols.iter().for_each(|protocol| { + if protocol == &libp2p::relay::HOP_PROTOCOL_NAME { + self.relay_index.insert(*peer_id); + self.peers.entry(*peer_id).or_default().relay = Some(PeerRelayInfo { + reservation_status: Default::default(), + }); + } + if protocol == &RENDEZVOUS_PROTOCOL_NAME { + self.rendezvous_index.insert(*peer_id); + self.peers.entry(*peer_id).or_default().rendezvous = Some(PeerRendezvousInfo { + cookie: None, + last_discovery_at: None, + }); + } + }); + Ok(()) + } + + pub(crate) fn update_rendezvous_cookie( + &mut self, + rendezvous_peer: &PeerId, + cookie: rendezvous::Cookie, + ) -> eyre::Result<()> { + self.peers + .entry(*rendezvous_peer) + .and_modify(|info| info.update_rendezvous_cookie(cookie)); + Ok(()) + } + + pub(crate) fn update_relay_reservation_status( + &mut self, + relay_peer: &PeerId, + status: RelayReservationStatus, + ) -> eyre::Result<()> { + self.peers + .entry(*relay_peer) + .and_modify(|info| info.update_relay_reservation_status(status)); + Ok(()) + } + + pub(crate) fn get_peer_info(&self, peer_id: &PeerId) -> Option<&PeerInfo> { + self.peers.get(peer_id) + } + + pub(crate) fn get_rendezvous_peer_ids(&self) -> impl Iterator + '_ { + self.rendezvous_index.iter().cloned() + } + + pub(crate) fn is_peer_relay(&self, peer_id: &PeerId) -> bool { + self.relay_index.contains(peer_id) + } + + pub(crate) fn is_peer_rendezvous(&self, peer_id: &PeerId) -> bool { + self.rendezvous_index.contains(peer_id) + } + + pub(crate) fn pending_addr_changes(&self) -> bool { + self.pending_addr_changes + } + + pub(crate) fn set_pending_addr_changes(&mut self) { + self.pending_addr_changes = true; + } + + pub(crate) fn clear_pending_addr_changes(&mut self) { + self.pending_addr_changes = true; + } +} + +#[derive(Clone, Debug, Default)] +pub(crate) struct PeerInfo { + addrs: HashSet, + relay: Option, + rendezvous: Option, +} + +impl PeerInfo { + pub(crate) fn addrs(&self) -> impl Iterator { + self.addrs.iter() + } + + pub(crate) fn get_preferred_addr(&self) -> Option { + let udp_addrs: Vec<&Multiaddr> = self + .addrs + .iter() + .filter(|addr| { + addr.iter() + .any(|p| matches!(p, multiaddr::Protocol::Udp(_))) + }) + .collect(); + + match udp_addrs.len() { + 0 => self.addrs.iter().next().cloned(), + _ => Some(udp_addrs[0].clone()), + } + } + + pub(crate) fn rendezvous_cookie(&self) -> Option<&rendezvous::Cookie> { + self.rendezvous.as_ref().and_then(|info| info.cookie()) + } + + pub(crate) fn update_rendezvous_cookie(&mut self, cookie: rendezvous::Cookie) { + if let Some(ref mut rendezvous_info) = self.rendezvous { + rendezvous_info.update_cookie(cookie); + } + } + + pub(crate) fn is_rendezvous_discovery_time(&self) -> bool { + self.rendezvous + .as_ref() + .map_or(false, |info| info.should_discover()) + } + + pub(crate) fn relay_reservation_status(&self) -> Option { + self.relay + .as_ref() + .and_then(|info| Some(info.reservation_status())) + } + + pub(crate) fn update_relay_reservation_status(&mut self, status: RelayReservationStatus) { + if let Some(ref mut relay_info) = self.relay { + relay_info.update_reservation_status(status); + } + } +} + +#[derive(Clone, Debug, Default)] +pub(crate) struct PeerRelayInfo { + reservation_status: RelayReservationStatus, +} + +impl PeerRelayInfo { + pub(crate) fn reservation_status(&self) -> RelayReservationStatus { + self.reservation_status + } + + pub(crate) fn update_reservation_status(&mut self, status: RelayReservationStatus) { + self.reservation_status = status; + } +} + +#[derive(Clone, Copy, Debug, Default)] +pub(crate) enum RelayReservationStatus { + #[default] + Discovered, + Requested, + Accepted, +} + +#[derive(Clone, Debug)] +pub(crate) struct PeerRendezvousInfo { + cookie: Option, + last_discovery_at: Option, +} + +impl PeerRendezvousInfo { + pub(crate) fn cookie(&self) -> Option<&rendezvous::Cookie> { + self.cookie.as_ref() + } + + pub(crate) fn update_cookie(&mut self, cookie: rendezvous::Cookie) { + self.cookie = Some(cookie); + self.last_discovery_at = Some(time::Instant::now()); + } + + pub(crate) fn should_discover(&self) -> bool { + match self.last_discovery_at { + Some(instant) => instant.elapsed() > Duration::from_secs(60), + None => true, + } + } +} diff --git a/examples/chat/src/network/events.rs b/examples/chat/src/network/events.rs index 92eb35a..ec7b462 100644 --- a/examples/chat/src/network/events.rs +++ b/examples/chat/src/network/events.rs @@ -5,6 +5,7 @@ use super::*; mod dcutr; mod gossipsub; mod identify; +mod kad; mod mdns; mod ping; mod relay; @@ -18,15 +19,16 @@ impl EventLoop { pub(super) async fn handle_swarm_event(&mut self, event: SwarmEvent) { match event { SwarmEvent::Behaviour(event) => match event { - BehaviourEvent::Identify(event) => events::EventHandler::handle(self, event).await, + BehaviourEvent::Dcutr(event) => events::EventHandler::handle(self, event).await, BehaviourEvent::Gossipsub(event) => events::EventHandler::handle(self, event).await, + BehaviourEvent::Identify(event) => events::EventHandler::handle(self, event).await, + BehaviourEvent::Kad(event) => events::EventHandler::handle(self, event).await, BehaviourEvent::Mdns(event) => events::EventHandler::handle(self, event).await, + BehaviourEvent::Ping(event) => events::EventHandler::handle(self, event).await, + BehaviourEvent::Relay(event) => events::EventHandler::handle(self, event).await, BehaviourEvent::Rendezvous(event) => { events::EventHandler::handle(self, event).await } - BehaviourEvent::Relay(event) => events::EventHandler::handle(self, event).await, - BehaviourEvent::Ping(event) => events::EventHandler::handle(self, event).await, - BehaviourEvent::Dcutr(event) => events::EventHandler::handle(self, event).await, }, SwarmEvent::NewListenAddr { listener_id, @@ -61,6 +63,9 @@ impl EventLoop { debug!(%peer_id, ?endpoint, "Connection established"); match endpoint { libp2p::core::ConnectedPoint::Dialer { .. } => { + self.network_state + .add_peer_addr(peer_id, endpoint.get_remote_address()); + if let Some(sender) = self.pending_dial.remove(&peer_id) { let _ = sender.send(Ok(Some(()))); } @@ -79,6 +84,11 @@ impl EventLoop { "Connection closed: {} {:?} {:?} {} {:?}", peer_id, connection_id, endpoint, num_established, cause ); + if !self.swarm.is_connected(&peer_id) + && !self.network_state.is_peer_of_interest(&peer_id) + { + self.network_state.remove_peer(&peer_id); + } } SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => { debug!(%error, ?peer_id, "Outgoing connection error"); @@ -103,15 +113,25 @@ impl EventLoop { } => { debug!("Listener closed: {:?} {:?}", addresses, reason.err()) } - SwarmEvent::ListenerError { error, .. } => debug!(%error, "Listener error"), + SwarmEvent::ListenerError { error, .. } => info!(%error, "Listener error"), SwarmEvent::NewExternalAddrCandidate { address } => { debug!("New external address candidate: {}", address) } SwarmEvent::ExternalAddrConfirmed { address } => { debug!("External address confirmed: {}", address); + self.network_state.set_pending_addr_changes(); + + if let Err(err) = self.broadcast_rendezvous_registrations() { + error!(%err, "Failed to handle rendezvous register"); + }; } SwarmEvent::ExternalAddrExpired { address } => { - debug!("External address expired: {}", address) + debug!("External address expired: {}", address); + self.network_state.set_pending_addr_changes(); + + if let Err(err) = self.broadcast_rendezvous_registrations() { + error!(%err, "Failed to handle rendezvous register"); + }; } SwarmEvent::NewExternalAddrOfPeer { peer_id, address } => { debug!("New external address of peer: {} {}", peer_id, address) @@ -119,39 +139,4 @@ impl EventLoop { _ => {} } } - - pub(super) async fn handle_rendezvous_discover(&mut self) { - for (peer_id, entry) in self.rendezvous.iter() { - if entry.identify_state.is_exchanged() { - self.swarm.behaviour_mut().rendezvous.discover( - Some(self.rendezvous_namespace.clone()), - entry.cookie.clone(), - None, - *peer_id, - ); - } - } - } - - pub(super) async fn handle_rendezvous_dial(&mut self) { - for (peer_id, entry) in self.rendezvous.iter() { - if self.swarm.is_connected(peer_id) { - continue; - }; - if let Err(err) = self.swarm.dial(entry.address.clone()) { - error!("Failed to dial rendezvous peer: {:?}", err); - }; - } - } - - pub(super) async fn handle_relays_dial(&mut self) { - for (peer_id, entry) in self.relays.iter() { - if self.swarm.is_connected(peer_id) { - continue; - }; - if let Err(err) = self.swarm.dial(entry.address.clone()) { - error!("Failed to dial relay peer: {:?}", err); - }; - } - } } diff --git a/examples/chat/src/network/events/identify.rs b/examples/chat/src/network/events/identify.rs index d67a7aa..6dcc2ef 100644 --- a/examples/chat/src/network/events/identify.rs +++ b/examples/chat/src/network/events/identify.rs @@ -2,53 +2,39 @@ use libp2p::identify; use owo_colors::OwoColorize; use tracing::{debug, error}; -use super::{EventHandler, EventLoop, RelayReservationState}; +use super::{EventHandler, EventLoop}; impl EventHandler for EventLoop { async fn handle(&mut self, event: identify::Event) { debug!("{}: {:?}", "identify".yellow(), event); match event { - identify::Event::Received { peer_id, .. } => { - if let Some(entry) = self.relays.get_mut(&peer_id) { - entry.identify_state.received = true; - - if entry.identify_state.is_exchanged() - && matches!(entry.reservation_state, RelayReservationState::Unknown) - { - if let Err(err) = self - .swarm - .listen_on(entry.address.clone().with(multiaddr::Protocol::P2pCircuit)) - { - error!("Failed to listen on relay address: {:?}", err); - }; - entry.reservation_state = RelayReservationState::Requested; - } + identify::Event::Received { + peer_id, + info: identify::Info { protocols, .. }, + } => { + if let Err(err) = self + .network_state + .update_peer_protocols(&peer_id, protocols) + { + error!(%err, "Failed to update peer protocols"); + return; } - if let Some(entry) = self.rendezvous.get_mut(&peer_id) { - entry.identify_state.received = true; + if self.network_state.is_peer_relay(&peer_id) { + if let Err(err) = self.create_relay_reservation(&peer_id) { + error!(%err, "Failed to handle relay reservation"); + }; } - } - identify::Event::Sent { peer_id } => { - if let Some(entry) = self.relays.get_mut(&peer_id) { - entry.identify_state.sent = true; - if entry.identify_state.is_exchanged() - && matches!(entry.reservation_state, RelayReservationState::Unknown) - { - if let Err(err) = self - .swarm - .listen_on(entry.address.clone().with(multiaddr::Protocol::P2pCircuit)) - { - error!("Failed to listen on relay address: {:?}", err); - }; - entry.reservation_state = RelayReservationState::Requested; - } - } + if self.network_state.is_peer_rendezvous(&peer_id) { + if let Err(err) = self.perform_rendezvous_discovery(&peer_id) { + error!(%err, "Failed to perform rendezvous discovery"); + }; - if let Some(entry) = self.rendezvous.get_mut(&peer_id) { - entry.identify_state.sent = true; + if let Err(err) = self.update_rendezvous_registration(&peer_id) { + error!(%err, "Failed to update registration discovery"); + }; } } _ => {} diff --git a/examples/chat/src/network/events/kad.rs b/examples/chat/src/network/events/kad.rs new file mode 100644 index 0000000..101faa8 --- /dev/null +++ b/examples/chat/src/network/events/kad.rs @@ -0,0 +1,11 @@ +use libp2p::kad; +use owo_colors::OwoColorize; +use tracing::debug; + +use super::{EventHandler, EventLoop}; + +impl EventHandler for EventLoop { + async fn handle(&mut self, event: kad::Event) { + debug!("{}: {:?}", "kad".yellow(), event); + } +} diff --git a/examples/chat/src/network/events/relay.rs b/examples/chat/src/network/events/relay.rs index ccd31ba..25b1a28 100644 --- a/examples/chat/src/network/events/relay.rs +++ b/examples/chat/src/network/events/relay.rs @@ -2,7 +2,7 @@ use libp2p::relay; use owo_colors::OwoColorize; use tracing::{debug, error}; -use super::{EventHandler, EventLoop, RelayReservationState}; +use super::{discovery, EventHandler, EventLoop}; impl EventHandler for EventLoop { async fn handle(&mut self, event: relay::client::Event) { @@ -10,24 +10,12 @@ impl EventHandler for EventLoop { match event { relay::client::Event::ReservationReqAccepted { relay_peer_id, .. } => { - if let Some(entry) = self.relays.get_mut(&relay_peer_id) { - entry.reservation_state = RelayReservationState::Acquired; - - for (peer_id, entry) in self.rendezvous.iter() { - if entry.identify_state.is_exchanged() { - if let Err(err) = self.swarm.behaviour_mut().rendezvous.register( - self.rendezvous_namespace.clone(), - *peer_id, - None, - ) { - error!(%err, "Failed to register at rendezvous"); - } - debug!( - "Registered at rendezvous node {} on the namespace {}", - *peer_id, self.rendezvous_namespace - ); - } - } + if let Err(err) = self.network_state.update_relay_reservation_status( + &relay_peer_id, + discovery::model::RelayReservationStatus::Accepted, + ) { + error!(%err, "Failed to update peer relay reservation status"); + return; } } _ => {} diff --git a/examples/chat/src/network/events/rendezvous.rs b/examples/chat/src/network/events/rendezvous.rs index 32cf0f7..1ab04f9 100644 --- a/examples/chat/src/network/events/rendezvous.rs +++ b/examples/chat/src/network/events/rendezvous.rs @@ -14,23 +14,27 @@ impl EventHandler for EventLoop { registrations, cookie, } => { - if let Some(entry) = self.rendezvous.get_mut(&rendezvous_node) { - entry.cookie = Some(cookie); - } + if let Err(err) = self + .network_state + .update_rendezvous_cookie(&rendezvous_node, cookie) + { + error!(%err, "Failed to update peer rendezvous cookie"); + return; + }; for registration in registrations { if registration.record.peer_id() == *self.swarm.local_peer_id() { continue; } + let peer = registration.record.peer_id(); + debug!(%peer, "Discovered peer via rendezvous"); if self.swarm.is_connected(®istration.record.peer_id()) { continue; }; for address in registration.record.addresses() { - let peer = registration.record.peer_id(); - debug!(%peer, %address, "Discovered peer via rendezvous"); - + debug!(%peer, %address, "Dialing peer discovered via rendezvous"); if let Err(err) = self.swarm.dial(address.clone()) { error!("Failed to dial peer: {:?}", err); } @@ -40,29 +44,28 @@ impl EventHandler for EventLoop { rendezvous::client::Event::Registered { rendezvous_node, .. } => { - if let Some(entry) = self.rendezvous.get(&rendezvous_node) { - if entry.cookie.is_none() { - self.swarm.behaviour_mut().rendezvous.discover( - Some(self.rendezvous_namespace.clone()), - entry.cookie.clone(), - None, - rendezvous_node, - ); + if let Some(peer_info) = self.network_state.get_peer_info(&rendezvous_node) { + if peer_info.rendezvous_cookie().is_none() { + debug!(%rendezvous_node, "Discovering peers via rendezvous after registration"); + if let Err(err) = self.perform_rendezvous_discovery(&rendezvous_node) { + error!(%err, "Failed to run rendezvous discovery after registration"); + } } } } - rendezvous::client::Event::Expired { peer } => { - let local_peer_id = *self.swarm.local_peer_id(); - if peer == local_peer_id { - if let Err(err) = self.swarm.behaviour_mut().rendezvous.register( - self.rendezvous_namespace.clone(), - peer, - None, - ) { - error!(%err, "Failed to re-register at rendezvous"); - }; - debug!("Re-registered at rendezvous"); - } + rendezvous::client::Event::DiscoverFailed { + rendezvous_node, + namespace, + error, + } => { + error!(?rendezvous_node, ?namespace, error_code=?error, "Rendezvous discovery failed"); + } + rendezvous::client::Event::RegisterFailed { + rendezvous_node, + namespace, + error, + } => { + error!(?rendezvous_node, ?namespace, error_code=?error, "Rendezvous registration failed"); } _ => {} }