Skip to content

Commit

Permalink
fix(network): do not log connection error if the peer has multiple tr…
Browse files Browse the repository at this point in the history
…ansports
  • Loading branch information
RolandSherwin committed Nov 12, 2024
1 parent 9e46d4c commit 22efa29
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 30 deletions.
16 changes: 9 additions & 7 deletions sn_networking/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ use std::{
fmt::Debug,
fs,
io::{Read, Write},
net::SocketAddr,
net::{IpAddr, SocketAddr},
num::NonZeroUsize,
path::PathBuf,
};
Expand Down Expand Up @@ -721,7 +721,7 @@ impl NetworkBuilder {
network_discovery: NetworkDiscovery::new(&peer_id),
bootstrap_peers: Default::default(),
live_connected_peers: Default::default(),
latest_connected_peers: Default::default(),
latest_established_connection_ids: Default::default(),
handling_statistics: Default::default(),
handled_times: 0,
hard_disk_write_error: 0,
Expand Down Expand Up @@ -820,10 +820,9 @@ pub struct SwarmDriver {
// Peers that having live connection to. Any peer got contacted during kad network query
// will have live connection established. And they may not appear in the RT.
pub(crate) live_connected_peers: BTreeMap<ConnectionId, (PeerId, Instant)>,
/// The peers that we recently connected to.
/// This is a limited list used to prevent log spamming.
/// Use `live_connected_peers` for a full list.
pub(crate) latest_connected_peers: HashMap<Multiaddr, Instant>,
/// The list of recently established connections ids.
/// This is used to prevent log spamming.
pub(crate) latest_established_connection_ids: HashMap<usize, (IpAddr, Instant)>,
// Record the handling time of the recent 10 for each handling kind.
handling_statistics: BTreeMap<String, Vec<Duration>>,
handled_times: usize,
Expand Down Expand Up @@ -885,13 +884,16 @@ impl SwarmDriver {
},
// next take and react to external swarm events
swarm_event = self.swarm.select_next_some() => {

// Refer to the handle_swarm_events::IncomingConnectionError for more info on why we skip
// processing the event for one round.
if let Some(previous_event) = previous_incoming_connection_error_event.take() {
if let Err(err) = self.handle_swarm_events(swarm_event) {
warn!("Error while handling swarm event: {err}");
}
if let Err(err) = self.handle_swarm_events(previous_event) {
warn!("Error while handling swarm event: {err}");
}
continue;
}
if matches!(swarm_event, SwarmEvent::IncomingConnectionError {..}) {
previous_incoming_connection_error_event = Some(swarm_event);
Expand Down
82 changes: 59 additions & 23 deletions sn_networking/src/event/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
// permissions and limitations relating to use of the SAFE Network Software.

use crate::{
event::NodeEvent, multiaddr_is_global, multiaddr_strip_p2p, relay_manager::is_a_relayed_peer,
target_arch::Instant, NetworkEvent, Result, SwarmDriver,
event::NodeEvent, multiaddr_get_ip, multiaddr_is_global, multiaddr_strip_p2p,
relay_manager::is_a_relayed_peer, target_arch::Instant, NetworkEvent, Result, SwarmDriver,
};
#[cfg(feature = "local")]
use libp2p::mdns;
Expand All @@ -19,7 +19,7 @@ use libp2p::{
multiaddr::Protocol,
swarm::{
dial_opts::{DialOpts, PeerCondition},
DialError, SwarmEvent,
ConnectionId, DialError, SwarmEvent,
},
Multiaddr, PeerId, TransportError,
};
Expand Down Expand Up @@ -364,7 +364,10 @@ impl SwarmDriver {
connection_id,
(peer_id, Instant::now() + Duration::from_secs(60)),
);
self.insert_latest_connected_peers(endpoint.get_remote_address().clone());
self.insert_latest_established_connection_ids(
connection_id,
endpoint.get_remote_address(),
);
self.record_connection_metrics();

if endpoint.is_dialer() {
Expand All @@ -381,9 +384,6 @@ impl SwarmDriver {
event_string = "ConnectionClosed";
debug!(%peer_id, ?connection_id, ?cause, num_established, "ConnectionClosed: {}", endpoint_str(&endpoint));
let _ = self.live_connected_peers.remove(&connection_id);
let _ = self
.latest_connected_peers
.remove(endpoint.get_remote_address());
self.record_connection_metrics();
}
SwarmEvent::OutgoingConnectionError {
Expand Down Expand Up @@ -513,7 +513,9 @@ impl SwarmDriver {
error,
} => {
event_string = "Incoming ConnErr";
// Only log if this for a non-existing connection.
// Only log as ERROR if the the connection is not adjacent to an already established connection id from
// the same IP address.
//
// If a peer contains multiple transports/listen addrs, we might try to open multiple connections,
// and if the first one passes, we would get error on the rest. We don't want to log these.
//
Expand All @@ -522,8 +524,10 @@ impl SwarmDriver {
// giving time for ConnectionEstablished to be hopefully processed.
// And since we don't do anything critical with this event, the order and time of processing is
// not critical.
if !self.latest_connected_peers.contains_key(&send_back_addr) {
if self.should_we_log_incoming_connection_error(connection_id, &send_back_addr) {
error!("IncomingConnectionError from local_addr:?{local_addr:?}, send_back_addr {send_back_addr:?} on {connection_id:?} with error {error:?}");
} else {
debug!("IncomingConnectionError from local_addr:?{local_addr:?}, send_back_addr {send_back_addr:?} on {connection_id:?} with error {error:?}");
}
let _ = self.live_connected_peers.remove(&connection_id);
self.record_connection_metrics();
Expand Down Expand Up @@ -675,29 +679,61 @@ impl SwarmDriver {
}
}

/// Insert into the latest connected peers list. This list does not contain all the connected peers.
/// Older entries are removed if the list exceeds 50.
fn insert_latest_connected_peers(&mut self, addr: Multiaddr) {
let old_instant = self
.latest_connected_peers
.entry(addr)
.or_insert_with(Instant::now);
*old_instant = Instant::now();
/// Insert the latest established connection id into the list.
fn insert_latest_established_connection_ids(&mut self, id: ConnectionId, addr: &Multiaddr) {
let Ok(id) = format!("{id}").parse::<usize>() else {
return;
};
let Some(ip_addr) = multiaddr_get_ip(addr) else {
return;
};

let _ = self
.latest_established_connection_ids
.insert(id, (ip_addr, Instant::now()));

while self.latest_connected_peers.len() >= 50 {
while self.latest_established_connection_ids.len() >= 50 {
// remove the oldest entry
let Some(oldest) = self
.latest_connected_peers
let Some(oldest_key) = self
.latest_established_connection_ids
.iter()
.min_by_key(|(_, time)| *time)
.map(|(addr, _)| addr.clone())
.min_by_key(|(_, (_, time))| *time)
.map(|(id, _)| *id)
else {
break;
};

self.latest_connected_peers.remove(&oldest);
self.latest_established_connection_ids.remove(&oldest_key);
}
}

// Do not log IncomingConnectionError if the ConnectionId is adjacent to an already established connection.
fn should_we_log_incoming_connection_error(&self, id: ConnectionId, addr: &Multiaddr) -> bool {
let Ok(id) = format!("{id}").parse::<usize>() else {
return true;
};
let Some(ip_addr) = multiaddr_get_ip(addr) else {
return true;
};

// This should prevent most of the cases where we get an IncomingConnectionError for a peer with multiple
// transports/listen addrs.
if let Some((established_ip_addr, _)) =
self.latest_established_connection_ids.get(&(id - 1))
{
if established_ip_addr == &ip_addr {
return false;
}
} else if let Some((established_ip_addr, _)) =
self.latest_established_connection_ids.get(&(id + 1))
{
if established_ip_addr == &ip_addr {
return false;
}
}

true
}
}

/// Helper function to print formatted connection role info.
Expand Down

0 comments on commit 22efa29

Please sign in to comment.