Skip to content

Commit

Permalink
fix(network): if all peers are blocked, act as if there are no peers
Browse files Browse the repository at this point in the history
  • Loading branch information
ShahakShama committed Jul 23, 2024
1 parent 6403530 commit f31d1dc
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 26 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/papyrus_network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ path = "src/bin/streamed_bytes_benchmark.rs"
[dependencies]
async-stream.workspace = true
bytes.workspace = true
chrono.workspace = true
defaultmap.workspace = true
derive_more.workspace = true
futures.workspace = true
Expand Down
14 changes: 12 additions & 2 deletions crates/papyrus_network/src/peer_manager/behaviour_impl.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::task::Poll;
use std::task::{ready, Poll};

use libp2p::swarm::behaviour::ConnectionEstablished;
use libp2p::swarm::{
Expand Down Expand Up @@ -175,9 +175,19 @@ where

fn poll(
&mut self,
_cx: &mut std::task::Context<'_>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<libp2p::swarm::ToSwarm<Self::ToSwarm, libp2p::swarm::THandlerInEvent<Self>>>
{
if let Some(event) = self.pending_events.pop() {
return Poll::Ready(event);
}
if let Some(sleep_future) = &mut self.sleep_waiting_for_unblocked_peer {
ready!(sleep_future.as_mut().poll(cx));
for outbound_session_id in std::mem::take(&mut self.sessions_received_when_no_peers) {
self.assign_peer_to_session(outbound_session_id);
}
}
self.sleep_waiting_for_unblocked_peer = None;
self.pending_events.pop().map(Poll::Ready).unwrap_or(Poll::Pending)
}
}
34 changes: 32 additions & 2 deletions crates/papyrus_network/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::collections::HashMap;
use std::time::Duration;

use chrono::Duration;
use futures::future::BoxFuture;
use futures::FutureExt;
use libp2p::swarm::dial_opts::DialOpts;
use libp2p::swarm::ToSwarm;
use libp2p::PeerId;
Expand Down Expand Up @@ -30,9 +32,11 @@ pub struct PeerManager<P: PeerTrait + 'static> {
session_to_peer_map: HashMap<OutboundSessionId, PeerId>,
config: PeerManagerConfig,
last_peer_index: usize,
// TODO(shahak): Change to VecDeque and awake when item is added.
pending_events: Vec<ToSwarm<ToOtherBehaviourEvent, libp2p::swarm::THandlerInEvent<Self>>>,
peers_pending_dial_with_sessions: HashMap<PeerId, Vec<OutboundSessionId>>,
sessions_received_when_no_peers: Vec<OutboundSessionId>,
sleep_waiting_for_unblocked_peer: Option<BoxFuture<'static, ()>>,
}

#[derive(Clone)]
Expand All @@ -53,7 +57,11 @@ pub(crate) enum PeerManagerError {

impl Default for PeerManagerConfig {
fn default() -> Self {
Self { target_num_for_peers: 100, blacklist_timeout: Duration::max_value() }
Self {
target_num_for_peers: 100,
// 1 year.
blacklist_timeout: Duration::from_secs(3600 * 24 * 365),
}
}
}

Expand All @@ -72,13 +80,16 @@ where
pending_events: Vec::new(),
peers_pending_dial_with_sessions: HashMap::new(),
sessions_received_when_no_peers: Vec::new(),
sleep_waiting_for_unblocked_peer: None,
}
}

fn add_peer(&mut self, mut peer: P) {
info!("Peer Manager found new peer {:?}", peer.peer_id());
peer.set_timeout_duration(self.config.blacklist_timeout);
self.peers.insert(peer.peer_id(), peer);
// The new peer is unblocked so we don't need to wait for unblocked peer.
self.sleep_waiting_for_unblocked_peer = None;
for outbound_session_id in std::mem::take(&mut self.sessions_received_when_no_peers) {
self.assign_peer_to_session(outbound_session_id);
}
Expand All @@ -90,10 +101,12 @@ where
}

