Skip to content

Commit

Permalink
Fix some network issue poionsed peer (#3216)
Browse files Browse the repository at this point in the history
  • Loading branch information
sanlee42 authored Feb 17, 2022
1 parent 274f8dd commit 03b6024
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 35 deletions.
7 changes: 5 additions & 2 deletions network-p2p/peerset/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ pub use libp2p::PeerId;

/// We don't accept nodes whose reputation is under this value.
pub const BANNED_THRESHOLD: i32 = 82 * (i32::min_value() / 100);
/// unbanned peer after seconds
pub const UNBANNED_AFTER: Duration = Duration::from_secs(60 * 5);
/// Reputation change for a node when we get disconnected from it.
const DISCONNECT_REPUTATION_CHANGE: i32 = -256;
/// Amount of time between the moment we disconnect from a node and the moment we remove it from
Expand Down Expand Up @@ -222,7 +224,7 @@ pub enum Message {
/// Equivalent to `Drop` for the peer corresponding to this incoming index.
Reject(IncomingIndex),

Banned(PeerId),
Banned(PeerId, Duration),
}

/// Opaque identifier for an incoming connection. Allocated by the network.
Expand Down Expand Up @@ -531,7 +533,8 @@ impl Peerset {
if peer.last_connected_or_discovered() + FORGET_AFTER < now {
peer.forget_peer();
if after < BANNED_THRESHOLD {
self.message_queue.push_back(Message::Banned(peer_id))
self.message_queue
.push_back(Message::Banned(peer_id, UNBANNED_AFTER))
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion network-p2p/peerset/tests/fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ fn test_once() {
}
Poll::Ready(None) => panic!(),
Poll::Pending => {}
Poll::Ready(Some(Message::Banned(_))) => {}
Poll::Ready(Some(Message::Banned(..))) => {}
},

// If we generate 1, discover a new node.
Expand Down
8 changes: 4 additions & 4 deletions network-p2p/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ pub enum BehaviourOut {
/// request duration.
Dht(DhtEvent, Duration),

BannedRequest(PeerId),
BannedRequest(PeerId, Duration),
}

impl Behaviour {
Expand Down Expand Up @@ -288,9 +288,9 @@ impl NetworkBehaviourEventProcess<CustomMessageOutcome> for Behaviour {
notifications_sink,
});
}
CustomMessageOutcome::Banned(peer_id) => {
self.events.push_back(BehaviourOut::BannedRequest(peer_id))
}
CustomMessageOutcome::Banned(peer_id, duration) => self
.events
.push_back(BehaviourOut::BannedRequest(peer_id, duration)),
}
}
}
Expand Down
7 changes: 0 additions & 7 deletions network-p2p/src/peer_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,21 +232,14 @@ impl NetworkBehaviour for PeerInfoBehaviour {

if let Some(entry) = self.nodes_info.get_mut(peer_id) {
entry.endpoints.retain(|ep| ep != endpoint)
} else {
error!(target: "sub-libp2p",
"Unknown connection to {:?} closed: {:?}", peer_id, endpoint);
}
}

fn inject_disconnected(&mut self, peer_id: &PeerId) {
self.ping.inject_disconnected(peer_id);
self.identify.inject_disconnected(peer_id);

if let Some(entry) = self.nodes_info.get_mut(peer_id) {
entry.info_expire = Some(Instant::now() + CACHE_EXPIRE);
} else {
error!(target: "sub-libp2p",
"Disconnected from node we were not connected to {:?}", peer_id);
}
}

Expand Down
7 changes: 5 additions & 2 deletions network-p2p/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use std::str;
use std::sync::Arc;
use std::task::Poll;
use std::time;
use std::time::Duration;

//const REQUEST_TIMEOUT_SEC: u64 = 40;
/// Interval at which we perform time based maintenance
Expand Down Expand Up @@ -84,7 +85,7 @@ pub enum CustomMessageOutcome {
messages: Vec<(Cow<'static, str>, Bytes)>,
},
None,
Banned(PeerId),
Banned(PeerId, Duration),
}

/// Peer information
Expand Down Expand Up @@ -314,7 +315,9 @@ impl NetworkBehaviour for Protocol {
let protocol_name = self.notif_protocols[usize::from(set_id)].clone();
self.on_notify(peer_id, vec![(protocol_name, message.freeze())])
}
GenericProtoOut::Banned(peer_id) => CustomMessageOutcome::Banned(peer_id),
GenericProtoOut::Banned(peer_id, duration) => {
CustomMessageOutcome::Banned(peer_id, duration)
}
};

