From e9474f17f66d8b3a8e0c8fd8e2d5f2fdfc9cf60d Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Fri, 14 Jun 2024 20:05:55 +0100 Subject: [PATCH 01/11] Poll all known peer addresses --- aquadoggo/src/network/service.rs | 49 ++++++++++++++++++++++++++++++-- 1 file changed, 47 insertions(+), 2 deletions(-) diff --git a/aquadoggo/src/network/service.rs b/aquadoggo/src/network/service.rs index 411d59310..af07dedac 100644 --- a/aquadoggo/src/network/service.rs +++ b/aquadoggo/src/network/service.rs @@ -12,15 +12,17 @@ use libp2p::swarm::SwarmEvent; use libp2p::{identify, mdns, relay, rendezvous, Multiaddr, PeerId, Swarm}; use log::{debug, info, trace, warn}; use tokio::task; -use tokio_stream::wrappers::BroadcastStream; +use tokio::time::interval; +use tokio_stream::wrappers::{BroadcastStream, IntervalStream}; use tokio_stream::StreamExt; use crate::bus::{ServiceMessage, ServiceSender}; use crate::context::Context; use crate::manager::{ServiceReadySender, Shutdown}; use crate::network::behaviour::{Event, P2pandaBehaviour}; -use crate::network::config::NODE_NAMESPACE; +use crate::network::config::{PeerAddress, NODE_NAMESPACE}; use crate::network::{identity, peers, swarm, utils, ShutdownHandler}; +use crate::NetworkConfiguration; const RELAY_CONNECT_TIMEOUT: Duration = Duration::from_secs(20); @@ -193,6 +195,7 @@ pub async fn network_service( // Spawn main event loop handling all p2panda and libp2p network events. spawn_event_loop( swarm, + network_config.to_owned(), local_peer_id, connected_relays, shutdown, @@ -317,9 +320,13 @@ pub async fn connect_to_relay( Ok((relay_peer_id, relay_address.clone())) } +const REDIAL_INTERVAL: Duration = Duration::from_secs(20); + /// Main loop polling the async swarm event stream and incoming service messages stream. 
struct EventLoop { swarm: Swarm, + network_config: NetworkConfiguration, + redial_scheduler: IntervalStream, local_peer_id: PeerId, tx: ServiceSender, rx: BroadcastStream, @@ -331,6 +338,7 @@ struct EventLoop { impl EventLoop { pub fn new( swarm: Swarm, + network_config: NetworkConfiguration, local_peer_id: PeerId, tx: ServiceSender, relay_addresses: HashMap, @@ -338,6 +346,8 @@ impl EventLoop { ) -> Self { Self { swarm, + network_config, + redial_scheduler: IntervalStream::new(interval(REDIAL_INTERVAL)), local_peer_id, rx: BroadcastStream::new(tx.subscribe()), tx, @@ -403,6 +413,10 @@ impl EventLoop { return }, }, + // The redial_scheduler emits an event every `REDIAL_INTERVAL` seconds. + Some(_) = self.redial_scheduler.next() => { + self.attempt_dial_known_addresses().await; + }, _ = shutdown_request_received.next() => { self.shutdown().await; } @@ -410,6 +424,35 @@ impl EventLoop { } } + /// Attempt to dial all hardcoded relay and direct node addresses. Only establishes a new connection + /// if we are currently not connected to the target peer. + async fn attempt_dial_known_addresses(&mut self) { + fn try_dial_peer(swarm: &mut Swarm, address: &mut PeerAddress) { + let address = match address.to_quic_multiaddr() { + Ok(address) => address, + Err(e) => { + debug!("Failed to resolve relay multiaddr: {}", e.to_string()); + return; + } + }; + + let opts = DialOpts::unknown_peer_id().address(address.clone()).build(); + + match swarm.dial(opts) { + Ok(_) => (), + Err(err) => debug!("Error dialing node: {:?}", err), + }; + } + + for relay_address in self.network_config.relay_addresses.iter_mut() { + try_dial_peer(&mut self.swarm, relay_address); + } + + for direct_node_address in self.network_config.direct_node_addresses.iter_mut() { + try_dial_peer(&mut self.swarm, direct_node_address); + } + } + /// Send a message on the communication bus to inform other services. 
fn send_service_message(&mut self, message: ServiceMessage) { if self.tx.send(message).is_err() { @@ -530,6 +573,7 @@ impl EventLoop { pub async fn spawn_event_loop( swarm: Swarm, + network_config: NetworkConfiguration, local_peer_id: PeerId, relay_addresses: HashMap, shutdown: Shutdown, @@ -541,6 +585,7 @@ pub async fn spawn_event_loop( // Spawn a task to run swarm in event loop let event_loop = EventLoop::new( swarm, + network_config, local_peer_id, tx, relay_addresses, From ac792ac7130dae23a83cdfc05ffb82e97864bcfa Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Fri, 14 Jun 2024 21:17:24 +0100 Subject: [PATCH 02/11] Update PeerAddress method name --- aquadoggo/src/network/service.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aquadoggo/src/network/service.rs b/aquadoggo/src/network/service.rs index af07dedac..8f61933b2 100644 --- a/aquadoggo/src/network/service.rs +++ b/aquadoggo/src/network/service.rs @@ -428,7 +428,7 @@ impl EventLoop { /// if we are currently not connected to the target peer. 
async fn attempt_dial_known_addresses(&mut self) { fn try_dial_peer(swarm: &mut Swarm, address: &mut PeerAddress) { - let address = match address.to_quic_multiaddr() { + let address = match address.quic_multiaddr() { Ok(address) => address, Err(e) => { debug!("Failed to resolve relay multiaddr: {}", e.to_string()); From 3d887d5b818d0c936753a5ef8674ad02d2eddd53 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Fri, 14 Jun 2024 21:33:13 +0100 Subject: [PATCH 03/11] Update CHANGELOG --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9d6a964a7..15128fd3f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Introduce `PeerAddress` struct to help resolve `String` to internal address types [#621](https://github.com/p2panda/aquadoggo/pull/621) +- Re-dial all configured known peers on schedule [#622](https://github.com/p2panda/aquadoggo/pull/622) ### Changed From 18f78481a2bd3c424ff3626a5b9ba2cad0baa695 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Sat, 15 Jun 2024 10:45:52 +0100 Subject: [PATCH 04/11] WIP: poll known peers --- aquadoggo/src/network/service.rs | 52 ++++++++++++++++++++++++++------ 1 file changed, 43 insertions(+), 9 deletions(-) diff --git a/aquadoggo/src/network/service.rs b/aquadoggo/src/network/service.rs index 8f61933b2..0e16fc52f 100644 --- a/aquadoggo/src/network/service.rs +++ b/aquadoggo/src/network/service.rs @@ -330,6 +330,7 @@ struct EventLoop { local_peer_id: PeerId, tx: ServiceSender, rx: BroadcastStream, + known_peers: HashMap, relay_addresses: HashMap, shutdown_handler: ShutdownHandler, learned_port: bool, @@ -340,8 +341,9 @@ impl EventLoop { swarm: Swarm, network_config: NetworkConfiguration, local_peer_id: PeerId, - tx: ServiceSender, relay_addresses: HashMap, + known_peers: HashMap, + tx: ServiceSender, shutdown_handler: ShutdownHandler, ) -> Self { Self { @@ -351,6 +353,7 @@ impl 
EventLoop { local_peer_id, rx: BroadcastStream::new(tx.subscribe()), tx, + known_peers, relay_addresses, shutdown_handler, learned_port: false, @@ -427,7 +430,11 @@ impl EventLoop { /// Attempt to dial all hardcoded relay and direct node addresses. Only establishes a new connection /// if we are currently not connected to the target peer. async fn attempt_dial_known_addresses(&mut self) { - fn try_dial_peer(swarm: &mut Swarm, address: &mut PeerAddress) { + fn try_dial_peer( + swarm: &mut Swarm, + known_peers: &mut HashMap, + address: &mut PeerAddress, + ) { let address = match address.quic_multiaddr() { Ok(address) => address, Err(e) => { @@ -436,7 +443,16 @@ impl EventLoop { } }; - let opts = DialOpts::unknown_peer_id().address(address.clone()).build(); + let opts = match known_peers.get(&address) { + Some(peer_id) => DialOpts::peer_id(*peer_id) + .addresses(vec![address.to_owned()]) + .condition(PeerCondition::NotDialing) + .condition(PeerCondition::Disconnected) + .build(), + None => DialOpts::unknown_peer_id() + .address(address.to_owned()) + .build(), + }; match swarm.dial(opts) { Ok(_) => (), @@ -445,11 +461,11 @@ impl EventLoop { } for relay_address in self.network_config.relay_addresses.iter_mut() { - try_dial_peer(&mut self.swarm, relay_address); + try_dial_peer(&mut self.swarm, &mut self.known_peers, relay_address); } for direct_node_address in self.network_config.direct_node_addresses.iter_mut() { - try_dial_peer(&mut self.swarm, direct_node_address); + try_dial_peer(&mut self.swarm, &mut self.known_peers, direct_node_address); } } @@ -532,10 +548,22 @@ impl EventLoop { async fn handle_identify_events(&mut self, event: &identify::Event) { match event { identify::Event::Received { - info: identify::Info { observed_addr, .. }, - .. + info: + identify::Info { + observed_addr, + listen_addrs, + .. 
+ }, + peer_id, } => { - debug!("Observed external address reported: {observed_addr}"); + for address in self.network_config.relay_addresses.iter_mut() { + if let Some(addr) = address.quic_multiaddr().ok() { + if listen_addrs.contains(&addr) { + self.known_peers.insert(addr, *peer_id); + } + } + } + if !self .swarm .external_addresses() @@ -582,13 +610,19 @@ pub async fn spawn_event_loop( ) -> Result<()> { let mut shutdown_handler = ShutdownHandler::new(); + let mut known_peers = HashMap::new(); + for (peer_id, addr) in relay_addresses.iter() { + known_peers.insert(addr.to_owned(), *peer_id); + } + // Spawn a task to run swarm in event loop let event_loop = EventLoop::new( swarm, network_config, local_peer_id, - tx, relay_addresses, + known_peers, + tx, shutdown_handler.clone(), ); let handle = task::spawn(event_loop.run()); From fd7ef1fbd44d21ec2885859f46f0e2945fc382a7 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Tue, 18 Jun 2024 09:47:46 +0100 Subject: [PATCH 05/11] Check if a direct node was identified (and add comments) --- aquadoggo/src/network/service.rs | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/aquadoggo/src/network/service.rs b/aquadoggo/src/network/service.rs index 0e16fc52f..d07fb8e35 100644 --- a/aquadoggo/src/network/service.rs +++ b/aquadoggo/src/network/service.rs @@ -556,9 +556,28 @@ impl EventLoop { }, peer_id, } => { + // Configuring known static relay and peer addresses is done by providing an ip + // address or domain name and port. We don't yet know the peer id of the relay or + // direct peer. Here we observe all identify events and check the addresses the + // identified peer provides. If one matches our known addresses then we can add + // their peer id to our address book. This is then used when dialing the peer + // to avoid multiple connections being established to the same peer. + + // Check if the identified peer is one of our configured relay addresses. 
for address in self.network_config.relay_addresses.iter_mut() { if let Some(addr) = address.quic_multiaddr().ok() { if listen_addrs.contains(&addr) { + debug!("Relay identified: {peer_id} {addr}"); + self.known_peers.insert(addr, *peer_id); + } + } + } + + // Check if the identified peer is one of our direct node addresses. + for address in self.network_config.direct_node_addresses.iter_mut() { + if let Some(addr) = address.quic_multiaddr().ok() { + if listen_addrs.contains(&addr) { + debug!("Direct node identified: {peer_id} {addr}"); self.known_peers.insert(addr, *peer_id); } } From ec5d650e394ce70a94cd8d5bc77e96017e1e9df2 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Tue, 18 Jun 2024 09:49:14 +0100 Subject: [PATCH 06/11] Don't dial direct node address on startup, rely on scheduler --- aquadoggo/src/network/service.rs | 23 ----------------------- 1 file changed, 23 deletions(-) diff --git a/aquadoggo/src/network/service.rs b/aquadoggo/src/network/service.rs index d07fb8e35..f4cd0b894 100644 --- a/aquadoggo/src/network/service.rs +++ b/aquadoggo/src/network/service.rs @@ -167,29 +167,6 @@ pub async fn network_service( ); } } - - // Dial all nodes we want to directly connect to. - for direct_node_address in network_config.direct_node_addresses.iter_mut() { - info!("Connecting to node @ {}", direct_node_address); - - let direct_node_address = match direct_node_address.quic_multiaddr() { - Ok(address) => address, - Err(e) => { - debug!("Failed to resolve direct node multiaddr: {}", e.to_string()); - continue; - } - }; - - let opts = DialOpts::unknown_peer_id() - .address(direct_node_address.clone()) - .build(); - - match swarm.dial(opts) { - Ok(_) => (), - Err(err) => debug!("Error dialing node: {:?}", err), - }; - } - info!("Network service ready!"); // Spawn main event loop handling all p2panda and libp2p network events. 
From 4a03e97568f0c5718cb1f4f3f70e49beb37aa357 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Tue, 18 Jun 2024 19:05:56 +0100 Subject: [PATCH 07/11] More comments --- aquadoggo/src/network/service.rs | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/aquadoggo/src/network/service.rs b/aquadoggo/src/network/service.rs index f4cd0b894..f6701f5bd 100644 --- a/aquadoggo/src/network/service.rs +++ b/aquadoggo/src/network/service.rs @@ -7,6 +7,7 @@ use std::time::Duration; use anyhow::Result; use libp2p::multiaddr::Protocol; +use libp2p::swarm::behaviour::ConnectionEstablished; use libp2p::swarm::dial_opts::{DialOpts, PeerCondition}; use libp2p::swarm::SwarmEvent; use libp2p::{identify, mdns, relay, rendezvous, Multiaddr, PeerId, Swarm}; @@ -412,6 +413,9 @@ impl EventLoop { known_peers: &mut HashMap, address: &mut PeerAddress, ) { + // Get the peers quic multiaddress, this can error if the address was provided in the form + // of a domain name and we are not able to resolve it to a valid multiaddress (for example, + // if we are offline). let address = match address.quic_multiaddr() { Ok(address) => address, Err(e) => { @@ -420,6 +424,8 @@ impl EventLoop { } }; + // Construct dial opts depending on if we know the peer id of the peer we are dialing. + // We know the peer id if we have connected once to the peer in the current session. let opts = match known_peers.get(&address) { Some(peer_id) => DialOpts::peer_id(*peer_id) .addresses(vec![address.to_owned()]) @@ -431,6 +437,9 @@ impl EventLoop { .build(), }; + // Dial the known peer. When dialing a peer by its peer id this method will not attempt + // a new connection if we are already connected to the peer or we are already dialing + // them. match swarm.dial(opts) { Ok(_) => (), Err(err) => debug!("Error dialing node: {:?}", err), @@ -535,11 +544,11 @@ impl EventLoop { } => { // Configuring known static relay and peer addresses is done by providing an ip // address or domain name and port. 
We don't yet know the peer id of the relay or - // direct peer. Here we observe all identify events and check the addresses the + // direct peer. Here we observe all identify events and check the addresses the // identified peer provides. If one matches our known addresses then we can add // their peer id to our address book. This is then used when dialing the peer // to avoid multiple connections being established to the same peer. - + // Check if the identified peer is one of our configured relay addresses. for address in self.network_config.relay_addresses.iter_mut() { if let Some(addr) = address.quic_multiaddr().ok() { @@ -560,6 +569,8 @@ impl EventLoop { } } + // If we don't know of the observed address a peer told us then add it to our + // external addresses. if !self .swarm .external_addresses() From d5bd91c8489c99abe16ec178dcc3092d866213c2 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Tue, 18 Jun 2024 19:08:10 +0100 Subject: [PATCH 08/11] Remove unused import --- aquadoggo/src/network/service.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/aquadoggo/src/network/service.rs b/aquadoggo/src/network/service.rs index f6701f5bd..cc9170a68 100644 --- a/aquadoggo/src/network/service.rs +++ b/aquadoggo/src/network/service.rs @@ -7,7 +7,6 @@ use std::time::Duration; use anyhow::Result; use libp2p::multiaddr::Protocol; -use libp2p::swarm::behaviour::ConnectionEstablished; use libp2p::swarm::dial_opts::{DialOpts, PeerCondition}; use libp2p::swarm::SwarmEvent; use libp2p::{identify, mdns, relay, rendezvous, Multiaddr, PeerId, Swarm}; From 848de5c4d07eb82b0bc192e5a5a488bef5586c70 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Tue, 18 Jun 2024 19:08:33 +0100 Subject: [PATCH 09/11] fmt --- aquadoggo/src/network/service.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aquadoggo/src/network/service.rs b/aquadoggo/src/network/service.rs index cc9170a68..0a04f3903 100644 --- a/aquadoggo/src/network/service.rs +++ b/aquadoggo/src/network/service.rs 
@@ -413,7 +413,7 @@ impl EventLoop { address: &mut PeerAddress, ) { // Get the peers quic multiaddress, this can error if the address was provided in the form - // of a domain name and we are not able to resolve it to a valid multiaddress (for example, + // of a domain name and we are not able to resolve it to a valid multiaddress (for example, // if we are offline). let address = match address.quic_multiaddr() { Ok(address) => address, From 06883c89ef0f47cfe1cbce13ff5869ad0da77168 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Tue, 18 Jun 2024 19:43:00 +0100 Subject: [PATCH 10/11] Doc strings for EventLoop struct --- aquadoggo/src/network/service.rs | 32 +++++++++++++++++++++++++++++--- 1 file changed, 29 insertions(+), 3 deletions(-) diff --git a/aquadoggo/src/network/service.rs b/aquadoggo/src/network/service.rs index 0a04f3903..5de0812ff 100644 --- a/aquadoggo/src/network/service.rs +++ b/aquadoggo/src/network/service.rs @@ -301,15 +301,39 @@ const REDIAL_INTERVAL: Duration = Duration::from_secs(20); /// Main loop polling the async swarm event stream and incoming service messages stream. struct EventLoop { + /// libp2p swarm. swarm: Swarm, + + /// p2panda network configuration. network_config: NetworkConfiguration, - redial_scheduler: IntervalStream, + + /// Our own local PeerId. local_peer_id: PeerId, + + /// Addresses of relays which we connected to during node startup, this means we + /// are: + /// - registered on it's rendezvous service and actively discovering other peers + /// - listening on a circuit relay address for relayed connections + relay_addresses: HashMap, + + /// Addresses of configured relay or direct peers with their corresponding PeerId. + /// Is only populated once we have made the first connection to the addressed peer + /// and received an identify message back containing the PeerId. + known_peers: HashMap, + + /// Scheduler which triggers known peer redial attempts. 
+ redial_scheduler: IntervalStream, + + /// Service message channel sender. tx: ServiceSender, + + /// Service message channel receiver. rx: BroadcastStream, - known_peers: HashMap, - relay_addresses: HashMap, + + /// Shutdown handler. shutdown_handler: ShutdownHandler, + + /// Did we learn our own port yet? learned_port: bool, } @@ -445,10 +469,12 @@ impl EventLoop { }; } + // Attempt to dial all relay addresses. for relay_address in self.network_config.relay_addresses.iter_mut() { try_dial_peer(&mut self.swarm, &mut self.known_peers, relay_address); } + // Attempt to dial all direct peer addresses. for direct_node_address in self.network_config.direct_node_addresses.iter_mut() { try_dial_peer(&mut self.swarm, &mut self.known_peers, direct_node_address); } From 6c13f1b11646cfa9525fe374de96aac50e5481b5 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Tue, 18 Jun 2024 19:44:48 +0100 Subject: [PATCH 11/11] Clippy --- aquadoggo/src/network/service.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/aquadoggo/src/network/service.rs b/aquadoggo/src/network/service.rs index 5de0812ff..98356c215 100644 --- a/aquadoggo/src/network/service.rs +++ b/aquadoggo/src/network/service.rs @@ -316,8 +316,8 @@ struct EventLoop { /// - listening on a circuit relay address for relayed connections relay_addresses: HashMap, - /// Addresses of configured relay or direct peers with their corresponding PeerId. - /// Is only populated once we have made the first connection to the addressed peer + /// Addresses of configured relay or direct peers with their corresponding PeerId. + /// Is only populated once we have made the first connection to the addressed peer /// and received an identify message back containing the PeerId. known_peers: HashMap, @@ -576,7 +576,7 @@ impl EventLoop { // Check if the identified peer is one of our configured relay addresses. 
for address in self.network_config.relay_addresses.iter_mut() { - if let Some(addr) = address.quic_multiaddr().ok() { + if let Ok(addr) = address.quic_multiaddr() { if listen_addrs.contains(&addr) { debug!("Relay identified: {peer_id} {addr}"); self.known_peers.insert(addr, *peer_id); @@ -586,7 +586,7 @@ impl EventLoop { // Check if the identified peer is one of our direct node addresses. for address in self.network_config.direct_node_addresses.iter_mut() { - if let Some(addr) = address.quic_multiaddr().ok() { + if let Ok(addr) = address.quic_multiaddr() { if listen_addrs.contains(&addr) { debug!("Direct node identified: {peer_id} {addr}"); self.known_peers.insert(addr, *peer_id);