// TODO(shahak): Remove return value and use events in tests.
// TODO(shahak): Split this function for readability.
fn assign_peer_to_session(&mut self, outbound_session_id: OutboundSessionId) -> Option<PeerId> {
// TODO: consider moving this logic to be async (on a different tokio task)
// until then we can return the assignment even if we use events for the notification.
if self.peers.is_empty() {
info!("No peers. Waiting for a new peer to be connected for {outbound_session_id:?}");
self.sessions_received_when_no_peers.push(outbound_session_id);
return None;
}
Expand All @@ -106,6 +119,23 @@ where
self.peers.iter().take(self.last_peer_index).find(|(_, peer)| !peer.is_blocked())
});
self.last_peer_index = (self.last_peer_index + 1) % self.peers.len();
if peer.is_none() {
info!(
"No unblocked peers. Waiting for a new peer to be connected or for a peer to \
become unblocked for {outbound_session_id:?}"
);
self.sessions_received_when_no_peers.push(outbound_session_id);
// Find the peer closest to becoming unblocked.
let sleep_deadline = self
.peers
.values()
.map(|peer| peer.blocked_until())
.min()
.expect("min should not return None on a non-empty iterator");
self.sleep_waiting_for_unblocked_peer =
Some(tokio::time::sleep_until(sleep_deadline.into()).boxed());
return None;
}
peer.map(|(peer_id, peer)| {
// TODO: consider not allowing reassignment of the same session
self.session_to_peer_map.insert(outbound_session_id, *peer_id);
Expand Down
23 changes: 14 additions & 9 deletions crates/papyrus_network/src/peer_manager/peer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
// using chrono time and not std since std does not have the ability for std::time::Instance to
// represent the maximum time of the system.
use chrono::{DateTime, Duration, Utc};
use std::time::{Duration, Instant};

use libp2p::swarm::ConnectionId;
use libp2p::{Multiaddr, PeerId};
#[cfg(test)]
Expand All @@ -23,6 +22,9 @@ pub trait PeerTrait {

fn is_blocked(&self) -> bool;

/// Returns Instant::now if not blocked.
fn blocked_until(&self) -> Instant;

fn connection_ids(&self) -> &Vec<ConnectionId>;

fn add_connection_id(&mut self, connection_id: ConnectionId);
Expand All @@ -34,7 +36,7 @@ pub trait PeerTrait {
pub struct Peer {
peer_id: PeerId,
multiaddr: Multiaddr,
timed_out_until: Option<DateTime<Utc>>,
timed_out_until: Option<Instant>,
timeout_duration: Option<Duration>,
connection_ids: Vec<ConnectionId>,
}
Expand All @@ -52,11 +54,10 @@ impl PeerTrait for Peer {

fn update_reputation(&mut self, _reason: ReputationModifier) {
if let Some(timeout_duration) = self.timeout_duration {
self.timed_out_until =
Utc::now().checked_add_signed(timeout_duration).or(Some(DateTime::<Utc>::MAX_UTC));
return;
self.timed_out_until = Some(Instant::now() + timeout_duration);
} else {
debug!("Timeout duration not set for peer: {:?}", self.peer_id);
}
debug!("Timeout duration not set for peer: {:?}", self.peer_id);
}

fn peer_id(&self) -> PeerId {
Expand All @@ -73,12 +74,16 @@ impl PeerTrait for Peer {

fn is_blocked(&self) -> bool {
if let Some(timed_out_until) = self.timed_out_until {
timed_out_until > Utc::now()
timed_out_until > Instant::now()
} else {
false
}
}

fn blocked_until(&self) -> Instant {
self.timed_out_until.unwrap_or_else(Instant::now)
}

fn connection_ids(&self) -> &Vec<ConnectionId> {
&self.connection_ids
}
Expand Down
87 changes: 76 additions & 11 deletions crates/papyrus_network/src/peer_manager/test.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
// TODO(shahak): Add tests for multiple connection ids

use core::{panic, time};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

use assert_matches::assert_matches;
use chrono::Duration;
use futures::future::poll_fn;
use futures::{FutureExt, Stream, StreamExt};
use libp2p::swarm::behaviour::ConnectionEstablished;
use libp2p::swarm::{ConnectionId, NetworkBehaviour, ToSwarm};
use libp2p::{Multiaddr, PeerId};
use mockall::predicate::eq;
use tokio::time::sleep;
use void::Void;

use super::behaviour_impl::ToOtherBehaviourEvent;
use crate::discovery::identify_impl::IdentifyToOtherBehaviourEvent;
Expand All @@ -19,6 +23,19 @@ use crate::peer_manager::peer::{MockPeerTrait, Peer, PeerTrait};
use crate::peer_manager::{PeerManager, PeerManagerConfig, ReputationModifier};
use crate::sqmr::OutboundSessionId;

impl<P: PeerTrait> Unpin for PeerManager<P> {}

impl<P: PeerTrait> Stream for PeerManager<P> {
type Item = ToSwarm<ToOtherBehaviourEvent, Void>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match Pin::into_inner(self).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(event) => Poll::Ready(Some(event)),
}
}
}

#[test]
fn peer_assignment_round_robin() {
// Create a new peer manager
Expand Down Expand Up @@ -102,8 +119,8 @@ fn peer_assignment_round_robin() {
}
}

#[test]
fn peer_assignment_no_peers() {
#[tokio::test]
async fn peer_assignment_no_peers() {
// Create a new peer manager
let config = PeerManagerConfig::default();
let mut peer_manager: PeerManager<MockPeerTrait> = PeerManager::new(config.clone());
Expand All @@ -113,25 +130,72 @@ fn peer_assignment_no_peers() {

// Assign a peer to the session
assert_matches!(peer_manager.assign_peer_to_session(outbound_session_id), None);
assert!(peer_manager.next().now_or_never().is_none());

// Now the peer manager finds a new peer and can assign the session.
let connection_id = ConnectionId::new_unchecked(0);
let (mut peer, peer_id) =
create_mock_peer(config.blacklist_timeout, false, Some(connection_id));
peer.expect_is_blocked().times(1).return_const(false);
peer_manager.add_peer(peer);
assert_eq!(peer_manager.pending_events.len(), 1);
assert_matches!(
peer_manager.pending_events.first().unwrap(),
peer_manager.next().await.unwrap(),
ToSwarm::GenerateEvent(ToOtherBehaviourEvent::SessionAssigned {
outbound_session_id: event_outbound_session_id,
peer_id: event_peer_id,
connection_id: event_connection_id,
}
) if outbound_session_id == *event_outbound_session_id &&
peer_id == *event_peer_id &&
connection_id == *event_connection_id
) if outbound_session_id == event_outbound_session_id &&
peer_id == event_peer_id &&
connection_id == event_connection_id
);
assert!(peer_manager.next().now_or_never().is_none());
}

#[tokio::test]
async fn peer_assignment_no_unblocked_peers() {
const BLOCKED_UNTIL: Duration = Duration::from_secs(5);
const TIMEOUT: Duration = Duration::from_secs(1);
// Create a new peer manager
let config = PeerManagerConfig::default();
let mut peer_manager: PeerManager<MockPeerTrait> = PeerManager::new(config.clone());

// Create a session
let outbound_session_id = OutboundSessionId { value: 1 };

// Create a peer
let connection_id = ConnectionId::new_unchecked(0);
let (mut peer, peer_id) = create_mock_peer(config.blacklist_timeout, true, Some(connection_id));
peer.expect_is_blocked().times(1).return_const(true);
peer.expect_is_blocked().times(1).return_const(false);
peer.expect_blocked_until().times(1).returning(|| Instant::now() + BLOCKED_UNTIL);

peer_manager.add_peer(peer);
peer_manager.report_peer(peer_id, ReputationModifier::Bad {}).unwrap();

// Try to assign a peer to the session, and check there wasn't any assignment.
assert_matches!(peer_manager.assign_peer_to_session(outbound_session_id), None);
assert!(peer_manager.next().now_or_never().is_none());

// Simulate that BLOCKED_UNTIL has passed.
tokio::time::pause();
tokio::time::advance(BLOCKED_UNTIL).await;
tokio::time::resume();

// After BLOCKED_UNTIL has passed, the peer manager can assign the session.
let event = tokio::time::timeout(TIMEOUT, peer_manager.next()).await.unwrap().unwrap();
assert_matches!(
event,
ToSwarm::GenerateEvent(ToOtherBehaviourEvent::SessionAssigned {
outbound_session_id: event_outbound_session_id,
peer_id: event_peer_id,
connection_id: event_connection_id,
}
) if outbound_session_id == event_outbound_session_id &&
peer_id == event_peer_id &&
connection_id == event_connection_id
);
assert!(peer_manager.next().now_or_never().is_none());
}

#[test]
Expand All @@ -155,7 +219,7 @@ fn report_peer_calls_update_reputation() {
async fn peer_block_realeased_after_timeout() {
const DURATION_IN_MILLIS: u64 = 50;
let mut peer = Peer::new(PeerId::random(), Multiaddr::empty());
peer.set_timeout_duration(Duration::milliseconds(DURATION_IN_MILLIS as i64));
peer.set_timeout_duration(Duration::from_millis(DURATION_IN_MILLIS));
peer.update_reputation(ReputationModifier::Bad {});
assert!(peer.is_blocked());
sleep(time::Duration::from_millis(DURATION_IN_MILLIS)).await;
Expand Down Expand Up @@ -236,15 +300,16 @@ fn more_peers_needed() {
assert!(!peer_manager.more_peers_needed());
}

#[test]
fn timed_out_peer_not_assignable_to_queries() {
#[tokio::test]
async fn timed_out_peer_not_assignable_to_queries() {
// Create a new peer manager
let config = PeerManagerConfig::default();
let mut peer_manager: PeerManager<MockPeerTrait> = PeerManager::new(config.clone());

// Create a mock peer
let (mut peer, peer_id) = create_mock_peer(config.blacklist_timeout, true, None);
peer.expect_is_blocked().times(1).return_const(true);
peer.expect_blocked_until().times(1).returning(|| Instant::now() + Duration::from_secs(1));

// Add the mock peer to the peer manager
peer_manager.add_peer(peer);
Expand Down

0 comments on commit f31d1dc

Please sign in to comment.