if !matches!(outcome, CustomMessageOutcome::None) {
Expand Down
67 changes: 50 additions & 17 deletions network-p2p/src/protocol/generic_proto/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ pub enum GenericProtoOut {
/// Message that has been received.
message: BytesMut,
},
Banned(PeerId),
Banned(PeerId, Duration),
}

impl GenericProto {
Expand Down Expand Up @@ -433,19 +433,18 @@ impl GenericProto {
set_id: sc_peerset::SetId,
ban: Option<Duration>,
) {
let handler = self.new_handler();
let mut entry = if let Entry::Occupied(entry) = self.peers.entry((*peer_id, set_id)) {
entry
} else {
return;
};

match mem::replace(entry.get_mut(), PeerState::Poisoned) {
// We're not connected anyway.
st @ PeerState::Disabled { .. } => *entry.into_mut() = st,
st @ PeerState::Requested => *entry.into_mut() = st,
st @ PeerState::PendingRequest { .. } => *entry.into_mut() = st,
st @ PeerState::Backoff { .. } => *entry.into_mut() = st,

// DisabledPendingEnable => Disabled.
PeerState::DisabledPendingEnable {
connections,
Expand Down Expand Up @@ -586,7 +585,14 @@ impl GenericProto {
}

PeerState::Poisoned => {
error!(target: "sub-libp2p", "State of {:?} is poisoned", peer_id)
warn!(target: "sub-libp2p", "State of {:?} is poisoned reconnected", entry.key());
self.events.push_back(NetworkBehaviourAction::Dial {
opts: DialOpts::peer_id(entry.key().0)
.condition(PeerCondition::Disconnected)
.build(),
handler,
});
*entry.into_mut() = PeerState::Requested;
}
}
}
Expand Down Expand Up @@ -867,14 +873,21 @@ impl GenericProto {
}

PeerState::Poisoned => {
error!(target: "sub-libp2p", "State of {:?} is poisoned", occ_entry.key());
debug_assert!(false);
warn!(target: "sub-libp2p", "State of {:?} is poisoned reconnected", occ_entry.key());
self.events.push_back(NetworkBehaviourAction::Dial {
opts: DialOpts::peer_id(occ_entry.key().0)
.condition(PeerCondition::Disconnected)
.build(),
handler,
});
*occ_entry.into_mut() = PeerState::Requested;
}
}
}

