Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixes to address connection errors and rise on adding peers into RT & extra disk writes #2529

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions ant-networking/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -759,7 +759,6 @@ impl NetworkBuilder {
quotes_history: Default::default(),
replication_targets: Default::default(),
last_replication: None,
last_connection_pruning_time: Instant::now(),
network_density_samples: FifoRegister::new(100),
};

Expand Down Expand Up @@ -867,8 +866,6 @@ pub struct SwarmDriver {
/// when was the last replication event
/// This allows us to throttle replication no matter how it is triggered
pub(crate) last_replication: Option<Instant>,
/// when was the last outdated connection pruning undertaken.
pub(crate) last_connection_pruning_time: Instant,
/// FIFO cache for the network density samples
pub(crate) network_density_samples: FifoRegister,
}
Expand Down
74 changes: 9 additions & 65 deletions ant-networking/src/event/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,6 @@ impl SwarmDriver {
debug!("SwarmEvent has been ignored: {other:?}")
}
}
self.remove_outdated_connections();

self.log_handling(event_string.to_string(), start.elapsed());

Expand All @@ -627,9 +626,12 @@ impl SwarmDriver {
fn remove_bootstrap_from_full(&mut self, peer_id: PeerId) {
let mut shall_removed = None;

let mut bucket_index = Some(0);

if let Some(kbucket) = self.swarm.behaviour_mut().kademlia.kbucket(peer_id) {
if kbucket.num_entries() >= K_VALUE.into() {
if let Some(peers) = self.bootstrap_peers.get(&kbucket.range().0.ilog2()) {
bucket_index = kbucket.range().0.ilog2();
if let Some(peers) = self.bootstrap_peers.get(&bucket_index) {
for peer_entry in kbucket.iter() {
if peers.contains(peer_entry.node.key.preimage()) {
shall_removed = Some(*peer_entry.node.key.preimage());
Expand All @@ -649,72 +651,14 @@ impl SwarmDriver {
if let Some(removed_peer) = entry {
self.update_on_peer_removal(*removed_peer.node.key.preimage());
}
}
}

// Remove outdated connection to a peer if it is not in the RT.
// Optionally force remove all the connections for a provided peer.
fn remove_outdated_connections(&mut self) {
// To avoid this being called too frequently, only carry out pruning at intervals.
if Instant::now() < self.last_connection_pruning_time + Duration::from_secs(30) {
return;
}
self.last_connection_pruning_time = Instant::now();

let mut removed_conns = 0;
self.live_connected_peers.retain(|connection_id, (peer_id, _addr, timeout_time)| {

// skip if timeout isn't reached yet
if Instant::now() < *timeout_time {
return true; // retain peer
// With the switch to using the bootstrap cache, the workload is already distributed.
// To avoid peers repeatedly replacing each other,
// the removal shall be undertaken only once.
if let Some(peers) = self.bootstrap_peers.get_mut(&bucket_index) {
let _ = peers.remove(&peer_id);
}

// ignore if peer is present in our RT
if let Some(kbucket) = self.swarm.behaviour_mut().kademlia.kbucket(*peer_id) {
if kbucket
.iter()
.any(|peer_entry| *peer_id == *peer_entry.node.key.preimage())
{
return true; // retain peer
}
}

// skip if the peer is a relay server that we're connected to
if let Some(relay_manager) = self.relay_manager.as_ref() {
if relay_manager.keep_alive_peer(peer_id) {
return true; // retain peer
}
}

// skip if the peer is a node that is being relayed through us
if self.connected_relay_clients.contains(peer_id) {
return true; // retain peer
}

// actually remove connection
let result = self.swarm.close_connection(*connection_id);
debug!("Removed outdated connection {connection_id:?} to {peer_id:?} with result: {result:?}");

removed_conns += 1;

// do not retain this connection as it has been closed
false
});

if removed_conns == 0 {
return;
}

self.record_connection_metrics();

debug!(
"Current libp2p peers pool stats is {:?}",
self.swarm.network_info()
);
debug!(
"Removed {removed_conns} outdated live connections, still have {} left.",
self.live_connected_peers.len()
);
}

/// Record the metrics on update of connection state.
Expand Down
6 changes: 0 additions & 6 deletions ant-networking/src/relay_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,6 @@ impl RelayManager {
}
}

/// Reports whether the connection to `peer_id` should be kept alive.
///
/// Returns `true` when `peer_id` is a key in either `connected_relays` or
/// `waiting_for_reservation`. Closing a connection to that peer would remove that server
/// from the listen addr.
pub(crate) fn keep_alive_peer(&self, peer_id: &PeerId) -> bool {
    // A relay we are already connected to must be kept.
    if self.connected_relays.contains_key(peer_id) {
        return true;
    }

    // Otherwise keep the peer only if a reservation with it is still pending.
    self.waiting_for_reservation.contains_key(peer_id)
}

/// Add a potential candidate to the list if it satisfies all the identify checks and also supports the relay server
/// protocol.
pub(crate) fn add_potential_candidates(
Expand Down
Loading