Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dial all configured known relay and direct node addresses on schedule #622

Merged
merged 11 commits into from
Jun 18, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

- Introduce `PeerAddress` struct to help resolve `String` to internal address types [#621](https://github.com/p2panda/aquadoggo/pull/621)
- Re-dial all configured known peers on schedule [#622](https://github.com/p2panda/aquadoggo/pull/622)

### Changed

Expand Down
173 changes: 142 additions & 31 deletions aquadoggo/src/network/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,17 @@ use libp2p::swarm::SwarmEvent;
use libp2p::{identify, mdns, relay, rendezvous, Multiaddr, PeerId, Swarm};
use log::{debug, info, trace, warn};
use tokio::task;
use tokio_stream::wrappers::BroadcastStream;
use tokio::time::interval;
use tokio_stream::wrappers::{BroadcastStream, IntervalStream};
use tokio_stream::StreamExt;

use crate::bus::{ServiceMessage, ServiceSender};
use crate::context::Context;
use crate::manager::{ServiceReadySender, Shutdown};
use crate::network::behaviour::{Event, P2pandaBehaviour};
use crate::network::config::NODE_NAMESPACE;
use crate::network::config::{PeerAddress, NODE_NAMESPACE};
use crate::network::{identity, peers, swarm, utils, ShutdownHandler};
use crate::NetworkConfiguration;

const RELAY_CONNECT_TIMEOUT: Duration = Duration::from_secs(20);

Expand Down Expand Up @@ -165,34 +167,12 @@ pub async fn network_service(
);
}
}

// Dial all nodes we want to directly connect to.
for direct_node_address in network_config.direct_node_addresses.iter_mut() {
info!("Connecting to node @ {}", direct_node_address);

let direct_node_address = match direct_node_address.quic_multiaddr() {
Ok(address) => address,
Err(e) => {
debug!("Failed to resolve direct node multiaddr: {}", e.to_string());
continue;
}
};

let opts = DialOpts::unknown_peer_id()
.address(direct_node_address.clone())
.build();

match swarm.dial(opts) {
Ok(_) => (),
Err(err) => debug!("Error dialing node: {:?}", err),
};
}

info!("Network service ready!");

// Spawn main event loop handling all p2panda and libp2p network events.
spawn_event_loop(
swarm,
network_config.to_owned(),
local_peer_id,
connected_relays,
shutdown,
Expand Down Expand Up @@ -317,30 +297,64 @@ pub async fn connect_to_relay(
Ok((relay_peer_id, relay_address.clone()))
}

const REDIAL_INTERVAL: Duration = Duration::from_secs(20);

/// Main loop polling the async swarm event stream and incoming service messages stream.
struct EventLoop {
/// libp2p swarm.
swarm: Swarm<P2pandaBehaviour>,

/// p2panda network configuration.
network_config: NetworkConfiguration,

/// Our own local PeerId.
local_peer_id: PeerId,

/// Addresses of relays which we connected to during node startup, this means we
/// are:
/// - registered on it's rendezvous service and actively discovering other peers
/// - listening on a circuit relay address for relayed connections
relay_addresses: HashMap<PeerId, Multiaddr>,

/// Addresses of configured relay or direct peers with their corresponding PeerId.
/// Is only populated once we have made the first connection to the addressed peer
/// and received an identify message back containing the PeerId.
known_peers: HashMap<Multiaddr, PeerId>,

/// Scheduler which triggers known peer redial attempts.
redial_scheduler: IntervalStream,

/// Service message channel sender.
tx: ServiceSender,

/// Service message channel receiver.
rx: BroadcastStream<ServiceMessage>,
relay_addresses: HashMap<PeerId, Multiaddr>,

/// Shutdown handler.
shutdown_handler: ShutdownHandler,

/// Did we learn our own port yet?
learned_port: bool,
}

impl EventLoop {
pub fn new(
swarm: Swarm<P2pandaBehaviour>,
network_config: NetworkConfiguration,
local_peer_id: PeerId,
tx: ServiceSender,
relay_addresses: HashMap<PeerId, Multiaddr>,
known_peers: HashMap<Multiaddr, PeerId>,
tx: ServiceSender,
shutdown_handler: ShutdownHandler,
) -> Self {
Self {
swarm,
network_config,
redial_scheduler: IntervalStream::new(interval(REDIAL_INTERVAL)),
local_peer_id,
rx: BroadcastStream::new(tx.subscribe()),
tx,
known_peers,
relay_addresses,
shutdown_handler,
learned_port: false,
Expand Down Expand Up @@ -403,13 +417,69 @@ impl EventLoop {
return
},
},
// The redial_scheduler emits an event every `REDIAL_INTERVAL` seconds.
Some(_) = self.redial_scheduler.next() => {
self.attempt_dial_known_addresses().await;
},
_ = shutdown_request_received.next() => {
self.shutdown().await;
}
}
}
}