/// Function that is called when the peerset wants us to disconnect from a peer.
fn peerset_report_disconnect(&mut self, peer_id: PeerId, set_id: sc_peerset::SetId) {
let handler = self.new_handler();
let mut entry = match self.peers.entry((peer_id, set_id)) {
Entry::Occupied(entry) => entry,
Entry::Vacant(entry) => {
Expand Down Expand Up @@ -1001,8 +1014,14 @@ impl GenericProto {
debug_assert!(false);
}
PeerState::Poisoned => {
error!(target: "sub-libp2p", "State of {:?} is poisoned", entry.key());
debug_assert!(false);
warn!(target: "sub-libp2p", "State of {:?} is poisoned reconnected", entry.key());
self.events.push_back(NetworkBehaviourAction::Dial {
opts: DialOpts::peer_id(entry.key().0)
.condition(PeerCondition::Disconnected)
.build(),
handler,
});
*entry.into_mut() = PeerState::Requested;
}
}
}
Expand Down Expand Up @@ -1241,6 +1260,7 @@ impl NetworkBehaviour for GenericProto {
_endpoint: &ConnectedPoint,
_handler: <Self::ProtocolsHandler as IntoProtocolsHandler>::Handler,
) {
let handler = self.new_handler();
for set_id in (0..self.notif_protocols.len()).map(sc_peerset::SetId::from) {
let mut entry = if let Entry::Occupied(entry) = self.peers.entry((*peer_id, set_id)) {
entry
Expand Down Expand Up @@ -1527,8 +1547,14 @@ impl NetworkBehaviour for GenericProto {
debug_assert!(false);
}
PeerState::Poisoned => {
error!(target: "sub-libp2p", "State of peer {} is poisoned", peer_id);
debug_assert!(false);
warn!(target: "sub-libp2p", "State of {:?} is poisoned reconnected", entry.key());
self.events.push_back(NetworkBehaviourAction::Dial {
opts: DialOpts::peer_id(entry.key().0)
.condition(PeerCondition::Disconnected)
.build(),
handler: handler.clone(),
});
*entry.into_mut() = PeerState::Requested;
}
}
}
Expand All @@ -1547,10 +1573,9 @@ impl NetworkBehaviour for GenericProto {
trace!(target: "sub-libp2p", "Libp2p => Reach failure for {:?} through {:?}: {:?}", peer_id, addr, error);
}
}

let handler = self.new_handler();
if let Some(peer_id) = peer_id {
trace!(target: "sub-libp2p", "Libp2p => Dial failure for {:?}", peer_id);

for set_id in (0..self.notif_protocols.len()).map(sc_peerset::SetId::from) {
if let Entry::Occupied(mut entry) = self.peers.entry((peer_id, set_id)) {
match mem::replace(entry.get_mut(), PeerState::Poisoned) {
Expand Down Expand Up @@ -1602,8 +1627,14 @@ impl NetworkBehaviour for GenericProto {
}

PeerState::Poisoned => {
error!(target: "sub-libp2p", "State of {:?} is poisoned", peer_id);
debug_assert!(false);
warn!(target: "sub-libp2p", "State of {:?} is poisoned reconnected", entry.key());
self.events.push_back(NetworkBehaviourAction::Dial {
opts: DialOpts::peer_id(entry.key().0)
.condition(PeerCondition::Disconnected)
.build(),
handler: handler.clone(),
});
*entry.into_mut() = PeerState::Requested;
}
}
}
Expand Down Expand Up @@ -2197,9 +2228,11 @@ impl NetworkBehaviour for GenericProto {
})) => {
self.peerset_report_disconnect(peer_id, set_id);
}
Poll::Ready(Some(sc_peerset::Message::Banned(peer_id))) => self.events.push_back(
NetworkBehaviourAction::GenerateEvent(GenericProtoOut::Banned(peer_id)),
),
Poll::Ready(Some(sc_peerset::Message::Banned(peer_id, duration))) => {
self.events.push_back(NetworkBehaviourAction::GenerateEvent(
GenericProtoOut::Banned(peer_id, duration),
))
}
Poll::Ready(None) => {
error!(target: "sub-libp2p", "Peerset receiver stream has returned None");
break;
Expand Down
1 change: 1 addition & 0 deletions network-p2p/src/protocol/generic_proto/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ const INITIAL_KEEPALIVE_TIME: Duration = Duration::from_secs(5);
/// it is turned into a [`NotifsHandler`].
///
/// See the documentation at the module level for more information.
#[derive(Clone)]
pub struct NotifsHandlerProto {
/// Name of protocols, prototypes for upgrades for inbound substreams, and the message we
/// send or respond with in the handshake.
Expand Down
24 changes: 22 additions & 2 deletions network-p2p/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ impl NetworkWorker {
from_worker,
event_streams: out_events::OutChannels::new(params.metrics_registry.as_ref())?,
metrics,
unbans: Default::default(),
boot_node_ids,
peers_notifications_sinks,
})
Expand Down Expand Up @@ -1010,6 +1011,7 @@ pub struct NetworkWorker {
event_streams: out_events::OutChannels,
/// Prometheus network metrics.
metrics: Option<Metrics>,
unbans: stream::FuturesUnordered<Pin<Box<dyn Future<Output = PeerId> + Send>>>,
/// The `PeerId`'s of all boot nodes.
boot_node_ids: Arc<HashSet<PeerId>>,
/// For each peer, an object that allows sending notifications to
Expand Down Expand Up @@ -1115,8 +1117,24 @@ impl Future for NetworkWorker {

match poll_value {
Poll::Pending => break,
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::BannedRequest(peer_id))) => {
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::BannedRequest(
peer_id,
duration,
))) => {
info!(
"network banned peer {} for {} secs",
peer_id,
duration.as_secs()
);
this.network_service.ban_peer_id(peer_id);
this.unbans.push(
async move {
let delay = futures_timer::Delay::new(duration);
delay.await;
peer_id
}
.boxed(),
);
}
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::InboundRequest {
protocol,
Expand Down Expand Up @@ -1526,7 +1544,9 @@ impl Future for NetworkWorker {
.set(node_count as u64)
}
}

while let Poll::Ready(Some(peer_id)) = Pin::new(&mut this.unbans).poll_next(cx) {
this.network_service.unban_peer_id(peer_id);
}
Poll::Pending
}
}
Expand Down

0 comments on commit 03b6024

Please sign in to comment.