From b8e7991e89724f52551b855f162f977ec9ce680c Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Sat, 22 Jun 2024 11:28:12 +0100 Subject: [PATCH] Move network service relay initialization into main event loop --- aquadoggo/src/network/mod.rs | 1 + aquadoggo/src/network/peers/behaviour.rs | 9 - aquadoggo/src/network/relay.rs | 110 ++++++ aquadoggo/src/network/service.rs | 470 +++++++++-------------- aquadoggo/src/network/utils.rs | 60 ++- 5 files changed, 349 insertions(+), 301 deletions(-) create mode 100644 aquadoggo/src/network/relay.rs diff --git a/aquadoggo/src/network/mod.rs b/aquadoggo/src/network/mod.rs index c2819c74c..400c0a023 100644 --- a/aquadoggo/src/network/mod.rs +++ b/aquadoggo/src/network/mod.rs @@ -4,6 +4,7 @@ mod behaviour; mod config; pub mod identity; mod peers; +mod relay; mod service; mod shutdown; mod swarm; diff --git a/aquadoggo/src/network/peers/behaviour.rs b/aquadoggo/src/network/peers/behaviour.rs index be23f60e1..9e4dfa1e3 100644 --- a/aquadoggo/src/network/peers/behaviour.rs +++ b/aquadoggo/src/network/peers/behaviour.rs @@ -120,15 +120,6 @@ impl Behaviour { false } - /// Disable the behaviour, it won't handle any connection events or received messages. - pub fn disable(&mut self) { - self.enabled = false - } - - pub fn enable(&mut self) { - self.enabled = true - } - pub fn send_message(&mut self, peer: Peer, message: PeerMessage) { self.push_event(ToSwarm::NotifyHandler { peer_id: peer.id(), diff --git a/aquadoggo/src/network/relay.rs b/aquadoggo/src/network/relay.rs new file mode 100644 index 000000000..39b1e2879 --- /dev/null +++ b/aquadoggo/src/network/relay.rs @@ -0,0 +1,110 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +use libp2p::multiaddr::Protocol; +use libp2p::{rendezvous, Multiaddr, PeerId, Swarm}; + +use crate::network::behaviour::P2pandaBehaviour; +use crate::network::config::NODE_NAMESPACE; + +/// A relay node. +pub struct Relay { + /// PeerId of the relay node. + pub(crate) peer_id: PeerId, + + /// A single Multiaddr which we know the relay node to be accessible at. + pub(crate) addr: Multiaddr, + + /// The namespace we discover peers at on this relay. + pub(crate) namespace: String, + + /// Did we tell the relay it's observed address yet. + pub(crate) told_addr: bool, + + /// Are we currently discovering peers. + pub(crate) discovering: bool, + + /// Are we in the process of registering at this relay. + pub(crate) registering: bool, + + /// Have we successfully registered. + pub(crate) registered: bool, + + /// Was our relay circuit reservation accepted. + pub(crate) reservation_accepted: bool, +} + +impl Relay { + pub fn new(peer_id: PeerId, addr: Multiaddr) -> Self { + Relay { + peer_id, + addr, + namespace: NODE_NAMESPACE.to_string(), + told_addr: false, + discovering: false, + registering: false, + registered: false, + reservation_accepted: false, + } + } + + /// The circuit address we should listen at for this relay. + pub fn circuit_addr(&self) -> Multiaddr { + self.addr + .clone() + .with(Protocol::P2p(self.peer_id)) + .with(Protocol::P2pCircuit) + } + + /// Start listening on the relay circuit address and register on our discovery namespace. + pub fn register(&mut self, swarm: &mut Swarm) -> Result { + if self.registered || self.registering { + return Ok(false); + } + + self.registering = true; + + // Start listening on the circuit relay address. + let circuit_address = self.circuit_addr(); + swarm.listen_on(circuit_address.clone())?; + + // Register in the `NODE_NAMESPACE` using the rendezvous network behaviour. + swarm + .behaviour_mut() + .rendezvous_client + .as_mut() + .unwrap() + .register( + rendezvous::Namespace::from_static(NODE_NAMESPACE), + self.peer_id.clone(), + None, // Default ttl is 7200s + )?; + + Ok(true) + } + + /// Start discovering peers also registered at the same namespace. + pub fn discover(&mut self, swarm: &mut Swarm) -> bool { + if self.reservation_accepted && self.registered && !self.discovering { + self.discovering = true; + + swarm + .behaviour_mut() + .rendezvous_client + .as_mut() + .expect("Relay client behaviour exists") + .discover( + Some( + rendezvous::Namespace::new(NODE_NAMESPACE.to_string()) + .expect("Valid namespace"), + ), + None, + None, + self.peer_id, + ); + + true + } else { + false + } + } +} diff --git a/aquadoggo/src/network/service.rs b/aquadoggo/src/network/service.rs index 88b89f6be..7bccafcc6 100644 --- a/aquadoggo/src/network/service.rs +++ b/aquadoggo/src/network/service.rs @@ -7,7 +7,8 @@ use std::time::Duration; use anyhow::Result; use libp2p::multiaddr::Protocol; -use libp2p::swarm::dial_opts::{DialOpts, PeerCondition}; +use libp2p::rendezvous::Registration; +use libp2p::swarm::dial_opts::DialOpts; use libp2p::swarm::SwarmEvent; use libp2p::{identify, mdns, relay, rendezvous, Multiaddr, PeerId, Swarm}; use log::{debug, info, trace, warn}; @@ -20,11 +21,13 @@ use crate::bus::{ServiceMessage, ServiceSender}; use crate::context::Context; use crate::manager::{ServiceReadySender, Shutdown}; use crate::network::behaviour::{Event, P2pandaBehaviour}; -use crate::network::config::{PeerAddress, NODE_NAMESPACE}; +use crate::network::relay::Relay; +use crate::network::utils::{dial_known_peer, is_known_peer_address}; use crate::network::{identity, peers, swarm, utils, ShutdownHandler}; use crate::NetworkConfiguration; -const RELAY_CONNECT_TIMEOUT: Duration = Duration::from_secs(20); +/// Interval at which we attempt to dial known peers and relays. +const REDIAL_INTERVAL: Duration = Duration::from_secs(20); /// Network service which handles all networking logic for a p2panda node. /// @@ -42,7 +45,7 @@ pub async fn network_service( tx: ServiceSender, tx_ready: ServiceReadySender, ) -> Result<()> { - let mut network_config = context.config.network.clone(); + let network_config = context.config.network.clone(); let key_pair = identity::to_libp2p_key_pair(&context.key_pair); let local_peer_id = key_pair.public().to_peer_id(); @@ -74,91 +77,6 @@ pub async fn network_service( swarm.listen_on(random_port_addr)?; } - // If relay node addresses were provided, then connect to each and perform necessary setup before we - // run the main event loop. - // - // We are not connecting to other relays when in relay mode (is this even supported by libp2p)? - // See related issue: https://github.com/p2panda/aquadoggo/issues/529 - let mut connected_relays = HashMap::new(); - if !network_config.relay_addresses.is_empty() && !network_config.relay_mode { - // First we need to stop the "peers" behaviour. - // - // We do this so that the connections we create during initialization do not trigger - // replication sessions, which could leave the node in a strange state. - swarm.behaviour_mut().peers.disable(); - - for relay_address in network_config.relay_addresses.iter_mut() { - info!("Connecting to relay node {}", relay_address); - - let mut relay_address = match relay_address.quic_multiaddr() { - Ok(relay_address) => relay_address, - Err(e) => { - debug!("Failed to resolve relay multiaddr: {}", e.to_string()); - continue; - } - }; - - // Attempt to connect to the relay node, we give this a 10 second timeout so as not to - // get stuck if one relay is unreachable. - if let Ok(result) = tokio::time::timeout( - RELAY_CONNECT_TIMEOUT, - connect_to_relay(&mut swarm, &mut relay_address), - ) - .await - { - match result { - // If the connection is successful then add this node to our map of connected relays. - Ok((relay_peer_id, relay_address)) => { - connected_relays.insert(relay_peer_id, relay_address); - } - Err(_) => info!("Failed to connect to relay node"), - } - } else { - info!("Relay connection attempt failed: connection timeout") - }; - } - - // Finally we want to dial any relay nodes we connected to _again_, this time in order to - // initiate replication with it, which will happen automatically once the connection succeeds. - // And then begin discovering peers in `NODE_NAMESPACE` on each relay node in order to begin - // replicating with them too. - // - // @TODO: This is a workaround since we don't have a way yet to start replication _not_ at the - // same time the connection gets successfully established. We should fix this and remove the - // second, potentially redundant dial. - // - // See related issue: https://github.com/p2panda/aquadoggo/issues/507 - - // First restart the "peers" behaviour in order to handle the expected messages - swarm.behaviour_mut().peers.enable(); - - for relay_peer_id in connected_relays.keys() { - let opts = DialOpts::peer_id(*relay_peer_id) - .condition(PeerCondition::Always) // There is an existing connection, so we force dial here. - .build(); - match swarm.dial(opts) { - Ok(_) => (), - Err(err) => debug!("Error dialing peer: {:?}", err), - }; - - // Now request to discover other peers in `NODE_NAMESPACE`. - info!("Discovering peers in namespace \"{NODE_NAMESPACE}\"",); - swarm - .behaviour_mut() - .rendezvous_client - .as_mut() - .expect("Relay client exists as we a relay address was provided") - .discover( - Some( - rendezvous::Namespace::new(NODE_NAMESPACE.to_string()) - .expect("Valid namespace"), - ), - None, - None, - *relay_peer_id, - ); - } - } info!("Network service ready!"); // Spawn main event loop handling all p2panda and libp2p network events. @@ -166,7 +84,6 @@ pub async fn network_service( swarm, network_config.to_owned(), local_peer_id, - connected_relays, shutdown, tx, tx_ready, @@ -174,108 +91,6 @@ pub async fn network_service( .await } -/// Connect to a relay node, confirm exchange identity information and wait to listen on our -/// circuit relay address. -pub async fn connect_to_relay( - swarm: &mut Swarm, - relay_address: &mut Multiaddr, -) -> Result<(PeerId, Multiaddr)> { - // Connect to the relay server. Not for the reservation or relayed connection, but to (a) learn - // our local public address and (b) enable a freshly started relay to learn its public address. - swarm.dial(relay_address.clone())?; - - // Wait to get confirmation that we told the relay node its public address and that they told - // us ours. - let mut learned_observed_addr = false; - let mut told_relay_observed_addr = false; - let mut learned_relay_peer_id: Option = None; - - loop { - match swarm.next().await.unwrap() { - SwarmEvent::Behaviour(Event::Identify(identify::Event::Sent { .. })) => { - info!("Told relay its public address."); - told_relay_observed_addr = true; - } - SwarmEvent::Behaviour(Event::Identify(identify::Event::Received { - info: identify::Info { observed_addr, .. }, - peer_id, - })) => { - debug!("Relay told us our public address: {:?}", observed_addr); - - // Add the newly learned address to our external addresses. - swarm.add_external_address(observed_addr); - - // Now that we have a reply from the relay node we can add their peer id to the - // relay address. - relay_address.push(Protocol::P2p(peer_id)); - - // Update values on the config. - learned_relay_peer_id = Some(peer_id); - - // All done, we've learned our external address successfully. - learned_observed_addr = true; - } - event => debug!("{event:?}"), - } - - if learned_observed_addr && told_relay_observed_addr { - break; - } - } - - // We know the relays peer address was learned in the above step so we unwrap it here. - let relay_peer_id = learned_relay_peer_id.expect("Received relay peer id"); - - // Now we have received our external address, and we know the relay has too, listen on our - // relay circuit address. - let circuit_address = relay_address.clone().with(Protocol::P2pCircuit); - swarm.listen_on(circuit_address.clone())?; - - // Register in the `NODE_NAMESPACE` on the rendezvous server. Doing this will mean that we can - // discover other peers also registered to the same rendezvous server and namespace. - swarm - .behaviour_mut() - .rendezvous_client - .as_mut() - .unwrap() - .register( - rendezvous::Namespace::from_static(NODE_NAMESPACE), - relay_peer_id, - None, // Default ttl is 7200s - )?; - - // Wait to get confirmation that our registration on the rendezvous server at namespace - // `NODE_NAMESPACE` was successful and that the relay server has accepted our reservation. - let mut rendezvous_registered = false; - let mut relay_reservation_accepted = false; - - loop { - match swarm.next().await.expect("Infinite Stream") { - SwarmEvent::Behaviour(Event::RelayClient( - relay::client::Event::ReservationReqAccepted { .. }, - )) => { - info!("Relay circuit reservation request accepted"); - relay_reservation_accepted = true; - } - SwarmEvent::Behaviour(Event::RendezvousClient( - rendezvous::client::Event::Registered { namespace, .. }, - )) => { - info!("Registered on rendezvous in namespace \"{namespace}\""); - rendezvous_registered = true; - } - event => debug!("{event:?}"), - } - - if relay_reservation_accepted && rendezvous_registered { - break; - } - } - - Ok((relay_peer_id, relay_address.clone())) -} - -const REDIAL_INTERVAL: Duration = Duration::from_secs(20); - /// Main loop polling the async swarm event stream and incoming service messages stream. struct EventLoop { /// libp2p swarm. @@ -287,17 +102,12 @@ struct EventLoop { /// Our own local PeerId. local_peer_id: PeerId, - /// Addresses of relays which we connected to during node startup, this means we - /// are: - /// - registered on it's rendezvous service and actively discovering other peers - /// - listening on a circuit relay address for relayed connections - relay_addresses: HashMap, - - /// Addresses of configured relay or direct peers with their corresponding PeerId. - /// Is only populated once we have made the first connection to the addressed peer - /// and received an identify message back containing the PeerId. + /// Addresses of configured relay or direct peers mapped to discovered PeerId's. known_peers: HashMap, + /// Relays for which we have discovered a PeerId via the identify behaviour. + relays: HashMap, + /// Scheduler which triggers known peer redial attempts. redial_scheduler: IntervalStream, @@ -310,8 +120,11 @@ struct EventLoop { /// Shutdown handler. shutdown_handler: ShutdownHandler, - /// Did we learn our own port yet? + /// Did we learn our own port yet. learned_port: bool, + + /// Did we learn our observed address yet. + learned_observed_addr: bool, } impl EventLoop { @@ -319,8 +132,6 @@ impl EventLoop { swarm: Swarm, network_config: NetworkConfiguration, local_peer_id: PeerId, - relay_addresses: HashMap, - known_peers: HashMap, tx: ServiceSender, shutdown_handler: ShutdownHandler, ) -> Self { @@ -331,10 +142,11 @@ impl EventLoop { local_peer_id, rx: BroadcastStream::new(tx.subscribe()), tx, - known_peers, - relay_addresses, + known_peers: HashMap::new(), + relays: HashMap::new(), shutdown_handler, learned_port: false, + learned_observed_addr: false, } } @@ -349,7 +161,7 @@ impl EventLoop { } // Wait a little bit for libp2p to actually close all connections - tokio::time::sleep(Duration::from_millis(25)).await; + tokio::time::sleep(Duration::from_millis(10)).await; self.shutdown_handler.set_done(); } @@ -359,6 +171,9 @@ impl EventLoop { pub async fn run(mut self) { let mut shutdown_request_received = self.shutdown_handler.is_requested(); + // Dial all known relay and direct peers. + self.attempt_dial_known_addresses().await; + loop { tokio::select! { event = self.swarm.next() => { @@ -380,7 +195,8 @@ impl EventLoop { SwarmEvent::Behaviour(Event::Mdns(event)) => self.handle_mdns_events(&event).await, SwarmEvent::Behaviour(Event::RendezvousClient(event)) => self.handle_rendezvous_client_events(&event).await, SwarmEvent::Behaviour(Event::Peers(event)) => self.handle_peers_events(&event).await, - event => trace!("{event:?}") + SwarmEvent::Behaviour(Event::RelayClient(event)) => self.handle_relay_client_events(&event).await, + event => self.handle_swarm_events(event).await, } } @@ -408,52 +224,16 @@ impl EventLoop { /// Attempt to dial all hardcoded relay and direct node addresses. Only establishes a new connection /// if we are currently not connected to the target peer. async fn attempt_dial_known_addresses(&mut self) { - fn try_dial_peer( - swarm: &mut Swarm, - known_peers: &mut HashMap, - address: &mut PeerAddress, - ) { - // Get the peers quic multiaddress, this can error if the address was provided in the form - // of a domain name and we are not able to resolve it to a valid multiaddress (for example, - // if we are offline). - let address = match address.quic_multiaddr() { - Ok(address) => address, - Err(e) => { - debug!("Failed to resolve relay multiaddr: {}", e.to_string()); - return; - } - }; - - // Construct dial opts depending on if we know the peer id of the peer we are dialing. - // We know the peer id if we have connected once to the peer in the current session. - let opts = match known_peers.get(&address) { - Some(peer_id) => DialOpts::peer_id(*peer_id) - .addresses(vec![address.to_owned()]) - .condition(PeerCondition::NotDialing) - .condition(PeerCondition::Disconnected) - .build(), - None => DialOpts::unknown_peer_id() - .address(address.to_owned()) - .build(), - }; - - // Dial the known peer. When dialing a peer by it's peer id this method will attempt a - // new connections if we are already connected to the peer or we are already dialing - // them. - match swarm.dial(opts) { - Ok(_) => (), - Err(err) => debug!("Error dialing node: {:?}", err), - }; - } - // Attempt to dial all relay addresses. for relay_address in self.network_config.relay_addresses.iter_mut() { - try_dial_peer(&mut self.swarm, &mut self.known_peers, relay_address); + debug!("Dial relay at address {}", relay_address); + dial_known_peer(&mut self.swarm, &mut self.known_peers, relay_address); } // Attempt to dial all direct peer addresses. for direct_node_address in self.network_config.direct_node_addresses.iter_mut() { - try_dial_peer(&mut self.swarm, &mut self.known_peers, direct_node_address); + debug!("Dial direct peer at address {}", direct_node_address); + dial_known_peer(&mut self.swarm, &mut self.known_peers, direct_node_address); } } @@ -504,21 +284,31 @@ impl EventLoop { rendezvous_node, .. } => { - debug!("Discovered peers registered at rendezvous: {registrations:?}",); - - for registration in registrations { - for address in registration.record.addresses() { - let peer_id = registration.record.peer_id(); + for Registration { record, .. } in registrations { + for address in record.addresses() { + let peer_id = record.peer_id(); if peer_id != self.local_peer_id { - debug!("Add new peer to address book: {} {}", peer_id, address); + debug!("Peer discovered via rendezvous: {} {}", peer_id, address); - if let Some(relay_address) = self.relay_addresses.get(rendezvous_node) { - let peer_circuit_address = relay_address - .clone() - .with(Protocol::P2pCircuit) - .with(Protocol::P2p(peer_id)); + if self.swarm.is_connected(&peer_id) { + debug!("Already connected to peer {} not dialing", peer_id); + continue; + } + + if let Some(relay_address) = self.relays.get(rendezvous_node) { + debug!("Dialing peer {}", peer_id); + + let peer_circuit_address = + relay_address.circuit_addr().with(Protocol::P2p(peer_id)); + + let opts = DialOpts::peer_id(peer_id) + .override_dial_concurrency_factor( + NonZeroU8::new(1).expect("Is nonzero u8"), + ) + .addresses(vec![peer_circuit_address]) + .build(); - match self.swarm.dial(peer_circuit_address) { + match self.swarm.dial(opts) { Ok(_) => (), Err(err) => debug!("Error dialing peer: {:?}", err), }; @@ -529,6 +319,24 @@ impl EventLoop { } } } + rendezvous::client::Event::Registered { + namespace, + rendezvous_node, + .. + } => { + if let Some(relay) = self.relays.get_mut(&rendezvous_node) { + if !relay.registered { + debug!("Registered on rendezvous {rendezvous_node} in namespace \"{namespace}\""); + relay.registered = true; + } + if relay.discover(&mut self.swarm) { + info!( + "Discovering peers in namespace \"{}\" on relay {}", + relay.namespace, relay.peer_id + ); + }; + } + } event => trace!("{event:?}"), } } @@ -544,6 +352,19 @@ impl EventLoop { }, peer_id, } => { + // We now learned at least one of our observed addr. + self.learned_observed_addr = true; + + // If we don't know of the observed address a peer told us then add it to our + // external addresses. + if !self + .swarm + .external_addresses() + .any(|addr| addr == observed_addr) + { + self.swarm.add_external_address(observed_addr.clone()); + } + // Configuring known static relay and peer addresses is done by providing an ip // address or domain name and port. We don't yet know the peer id of the relay or // direct peer. Here we observe all identify events and check the addresses the @@ -552,33 +373,48 @@ impl EventLoop { // to avoid multiple connections being established to the same peer. // Check if the identified peer is one of our configured relay addresses. - for address in self.network_config.relay_addresses.iter_mut() { - if let Ok(addr) = address.quic_multiaddr() { - if listen_addrs.contains(&addr) { - debug!("Relay identified: {peer_id} {addr}"); - self.known_peers.insert(addr, *peer_id); - } + if let Some(addr) = + is_known_peer_address(&mut self.network_config.relay_addresses, listen_addrs) + { + if self.relays.get(peer_id).is_some() { + return; } + + // Add the relay to our known peers. + debug!("Relay identified: {peer_id} {addr}"); + self.known_peers.insert(addr.clone(), *peer_id); + + // Also add it to our map of identified relays. + self.relays.insert(*peer_id, Relay::new(*peer_id, addr)); } // Check if the identified peer is one of our direct node addresses. - for address in self.network_config.direct_node_addresses.iter_mut() { - if let Ok(addr) = address.quic_multiaddr() { - if listen_addrs.contains(&addr) { - debug!("Direct node identified: {peer_id} {addr}"); - self.known_peers.insert(addr, *peer_id); - } - } + if let Some(addr) = is_known_peer_address( + &mut self.network_config.direct_node_addresses, + listen_addrs, + ) { + // Add the direct node to our known peers. + debug!("Direct node identified: {peer_id} {addr}"); + self.known_peers.insert(addr, *peer_id); } + } - // If we don't know of the observed address a peer told us then add it to our - // external addresses. - if !self - .swarm - .external_addresses() - .any(|addr| addr == observed_addr) - { - self.swarm.add_external_address(observed_addr.clone()); + identify::Event::Sent { peer_id } => { + if let Some(relay) = self.relays.get_mut(peer_id) { + if !relay.told_addr { + debug!("Told relay {} its public address", { peer_id }); + relay.told_addr = true; + } + + // Attempt to register with the relay. + match relay.register(&mut self.swarm) { + Ok(registered) => { + if registered { + debug!("Registered on relay {} at {}", relay.peer_id, relay.addr) + } + } + Err(e) => debug!("Error registering on relay: {}", e), + }; } } event => trace!("{event:?}"), @@ -591,9 +427,8 @@ impl EventLoop { for (peer_id, _) in list { debug!("mDNS discovered a new peer: {peer_id}"); + // Dial discovered peer. let dial_opts = DialOpts::peer_id(*peer_id) - .condition(PeerCondition::Disconnected) - .condition(PeerCondition::NotDialing) .override_dial_concurrency_factor(NonZeroU8::new(1).expect("Is nonzero u8")) .build(); @@ -606,31 +441,84 @@ impl EventLoop { event => trace!("{event:?}"), } } + + async fn handle_relay_client_events(&mut self, event: &relay::client::Event) { + match event { + relay::client::Event::ReservationReqAccepted { relay_peer_id, .. } => { + debug!("Relay {relay_peer_id} accepted circuit reservation request"); + + if let Some(relay) = self.relays.get_mut(&relay_peer_id) { + relay.reservation_accepted = true; + // Attempt to start discovering peers at the configured namespace. + if relay.discover(&mut self.swarm) { + info!( + "Discovering peers in namespace \"{}\" on relay {}", + relay.namespace, relay.peer_id + ); + }; + } + } + event => trace!("{event:?}"), + } + } + + async fn handle_swarm_events(&mut self, event: SwarmEvent) { + match event { + SwarmEvent::ConnectionEstablished { + peer_id, + connection_id, + endpoint, + num_established, + .. + } => { + debug!( + "Connection established with peer {}({}) at {}, total connections {}", + peer_id, + connection_id, + endpoint.get_remote_address(), + num_established + ); + } + SwarmEvent::ConnectionClosed { + peer_id, + connection_id, + endpoint, + cause, + .. + } => { + debug!( + "Connection closed with peer {}({}) at {}: {}", + peer_id, + connection_id, + endpoint.get_remote_address(), + cause + .map(|cause| cause.to_string()) + .unwrap_or("No cause given".to_string()) + ); + + // Remove this peer address from our known peers. + self.known_peers.remove(&endpoint.get_remote_address()); + } + event => trace!("{event:?}"), + } + } } pub async fn spawn_event_loop( swarm: Swarm, network_config: NetworkConfiguration, local_peer_id: PeerId, - relay_addresses: HashMap, shutdown: Shutdown, tx: ServiceSender, tx_ready: ServiceReadySender, ) -> Result<()> { let mut shutdown_handler = ShutdownHandler::new(); - let mut known_peers = HashMap::new(); - for (peer_id, addr) in relay_addresses.iter() { - known_peers.insert(addr.to_owned(), *peer_id); - } - // Spawn a task to run swarm in event loop let event_loop = EventLoop::new( swarm, network_config, local_peer_id, - relay_addresses, - known_peers, tx, shutdown_handler.clone(), ); diff --git a/aquadoggo/src/network/utils.rs b/aquadoggo/src/network/utils.rs index 5f320d4d0..abc20f227 100644 --- a/aquadoggo/src/network/utils.rs +++ b/aquadoggo/src/network/utils.rs @@ -1,10 +1,17 @@ // SPDX-License-Identifier: AGPL-3.0-or-later +use std::collections::HashMap; use std::net::SocketAddr; +use std::num::NonZeroU8; -use libp2p::Multiaddr; +use libp2p::swarm::dial_opts::DialOpts; +use libp2p::{Multiaddr, PeerId, Swarm}; +use log::debug; use regex::Regex; +use crate::network::behaviour::P2pandaBehaviour; +use crate::network::config::PeerAddress; + pub fn to_quic_address(address: &Multiaddr) -> Option { let hay = address.to_string(); let regex = Regex::new(r"/ip4/(\d+.\d+.\d+.\d+)/udp/(\d+)/quic-v1").unwrap(); @@ -22,3 +29,54 @@ pub fn to_quic_address(address: &Multiaddr) -> Option { } } } + +pub fn is_known_peer_address( + known_addresses: &mut Vec, + peer_addresses: &Vec, +) -> Option { + for address in known_addresses.iter_mut() { + if let Ok(addr) = address.quic_multiaddr() { + if peer_addresses.contains(&addr) { + return Some(addr.clone()); + } + } + } + None +} + +pub fn dial_known_peer( + swarm: &mut Swarm, + known_peers: &mut HashMap, + address: &mut PeerAddress, +) { + // Get the peers quic multiaddress, this can error if the address was provided in the form + // of a domain name and we are not able to resolve it to a valid multiaddress (for example, + // if we are offline). + let address = match address.quic_multiaddr() { + Ok(address) => address, + Err(e) => { + debug!("Failed to resolve relay multiaddr: {}", e.to_string()); + return; + } + }; + + // Construct dial opts depending on if we know the peer id of the peer we are dialing. + // We know the peer id if we have connected once to the peer in the current session. + let opts = match known_peers.get(&address) { + Some(peer_id) => DialOpts::peer_id(*peer_id) + .addresses(vec![address.to_owned()]) + .override_dial_concurrency_factor(NonZeroU8::new(1).expect("Is nonzero u8")) + .build(), + None => DialOpts::unknown_peer_id() + .address(address.to_owned()) + .build(), + }; + + // Dial the known peer. When dialing a peer by it's peer id this method will attempt a + // new connections if we are already connected to the peer or we are already dialing + // them. + match swarm.dial(opts) { + Ok(_) => (), + Err(err) => debug!("Error dialing node: {:?}", err), + }; +}