Skip to content

Commit

Permalink
Merge pull request #2569 from jacderida/closest_relay_reservation2
Browse files Browse the repository at this point in the history
fix(relay): make reservation to our closest peers
  • Loading branch information
jacderida authored Dec 23, 2024
2 parents a3adc3a + e8092e7 commit 8cd61ae
Showing 1 changed file with 35 additions and 31 deletions.
66 changes: 35 additions & 31 deletions ant-networking/src/relay_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@
// permissions and limitations relating to use of the SAFE Network Software.

use crate::driver::{BadNodes, NodeBehaviour};
use ant_protocol::NetworkAddress;
use itertools::Itertools;
use libp2p::{
core::transport::ListenerId, multiaddr::Protocol, Multiaddr, PeerId, StreamProtocol, Swarm,
};
use rand::Rng;
use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
use std::collections::{BTreeMap, HashMap, HashSet};

const MAX_CONCURRENT_RELAY_CONNECTIONS: usize = 4;
const MAX_POTENTIAL_CANDIDATES: usize = 1000;
Expand All @@ -28,7 +29,7 @@ pub(crate) fn is_a_relayed_peer(addrs: &HashSet<Multiaddr>) -> bool {
#[derive(Debug)]
pub(crate) struct RelayManager {
self_peer_id: PeerId,
candidates: VecDeque<(PeerId, Multiaddr)>,
candidates: Vec<(PeerId, Multiaddr)>,
waiting_for_reservation: BTreeMap<PeerId, Multiaddr>,
connected_relays: BTreeMap<PeerId, Multiaddr>,

Expand Down Expand Up @@ -72,7 +73,7 @@ impl RelayManager {
// Hence here can add the addr directly.
if let Some(relay_addr) = Self::craft_relay_address(addr, Some(*peer_id)) {
debug!("Adding {peer_id:?} with {relay_addr:?} as a potential relay candidate");
self.candidates.push_back((*peer_id, relay_addr));
self.candidates.push((*peer_id, relay_addr));
}
}
} else {
Expand Down Expand Up @@ -101,44 +102,47 @@ impl RelayManager {
// todo: should we remove all our other `listen_addr`? And should we block from adding `add_external_address` if
// we're behind nat?

// Pick a random candidate from the vector. Check if empty, or `gen_range` panics for empty range.
// Pick a random closest candidate from the vector. Check if empty, or `gen_range` panics for empty range.
let self_peer_id = NetworkAddress::from_peer(self.self_peer_id);
self.candidates.sort_by_key(|(peer_id, _)| {
self_peer_id.distance(&NetworkAddress::from_peer(*peer_id))
});
let index = if self.candidates.is_empty() {
debug!("No more relay candidates.");
break;
} else {
rand::thread_rng().gen_range(0..self.candidates.len())
let max_range = std::cmp::min(self.candidates.len(), 5);
rand::thread_rng().gen_range(0..max_range)
};

if let Some((peer_id, relay_addr)) = self.candidates.remove(index) {
// skip if detected as a bad node
if let Some((_, is_bad)) = bad_nodes.get(&peer_id) {
if *is_bad {
debug!("Peer {peer_id:?} is considered as a bad node. Skipping it.");
continue;
}
}

if self.connected_relays.contains_key(&peer_id)
|| self.waiting_for_reservation.contains_key(&peer_id)
{
debug!("We are already using {peer_id:?} as a relay server. Skipping.");
let (peer_id, relay_addr) = self.candidates.swap_remove(index);
// skip if detected as a bad node
if let Some((_, is_bad)) = bad_nodes.get(&peer_id) {
if *is_bad {
debug!("Peer {peer_id:?} is considered as a bad node. Skipping it.");
continue;
}
}

if self.connected_relays.contains_key(&peer_id)
|| self.waiting_for_reservation.contains_key(&peer_id)
{
debug!("We are already using {peer_id:?} as a relay server. Skipping.");
continue;
}

match swarm.listen_on(relay_addr.clone()) {
Ok(id) => {
info!("Sending reservation to relay {peer_id:?} on {relay_addr:?}");
self.waiting_for_reservation.insert(peer_id, relay_addr);
self.relayed_listener_id_map.insert(id, peer_id);
n_reservations += 1;
}
Err(err) => {
error!("Error while trying to listen on the relay addr: {err:?} on {relay_addr:?}");
}
match swarm.listen_on(relay_addr.clone()) {
Ok(id) => {
info!("Sending reservation to relay {peer_id:?} on {relay_addr:?}");
self.waiting_for_reservation.insert(peer_id, relay_addr);
self.relayed_listener_id_map.insert(id, peer_id);
n_reservations += 1;
}
Err(err) => {
error!(
"Error while trying to listen on the relay addr: {err:?} on {relay_addr:?}"
);
}
} else {
debug!("No more relay candidates.");
break;
}
}
}
Expand Down

0 comments on commit 8cd61ae

Please sign in to comment.