From d7c446342a2e4f79462585a6fb28baab9590fc47 Mon Sep 17 00:00:00 2001 From: Filip Bozic <70634661+fbozic@users.noreply.github.com> Date: Fri, 31 May 2024 17:46:21 +0200 Subject: [PATCH] feat: split discovery into state and eventloop methods --- examples/chat/src/network.rs | 32 +- examples/chat/src/network/client.rs | 4 +- examples/chat/src/network/discovery.rs | 387 +++++------------- examples/chat/src/network/discovery/state.rs | 217 ++++++++++ examples/chat/src/network/events.rs | 127 ++++-- examples/chat/src/network/events/identify.rs | 19 +- examples/chat/src/network/events/relay.rs | 17 +- .../chat/src/network/events/rendezvous.rs | 30 +- 8 files changed, 458 insertions(+), 375 deletions(-) create mode 100644 examples/chat/src/network/discovery/state.rs diff --git a/examples/chat/src/network.rs b/examples/chat/src/network.rs index e03bdb8..e912ad1 100644 --- a/examples/chat/src/network.rs +++ b/examples/chat/src/network.rs @@ -9,7 +9,7 @@ use libp2p::{ use multiaddr::Multiaddr; use tokio::sync::{mpsc, oneshot}; use tokio::time; -use tracing::{debug, info, trace, warn}; +use tracing::{debug, trace, warn}; pub mod client; pub mod discovery; @@ -98,8 +98,7 @@ async fn init( ), kad: { let mut kademlia_config = kad::Config::default(); - kademlia_config - .set_protocol_names(std::iter::once(CALIMERO_KAD_PROTO_NAME).collect()); + kademlia_config.set_protocol_names(vec![CALIMERO_KAD_PROTO_NAME]); let mut kademlia = kad::Behaviour::with_config( peer_id, @@ -148,7 +147,7 @@ pub(crate) struct EventLoop { swarm: Swarm, command_receiver: mpsc::Receiver, event_sender: mpsc::Sender, - discovery_state: discovery::DiscoveryState, + discovery: discovery::Discovery, pending_dial: HashMap>>>, } @@ -163,13 +162,21 @@ impl EventLoop { swarm, command_receiver, event_sender, - discovery_state: discovery::DiscoveryState::new(rendezvous_namespace), + // discovery_state: discovery::DiscoveryState::new(rendezvous_namespace, 0.5), + discovery: discovery::Discovery::new(discovery::DiscoveryConfig::new( + discovery::RendezvousConfig::new( + rendezvous_namespace, + Duration::from_secs(90), + 0.5, + ), + )), pending_dial: Default::default(), } } pub(crate) async fn run(mut self) { - let mut rendezvous_discover_tick = tokio::time::interval(Duration::from_secs(90)); + let mut rendezvous_discover_tick = + tokio::time::interval(self.discovery.config.rendezvous.discovery_interval); loop { tokio::select! { @@ -247,7 +254,7 @@ impl EventLoop { let _ = sender.send(Ok(id)); } - Command::PeerInfo { sender } => { + Command::PeersInfo { sender } => { let peers = self .swarm .connected_peers() @@ -257,7 +264,8 @@ impl EventLoop { let count = peers.len(); let discovered_peers = self - .discovery_state + .discovery + .state .get_peers() .map(|(id, peer)| (id.clone(), peer.clone())) .collect::>(); @@ -270,7 +278,7 @@ impl EventLoop { discovered_peers, }); } - Command::MeshPeerCount { topic, sender } => { + Command::MeshPeersCount { topic, sender } => { let peers = self .swarm .behaviour_mut() @@ -309,10 +317,10 @@ pub(crate) enum Command { data: Vec, sender: oneshot::Sender>, }, - PeerInfo { + PeersInfo { sender: oneshot::Sender, }, - MeshPeerCount { + MeshPeersCount { topic: gossipsub::TopicHash, sender: oneshot::Sender, }, @@ -324,7 +332,7 @@ pub(crate) struct PeersInfo { count: usize, peers: Vec, discovered_count: usize, - discovered_peers: Vec<(PeerId, discovery::PeerInfo)>, + discovered_peers: Vec<(PeerId, discovery::state::PeerInfo)>, } #[allow(dead_code)] // Info structs for pretty printing diff --git a/examples/chat/src/network/client.rs b/examples/chat/src/network/client.rs index 26b863e..6b50c0f 100644 --- a/examples/chat/src/network/client.rs +++ b/examples/chat/src/network/client.rs @@ -81,7 +81,7 @@ impl NetworkClient { let (sender, receiver) = oneshot::channel(); self.sender - .send(Command::PeerInfo { sender }) + .send(Command::PeersInfo { sender }) .await .expect("Command receiver not to be dropped."); @@ -92,7 +92,7 @@ impl NetworkClient { let (sender, receiver) = oneshot::channel(); self.sender - .send(Command::MeshPeerCount { topic, sender }) + .send(Command::MeshPeersCount { topic, sender }) .await .expect("Command receiver not to be dropped."); diff --git a/examples/chat/src/network/discovery.rs b/examples/chat/src/network/discovery.rs index c693c74..1f5033b 100644 --- a/examples/chat/src/network/discovery.rs +++ b/examples/chat/src/network/discovery.rs @@ -1,23 +1,70 @@ -use std::collections::{BTreeMap, BTreeSet, HashSet}; -use std::time::{self, Duration}; +use std::time; use eyre::ContextCompat; -use libp2p::{rendezvous, Multiaddr, PeerId, StreamProtocol}; -use multiaddr::Protocol; +use libp2p::{rendezvous, PeerId}; use tracing::{debug, error}; +pub(crate) mod state; use super::EventLoop; +#[derive(Debug)] +pub struct Discovery { + pub config: DiscoveryConfig, + pub(crate) state: state::DiscoveryState, +} + +impl Discovery { + pub(crate) fn new(config: DiscoveryConfig) -> Self { + Discovery { + state: Default::default(), + config, + } + } +} + +#[derive(Debug)] +pub(crate) struct DiscoveryConfig { + pub rendezvous: RendezvousConfig, +} + +impl DiscoveryConfig { + pub(crate) fn new(rendezvous: RendezvousConfig) -> Self { + DiscoveryConfig { rendezvous } + } +} + +#[derive(Debug)] +pub struct RendezvousConfig { + pub namespace: rendezvous::Namespace, + pub discovery_interval: time::Duration, + pub discovery_rpm: f32, +} + +impl RendezvousConfig { + pub(crate) fn new( + namespace: rendezvous::Namespace, + discovery_interval: time::Duration, + discovery_rpm: f32, + ) -> Self { + RendezvousConfig { + namespace, + discovery_interval, + discovery_rpm, + } + } +} + impl EventLoop { // Handles rendezvous discoveries for all rendezvous peers. // If rendezvous peer is not connected, it will be dialed which will trigger the discovery during identify exchange. pub(crate) async fn handle_rendezvous_discoveries(&mut self) { for peer_id in self - .discovery_state + .discovery + .state .get_rendezvous_peer_ids() .collect::>() { - let peer_info = match self.discovery_state.get_peer_info(&peer_id) { + let peer_info = match self.discovery.state.get_peer_info(&peer_id) { Some(info) => info, None => { error!(%peer_id, "Failed to lookup peer info"); @@ -46,14 +93,33 @@ impl EventLoop { rendezvous_peer: &PeerId, ) -> eyre::Result<()> { let peer_info = self - .discovery_state + .discovery + .state .get_peer_info(rendezvous_peer) .wrap_err("Failed to get peer info {}")?; - if peer_info.is_rendezvous_discovery_time() { + let is_throttled = peer_info.rendezvous().map_or(false, |info| { + info.last_discovery_at().map_or(false, |instant| { + instant.elapsed() + > time::Duration::from_secs_f32( + 60.0 / self.discovery.config.rendezvous.discovery_rpm, + ) + }) + }); + + debug!( + %rendezvous_peer, + ?is_throttled, + "Checking if rendezvous discovery is throttled" + ); + + if !is_throttled { self.swarm.behaviour_mut().rendezvous.discover( - Some(self.discovery_state.rendezvous_namespace.clone()), - peer_info.rendezvous_cookie().cloned(), + Some(self.discovery.config.rendezvous.namespace.clone()), + peer_info + .rendezvous() + .and_then(|info| info.cookie()) + .cloned(), None, *rendezvous_peer, ); @@ -65,16 +131,17 @@ impl EventLoop { // Broadcasts rendezvous registrations to all rendezvous peers if there are pending address changes. // If rendezvous peer is not connected, it will be dialed which will trigger the registration during identify exchange. pub(crate) fn broadcast_rendezvous_registrations(&mut self) -> eyre::Result<()> { - if !self.discovery_state.pending_addr_changes() { + if !self.discovery.state.pending_addr_changes() { return Ok(()); } for peer_id in self - .discovery_state + .discovery + .state .get_rendezvous_peer_ids() .collect::>() { - let peer_info = match self.discovery_state.get_peer_info(&peer_id) { + let peer_info = match self.discovery.state.get_peer_info(&peer_id) { Some(info) => info, None => { error!(%peer_id, "Failed to lookup peer info"); @@ -95,7 +162,7 @@ impl EventLoop { } } - self.discovery_state.clear_pending_addr_changes(); + self.discovery.state.clear_pending_addr_changes(); Ok(()) } @@ -103,13 +170,10 @@ impl EventLoop { // Updates rendezvous registration on the remote rendezvous peer. // If there are no external addresses for the node, the registration is considered successful. // This function expectes that the relay peer is already connected. - pub(crate) fn update_rendezvous_registration( - &mut self, - rendezvous_peer: &PeerId, - ) -> eyre::Result<()> { + pub(crate) fn update_rendezvous_registration(&mut self, peer_id: &PeerId) -> eyre::Result<()> { if let Err(err) = self.swarm.behaviour_mut().rendezvous.register( - self.discovery_state.rendezvous_namespace.clone(), - *rendezvous_peer, + self.discovery.config.rendezvous.namespace.clone(), + *peer_id, None, ) { match err { @@ -119,25 +183,38 @@ impl EventLoop { } debug!( - %rendezvous_peer, rendezvous_namespace=%(self.discovery_state.rendezvous_namespace), + %peer_id, rendezvous_namespace=%(self.discovery.config.rendezvous.namespace), "Sent register request to rendezvous node" ); Ok(()) } - // Creates relay reservation if node doesn't have a relayed address on the relay peer. + // Creates relay reservation if node didn't already request addres relayed address on the relay peer. // This function expectes that the relay peer is already connected. - pub(crate) fn create_relay_reservation(&mut self, relay_peer: &PeerId) -> eyre::Result<()> { + pub(crate) fn create_relay_reservation(&mut self, peer_id: &PeerId) -> eyre::Result<()> { let peer_info = self - .discovery_state - .get_peer_info(relay_peer) + .discovery + .state + .get_peer_info(peer_id) .wrap_err("Failed to get peer info")?; - let external_addrs = self - .swarm - .external_addresses() - .filter(|addr| addr.iter().any(|p| matches!(p, Protocol::P2pCircuit))) - .collect::>(); + let is_relay_reservation_required = match peer_info.relay() { + Some(info) => match info.reservation_status() { + state::RelayReservationStatus::Discovered => true, + state::RelayReservationStatus::Expired => true, + _ => false, + }, + None => true, + }; + debug!( + ?peer_info, + %is_relay_reservation_required, + "Checking if relay reservation is required" + ); + + if !is_relay_reservation_required { + return Ok(()); + } let preferred_addr = peer_info .get_preferred_addr() @@ -153,255 +230,11 @@ impl EventLoop { eyre::bail!("Failed to construct relayed addr for relay peer: {:?}", err) } }; - let is_relay_reservation_required = !(matches!( - peer_info.relay_reservation_status(), - Some(RelayReservationStatus::Requested) - ) || external_addrs.contains(&relayed_addr)); - - debug!( - ?peer_info, - ?external_addrs, - %is_relay_reservation_required, - "Checking if relay reservation is required" - ); - - if !is_relay_reservation_required { - return Ok(()); - } - self.swarm.listen_on(relayed_addr)?; - self.discovery_state - .update_relay_reservation_status(&relay_peer, RelayReservationStatus::Requested)?; + self.discovery + .state + .update_relay_reservation_status(&peer_id, state::RelayReservationStatus::Requested); Ok(()) } } - -// The rendezvous protocol name is not public in libp2p, so we have to define it here. -// source: https://github.com/libp2p/rust-libp2p/blob/a8888a7978f08ec9b8762207bf166193bf312b94/protocols/rendezvous/src/lib.rs#L50C12-L50C92 -const RENDEZVOUS_PROTOCOL_NAME: libp2p::StreamProtocol = - libp2p::StreamProtocol::new("/rendezvous/1.0.0"); - -#[derive(Debug)] -pub(crate) struct DiscoveryState { - peers: BTreeMap, - relay_index: BTreeSet, - rendezvous_index: BTreeSet, - rendezvous_namespace: libp2p::rendezvous::Namespace, - pending_addr_changes: bool, -} - -impl DiscoveryState { - pub(crate) fn new(rendezvous_namespace: libp2p::rendezvous::Namespace) -> Self { - DiscoveryState { - peers: Default::default(), - relay_index: Default::default(), - rendezvous_index: Default::default(), - rendezvous_namespace, - pending_addr_changes: false, - } - } - - pub(crate) fn add_peer_addr(&mut self, peer_id: PeerId, addr: &Multiaddr) { - self.peers - .entry(peer_id) - .or_insert_with(|| PeerInfo { - addrs: Default::default(), - relay: None, - rendezvous: None, - }) - .addrs - .insert(addr.clone()); - } - - pub(crate) fn remove_peer(&mut self, peer_id: &PeerId) { - self.peers.remove(peer_id); - self.relay_index.remove(peer_id); - self.rendezvous_index.remove(peer_id); - } - - pub(crate) fn is_peer_of_interest(&self, peer_id: &PeerId) -> bool { - self.relay_index.contains(peer_id) || self.rendezvous_index.contains(peer_id) - } - - pub(crate) fn update_peer_protocols( - &mut self, - peer_id: &PeerId, - protocols: Vec, - ) -> eyre::Result<()> { - protocols.iter().for_each(|protocol| { - if protocol == &libp2p::relay::HOP_PROTOCOL_NAME { - self.relay_index.insert(*peer_id); - self.peers.entry(*peer_id).or_default().relay = Some(PeerRelayInfo { - reservation_status: Default::default(), - }); - } - if protocol == &RENDEZVOUS_PROTOCOL_NAME { - self.rendezvous_index.insert(*peer_id); - self.peers.entry(*peer_id).or_default().rendezvous = Some(PeerRendezvousInfo { - cookie: None, - last_discovery_at: None, - }); - } - }); - Ok(()) - } - - pub(crate) fn update_rendezvous_cookie( - &mut self, - rendezvous_peer: &PeerId, - cookie: rendezvous::Cookie, - ) -> eyre::Result<()> { - self.peers - .entry(*rendezvous_peer) - .and_modify(|info| info.update_rendezvous_cookie(cookie)); - Ok(()) - } - - pub(crate) fn update_relay_reservation_status( - &mut self, - relay_peer: &PeerId, - status: RelayReservationStatus, - ) -> eyre::Result<()> { - self.peers - .entry(*relay_peer) - .and_modify(|info| info.update_relay_reservation_status(status)); - Ok(()) - } - - pub(crate) fn get_peers(&self) -> impl Iterator { - self.peers.iter() - } - - pub(crate) fn get_peer_info(&self, peer_id: &PeerId) -> Option<&PeerInfo> { - self.peers.get(peer_id) - } - - pub(crate) fn get_rendezvous_peer_ids(&self) -> impl Iterator + '_ { - self.rendezvous_index.iter().cloned() - } - - pub(crate) fn is_peer_relay(&self, peer_id: &PeerId) -> bool { - self.relay_index.contains(peer_id) - } - - pub(crate) fn is_peer_rendezvous(&self, peer_id: &PeerId) -> bool { - self.rendezvous_index.contains(peer_id) - } - - pub(crate) fn pending_addr_changes(&self) -> bool { - self.pending_addr_changes - } - - pub(crate) fn set_pending_addr_changes(&mut self) { - self.pending_addr_changes = true; - } - - pub(crate) fn clear_pending_addr_changes(&mut self) { - self.pending_addr_changes = true; - } -} - -#[derive(Clone, Debug, Default)] -pub(crate) struct PeerInfo { - addrs: HashSet, - relay: Option, - rendezvous: Option, -} - -impl PeerInfo { - pub(crate) fn addrs(&self) -> impl Iterator { - self.addrs.iter() - } - - pub(crate) fn get_preferred_addr(&self) -> Option { - let udp_addrs: Vec<&Multiaddr> = self - .addrs - .iter() - .filter(|addr| { - addr.iter() - .any(|p| matches!(p, multiaddr::Protocol::Udp(_))) - }) - .collect(); - - match udp_addrs.len() { - 0 => self.addrs.iter().next().cloned(), - _ => Some(udp_addrs[0].clone()), - } - } - - pub(crate) fn rendezvous_cookie(&self) -> Option<&rendezvous::Cookie> { - self.rendezvous.as_ref().and_then(|info| info.cookie()) - } - - pub(crate) fn update_rendezvous_cookie(&mut self, cookie: rendezvous::Cookie) { - if let Some(ref mut rendezvous_info) = self.rendezvous { - rendezvous_info.update_cookie(cookie); - } - } - - pub(crate) fn is_rendezvous_discovery_time(&self) -> bool { - self.rendezvous - .as_ref() - .map_or(false, |info| info.should_discover()) - } - - pub(crate) fn relay_reservation_status(&self) -> Option { - self.relay - .as_ref() - .and_then(|info| Some(info.reservation_status())) - } - - pub(crate) fn update_relay_reservation_status(&mut self, status: RelayReservationStatus) { - if let Some(ref mut relay_info) = self.relay { - relay_info.update_reservation_status(status); - } - } -} - -#[derive(Clone, Debug, Default)] -pub(crate) struct PeerRelayInfo { - reservation_status: RelayReservationStatus, -} - -impl PeerRelayInfo { - pub(crate) fn reservation_status(&self) -> RelayReservationStatus { - self.reservation_status - } - - pub(crate) fn update_reservation_status(&mut self, status: RelayReservationStatus) { - self.reservation_status = status; - } -} - -#[derive(Clone, Copy, Debug, Default)] -pub(crate) enum RelayReservationStatus { - #[default] - Discovered, - Requested, - Accepted, -} - -#[derive(Clone, Debug)] -pub(crate) struct PeerRendezvousInfo { - cookie: Option, - last_discovery_at: Option, -} - -impl PeerRendezvousInfo { - pub(crate) fn cookie(&self) -> Option<&rendezvous::Cookie> { - self.cookie.as_ref() - } - - pub(crate) fn update_cookie(&mut self, cookie: rendezvous::Cookie) { - self.cookie = Some(cookie); - self.last_discovery_at = Some(time::Instant::now()); - } - - pub(crate) fn should_discover(&self) -> bool { - match self.last_discovery_at { - Some(instant) => instant.elapsed() > Duration::from_secs(60), - None => true, - } - } -} diff --git a/examples/chat/src/network/discovery/state.rs b/examples/chat/src/network/discovery/state.rs new file mode 100644 index 0000000..d4e6ee3 --- /dev/null +++ b/examples/chat/src/network/discovery/state.rs @@ -0,0 +1,217 @@ +use std::collections::{BTreeMap, BTreeSet, HashSet}; +use std::time; + +use libp2p::{rendezvous, Multiaddr, PeerId, StreamProtocol}; + +// The rendezvous protocol name is not public in libp2p, so we have to define it here. +// source: https://github.com/libp2p/rust-libp2p/blob/a8888a7978f08ec9b8762207bf166193bf312b94/protocols/rendezvous/src/lib.rs#L50C12-L50C92 +const RENDEZVOUS_PROTOCOL_NAME: libp2p::StreamProtocol = + libp2p::StreamProtocol::new("/rendezvous/1.0.0"); + +#[derive(Debug)] +pub(crate) struct DiscoveryState { + peers: BTreeMap, + relay_index: BTreeSet, + rendezvous_index: BTreeSet, + pending_addr_changes: bool, +} + +impl Default for DiscoveryState { + fn default() -> Self { + DiscoveryState { + peers: Default::default(), + relay_index: Default::default(), + rendezvous_index: Default::default(), + pending_addr_changes: false, + } + } +} + +impl DiscoveryState { + pub(crate) fn get_peers(&self) -> impl Iterator { + self.peers.iter() + } + + pub(crate) fn add_peer_addr(&mut self, peer_id: PeerId, addr: &Multiaddr) { + self.peers + .entry(peer_id) + .or_default() + .addrs + .insert(addr.clone()); + } + + pub(crate) fn remove_peer(&mut self, peer_id: &PeerId) { + self.peers.remove(peer_id); + self.relay_index.remove(peer_id); + self.rendezvous_index.remove(peer_id); + } + + pub(crate) fn update_peer_protocols(&mut self, peer_id: &PeerId, protocols: &[StreamProtocol]) { + protocols.iter().for_each(|protocol| { + if protocol == &libp2p::relay::HOP_PROTOCOL_NAME { + self.relay_index.insert(*peer_id); + self.peers.entry(*peer_id).or_default().relay = Some(PeerRelayInfo { + reservation_status: Default::default(), + }); + } + if protocol == &RENDEZVOUS_PROTOCOL_NAME { + self.rendezvous_index.insert(*peer_id); + self.peers.entry(*peer_id).or_default().rendezvous = Some(PeerRendezvousInfo { + cookie: None, + last_discovery_at: None, + }); + } + }); + } + + pub(crate) fn update_rendezvous_cookie( + &mut self, + rendezvous_peer: &PeerId, + cookie: rendezvous::Cookie, + ) { + self.peers + .entry(*rendezvous_peer) + .and_modify(|info| info.update_rendezvous_cookie(cookie.clone())) + .or_default() + .rendezvous = Some(PeerRendezvousInfo { + cookie: Some(cookie.clone()), + last_discovery_at: None, + }); + } + + pub(crate) fn update_relay_reservation_status( + &mut self, + relay_peer: &PeerId, + status: RelayReservationStatus, + ) { + self.peers + .entry(*relay_peer) + .and_modify(|info| info.update_relay_reservation_status(status)) + .or_default() + .relay = Some(PeerRelayInfo { + reservation_status: status, + }); + } + + pub(crate) fn get_peer_info(&self, peer_id: &PeerId) -> Option<&PeerInfo> { + self.peers.get(peer_id) + } + + pub(crate) fn get_rendezvous_peer_ids(&self) -> impl Iterator + '_ { + self.rendezvous_index.iter().cloned() + } + + pub(crate) fn is_peer_relay(&self, peer_id: &PeerId) -> bool { + self.relay_index.contains(peer_id) + } + + pub(crate) fn is_peer_rendezvous(&self, peer_id: &PeerId) -> bool { + self.rendezvous_index.contains(peer_id) + } + + pub(crate) fn pending_addr_changes(&self) -> bool { + self.pending_addr_changes + } + + pub(crate) fn set_pending_addr_changes(&mut self) { + self.pending_addr_changes = true; + } + + pub(crate) fn clear_pending_addr_changes(&mut self) { + self.pending_addr_changes = false; + } +} + +#[derive(Clone, Debug, Default)] +pub(crate) struct PeerInfo { + addrs: HashSet, + relay: Option, + rendezvous: Option, +} + +impl PeerInfo { + pub(crate) fn addrs(&self) -> impl Iterator { + self.addrs.iter() + } + + pub(crate) fn get_preferred_addr(&self) -> Option<&Multiaddr> { + let udp_addrs: Vec<&Multiaddr> = self + .addrs + .iter() + .filter(|addr| { + addr.iter() + .any(|p| matches!(p, multiaddr::Protocol::Udp(_))) + }) + .collect(); + + match udp_addrs.len() { + 0 => self.addrs.iter().next(), + _ => Some(udp_addrs[0]), + } + } + + pub(crate) fn relay(&self) -> Option<&PeerRelayInfo> { + self.relay.as_ref() + } + + pub(crate) fn rendezvous(&self) -> Option<&PeerRendezvousInfo> { + self.rendezvous.as_ref() + } + + fn update_rendezvous_cookie(&mut self, cookie: rendezvous::Cookie) { + if let Some(ref mut rendezvous_info) = self.rendezvous { + rendezvous_info.update_cookie(cookie); + } + } + + fn update_relay_reservation_status(&mut self, status: RelayReservationStatus) { + if let Some(ref mut relay_info) = self.relay { + relay_info.update_reservation_status(status); + } + } +} + +#[derive(Clone, Debug, Default)] +pub(crate) struct PeerRelayInfo { + reservation_status: RelayReservationStatus, +} + +impl PeerRelayInfo { + pub(crate) fn reservation_status(&self) -> RelayReservationStatus { + self.reservation_status + } + + fn update_reservation_status(&mut self, status: RelayReservationStatus) { + self.reservation_status = status; + } +} + +#[derive(Clone, Copy, Debug, Default)] +pub(crate) enum RelayReservationStatus { + #[default] + Discovered, + Requested, + Accepted, + Expired, +} + +#[derive(Clone, Debug)] +pub(crate) struct PeerRendezvousInfo { + cookie: Option, + last_discovery_at: Option, +} + +impl PeerRendezvousInfo { + pub(crate) fn cookie(&self) -> Option<&rendezvous::Cookie> { + self.cookie.as_ref() + } + + pub(crate) fn last_discovery_at(&self) -> Option { + self.last_discovery_at + } + + fn update_cookie(&mut self, cookie: rendezvous::Cookie) { + self.cookie = Some(cookie); + self.last_discovery_at = Some(time::Instant::now()); + } +} diff --git a/examples/chat/src/network/events.rs b/examples/chat/src/network/events.rs index c5fd38e..230b62e 100644 --- a/examples/chat/src/network/events.rs +++ b/examples/chat/src/network/events.rs @@ -1,4 +1,4 @@ -use tracing::error; +use tracing::{error, info}; use super::*; @@ -35,26 +35,16 @@ impl EventLoop { address, } => { let local_peer_id = *self.swarm.local_peer_id(); - let address = match address.with_p2p(local_peer_id) { - Ok(address) => address, - Err(address) => { - warn!( - "Failed to sanitize listen address with p2p proto, address: {:?}, p2p proto: {:?}", - address, local_peer_id - ); - address - } - }; if let Err(err) = self .event_sender .send(types::NetworkEvent::ListeningOn { listener_id, - address, + address: address.with(multiaddr::Protocol::P2p(local_peer_id)), }) .await { error!("Failed to send listening on event: {:?}", err); - }; + } } SwarmEvent::IncomingConnection { .. } => {} SwarmEvent::ConnectionEstablished { @@ -63,7 +53,8 @@ impl EventLoop { debug!(%peer_id, ?endpoint, "Connection established"); match endpoint { libp2p::core::ConnectedPoint::Dialer { .. } => { - self.discovery_state + self.discovery + .state .add_peer_addr(peer_id, endpoint.get_remote_address()); if let Some(sender) = self.pending_dial.remove(&peer_id) { @@ -85,62 +76,112 @@ impl EventLoop { peer_id, connection_id, endpoint, num_established, cause ); if !self.swarm.is_connected(&peer_id) - && !self.discovery_state.is_peer_of_interest(&peer_id) + && !self.discovery.state.is_peer_relay(&peer_id) + && !self.discovery.state.is_peer_rendezvous(&peer_id) { - self.discovery_state.remove_peer(&peer_id); + self.discovery.state.remove_peer(&peer_id); } } SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => { - debug!(%error, ?peer_id, "Outgoing connection error"); if let Some(peer_id) = peer_id { if let Some(sender) = self.pending_dial.remove(&peer_id) { let _ = sender.send(Err(eyre::eyre!(error))); } } } - SwarmEvent::IncomingConnectionError { - send_back_addr, - error, - .. - } => { - debug!(%error, %send_back_addr, "Incoming connection error") - } + SwarmEvent::IncomingConnectionError { .. } => {} SwarmEvent::Dialing { peer_id: Some(peer_id), .. - } => trace!("Dialing peer: {}", peer_id), + } => debug!("Dialing peer: {}", peer_id), SwarmEvent::ExpiredListenAddr { address, .. } => { - debug!("Expired listen address: {}", address) + trace!("Expired listen address: {}", address) } SwarmEvent::ListenerClosed { addresses, reason, .. - } => { - debug!("Listener closed: {:?} {:?}", addresses, reason.err()) - } - SwarmEvent::ListenerError { error, .. } => info!(%error, "Listener error"), + } => trace!("Listener closed: {:?} {:?}", addresses, reason.err()), + SwarmEvent::ListenerError { error, .. } => trace!("Listener error: {:?}", error), SwarmEvent::NewExternalAddrCandidate { address } => { - debug!("New external address candidate: {}", address) + trace!("New external address candidate: {}", address) } SwarmEvent::ExternalAddrConfirmed { address } => { - debug!("External address confirmed: {}", address); - self.discovery_state.set_pending_addr_changes(); - - if let Err(err) = self.broadcast_rendezvous_registrations() { - error!(%err, "Failed to handle rendezvous register"); - }; + info!("External address confirmed: {}", address); + if let Ok(relayed_addr) = RelayedMultiaddr::try_from(&address) { + self.discovery.state.update_relay_reservation_status( + &relayed_addr.relay_peer, + discovery::state::RelayReservationStatus::Accepted, + ); + self.discovery.state.set_pending_addr_changes(); + if let Err(err) = self.broadcast_rendezvous_registrations() { + error!(%err, "Failed to handle rendezvous register"); + }; + } } SwarmEvent::ExternalAddrExpired { address } => { debug!("External address expired: {}", address); - self.discovery_state.set_pending_addr_changes(); + if let Ok(relayed_addr) = RelayedMultiaddr::try_from(&address) { + self.discovery.state.update_relay_reservation_status( + &relayed_addr.relay_peer_id(), + discovery::state::RelayReservationStatus::Expired, + ); - if let Err(err) = self.broadcast_rendezvous_registrations() { - error!(%err, "Failed to handle rendezvous register"); - }; + self.discovery.state.set_pending_addr_changes(); + if let Err(err) = self.broadcast_rendezvous_registrations() { + error!(%err, "Failed to handle rendezvous register"); + }; + } } SwarmEvent::NewExternalAddrOfPeer { peer_id, address } => { - debug!("New external address of peer: {} {}", peer_id, address) + trace!("New external address of peer: {} {}", peer_id, address) } - _ => {} + unhandled => warn!("Unhandled event: {:?}", unhandled), } } } + +#[derive(Debug)] +pub(crate) struct RelayedMultiaddr { + relay_peer: PeerId, +} + +impl TryFrom<&Multiaddr> for RelayedMultiaddr { + type Error = &'static str; + + fn try_from(value: &Multiaddr) -> Result { + let mut peer_ids = Vec::new(); + + let mut iter = value.iter(); + + while let Some(protocol) = iter.next() { + match protocol { + multiaddr::Protocol::P2pCircuit => { + if peer_ids.is_empty() { + return Err("expected at least one p2p proto before P2pCircuit"); + } + let Some(multiaddr::Protocol::P2p(id)) = iter.next() else { + return Err("expected p2p proto after P2pCircuit"); + }; + peer_ids.push(id); + } + multiaddr::Protocol::P2p(id) => { + peer_ids.push(id); + } + _ => {} + } + } + + if peer_ids.len() < 2 { + return Err("expected at least two p2p protos, one for peer and one for relay"); + } + + Ok(Self { + relay_peer: peer_ids.remove(0), + }) + } +} + +impl RelayedMultiaddr { + fn relay_peer_id(&self) -> &PeerId { + &self.relay_peer + } +} diff --git a/examples/chat/src/network/events/identify.rs b/examples/chat/src/network/events/identify.rs index 20156e6..2ee769d 100644 --- a/examples/chat/src/network/events/identify.rs +++ b/examples/chat/src/network/events/identify.rs @@ -9,25 +9,18 @@ impl EventHandler for EventLoop { debug!("{}: {:?}", "identify".yellow(), event); match event { - identify::Event::Received { - peer_id, - info: identify::Info { protocols, .. }, - } => { - if let Err(err) = self - .discovery_state - .update_peer_protocols(&peer_id, protocols) - { - error!(%err, "Failed to update peer protocols"); - return; - } + identify::Event::Received { peer_id, info } => { + self.discovery + .state + .update_peer_protocols(&peer_id, &info.protocols); - if self.discovery_state.is_peer_relay(&peer_id) { + if self.discovery.state.is_peer_relay(&peer_id) { if let Err(err) = self.create_relay_reservation(&peer_id) { error!(%err, "Failed to handle relay reservation"); }; } - if self.discovery_state.is_peer_rendezvous(&peer_id) { + if self.discovery.state.is_peer_rendezvous(&peer_id) { if let Err(err) = self.perform_rendezvous_discovery(&peer_id) { error!(%err, "Failed to perform rendezvous discovery"); }; diff --git a/examples/chat/src/network/events/relay.rs b/examples/chat/src/network/events/relay.rs index 45b4b3a..545443a 100644 --- a/examples/chat/src/network/events/relay.rs +++ b/examples/chat/src/network/events/relay.rs @@ -1,24 +1,11 @@ use libp2p::relay; use owo_colors::OwoColorize; -use tracing::{debug, error}; +use tracing::debug; -use super::{discovery, EventHandler, EventLoop}; +use super::{EventHandler, EventLoop}; impl EventHandler for EventLoop { async fn handle(&mut self, event: relay::client::Event) { debug!("{}: {:?}", "relay".yellow(), event); - - match event { - relay::client::Event::ReservationReqAccepted { relay_peer_id, .. } => { - if let Err(err) = self.discovery_state.update_relay_reservation_status( - &relay_peer_id, - discovery::RelayReservationStatus::Accepted, - ) { - error!(%err, "Failed to update peer relay reservation status"); - return; - } - } - _ => {} - } } } diff --git a/examples/chat/src/network/events/rendezvous.rs b/examples/chat/src/network/events/rendezvous.rs index c0b8672..ff683e7 100644 --- a/examples/chat/src/network/events/rendezvous.rs +++ b/examples/chat/src/network/events/rendezvous.rs @@ -14,27 +14,27 @@ impl EventHandler for EventLoop { registrations, cookie, } => { - if let Err(err) = self - .discovery_state - .update_rendezvous_cookie(&rendezvous_node, cookie) - { - error!(%err, "Failed to update peer rendezvous cookie"); - return; - }; + self.discovery + .state + .update_rendezvous_cookie(&rendezvous_node, cookie); for registration in registrations { if registration.record.peer_id() == *self.swarm.local_peer_id() { continue; } - let peer = registration.record.peer_id(); - debug!(%peer, "Discovered peer via rendezvous"); - if self.swarm.is_connected(®istration.record.peer_id()) { + let peer_id = registration.record.peer_id(); + if self.swarm.is_connected(&peer_id) { continue; }; + debug!( + %peer_id, + addrs=?(registration.record.addresses()), + "Discovered unconnected peer via rendezvous, attempting to dial it" + ); for address in registration.record.addresses() { - debug!(%peer, %address, "Dialing peer discovered via rendezvous"); + debug!(%peer_id, %address, "Dialing peer discovered via rendezvous"); if let Err(err) = self.swarm.dial(address.clone()) { error!("Failed to dial peer: {:?}", err); } @@ -44,8 +44,12 @@ impl EventHandler for EventLoop { rendezvous::client::Event::Registered { rendezvous_node, .. } => { - if let Some(peer_info) = self.discovery_state.get_peer_info(&rendezvous_node) { - if peer_info.rendezvous_cookie().is_none() { + if let Some(peer_info) = self.discovery.state.get_peer_info(&rendezvous_node) { + if peer_info + .rendezvous() + .and_then(|info| info.cookie()) + .is_none() + { debug!(%rendezvous_node, "Discovering peers via rendezvous after registration"); if let Err(err) = self.perform_rendezvous_discovery(&rendezvous_node) { error!(%err, "Failed to run rendezvous discovery after registration");