/// Attempt to dial all hardcoded relay and direct node addresses. Only establishes a new connection
/// if we are currently not connected to the target peer.
async fn attempt_dial_known_addresses(&mut self) {
fn try_dial_peer(
swarm: &mut Swarm<P2pandaBehaviour>,
known_peers: &mut HashMap<Multiaddr, PeerId>,
address: &mut PeerAddress,
) {
// Get the peers quic multiaddress, this can error if the address was provided in the form
// of a domain name and we are not able to resolve it to a valid multiaddress (for example,
// if we are offline).
let address = match address.quic_multiaddr() {
Ok(address) => address,
Err(e) => {
debug!("Failed to resolve relay multiaddr: {}", e.to_string());
return;
}
};

// Construct dial opts depending on if we know the peer id of the peer we are dialing.
// We know the peer id if we have connected once to the peer in the current session.
let opts = match known_peers.get(&address) {
Some(peer_id) => DialOpts::peer_id(*peer_id)
.addresses(vec![address.to_owned()])
.condition(PeerCondition::NotDialing)
.condition(PeerCondition::Disconnected)
.build(),
None => DialOpts::unknown_peer_id()
.address(address.to_owned())
.build(),
};

// Dial the known peer. When dialing a peer by it's peer id this method will attempt a
// new connections if we are already connected to the peer or we are already dialing
// them.
match swarm.dial(opts) {
Ok(_) => (),
Err(err) => debug!("Error dialing node: {:?}", err),
};
}

// Attempt to dial all relay addresses.
for relay_address in self.network_config.relay_addresses.iter_mut() {
try_dial_peer(&mut self.swarm, &mut self.known_peers, relay_address);
}

// Attempt to dial all direct peer addresses.
for direct_node_address in self.network_config.direct_node_addresses.iter_mut() {
try_dial_peer(&mut self.swarm, &mut self.known_peers, direct_node_address);
}
}

/// Send a message on the communication bus to inform other services.
fn send_service_message(&mut self, message: ServiceMessage) {
if self.tx.send(message).is_err() {
Expand Down Expand Up @@ -489,10 +559,43 @@ impl EventLoop {
async fn handle_identify_events(&mut self, event: &identify::Event) {
match event {
identify::Event::Received {
info: identify::Info { observed_addr, .. },
..
info:
identify::Info {
observed_addr,
listen_addrs,
..
},
peer_id,
} => {
debug!("Observed external address reported: {observed_addr}");
// Configuring known static relay and peer addresses is done by providing an ip
// address or domain name and port. We don't yet know the peer id of the relay or
// direct peer. Here we observe all identify events and check the addresses the
// identified peer provides. If one matches our known addresses then we can add
// their peer id to our address book. This is then used when dialing the peer
// to avoid multiple connections being established to the same peer.

// Check if the identified peer is one of our configured relay addresses.
for address in self.network_config.relay_addresses.iter_mut() {
if let Ok(addr) = address.quic_multiaddr() {
if listen_addrs.contains(&addr) {
debug!("Relay identified: {peer_id} {addr}");
self.known_peers.insert(addr, *peer_id);
}
}
}

// Check if the identified peer is one of our direct node addresses.
for address in self.network_config.direct_node_addresses.iter_mut() {
if let Ok(addr) = address.quic_multiaddr() {
if listen_addrs.contains(&addr) {
debug!("Direct node identified: {peer_id} {addr}");
self.known_peers.insert(addr, *peer_id);
}
}
}

// If we don't know of the observed address a peer told us then add it to our
// external addresses.
if !self
.swarm
.external_addresses()
Expand Down Expand Up @@ -530,6 +633,7 @@ impl EventLoop {

pub async fn spawn_event_loop(
swarm: Swarm<P2pandaBehaviour>,
network_config: NetworkConfiguration,
local_peer_id: PeerId,
relay_addresses: HashMap<PeerId, Multiaddr>,
shutdown: Shutdown,
Expand All @@ -538,12 +642,19 @@ pub async fn spawn_event_loop(
) -> Result<()> {
let mut shutdown_handler = ShutdownHandler::new();

let mut known_peers = HashMap::new();
for (peer_id, addr) in relay_addresses.iter() {
known_peers.insert(addr.to_owned(), *peer_id);
}

// Spawn a task to run swarm in event loop
let event_loop = EventLoop::new(
swarm,
network_config,
local_peer_id,
tx,
relay_addresses,
known_peers,
tx,
shutdown_handler.clone(),
);
let handle = task::spawn(event_loop.run());
Expand Down
Loading