Skip to content

Commit

Permalink
fix: Introduce p2p idle timeout
Browse files Browse the repository at this point in the history
Signed-off-by: Shanin Roman <[email protected]>
  • Loading branch information
Erigara committed Jun 13, 2024
1 parent e89896a commit 901395b
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 11 deletions.
11 changes: 10 additions & 1 deletion p2p/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ use crate::{
unbounded_with_len, Broadcast, Error, NetworkMessage, OnlinePeers, Post, UpdateTopology,
};

/// Timeout after which disconnect from idle side
const IDLE_TIMEOUT: Duration = Duration::from_secs(60);

/// [`NetworkBase`] actor handle.
// NOTE: channels are unbounded in order to break communication cycle deadlock.
// Unbounded channels are ok here because messages frequency is either configurable (and relatively low)
Expand Down Expand Up @@ -94,6 +97,7 @@ impl<T: Pload, K: Kex + Sync, E: Enc + Sync> NetworkBaseHandle<T, K, E> {
service_message_sender,
current_conn_id: 0,
current_topology: HashMap::new(),
idle_timeout: IDLE_TIMEOUT,
_key_exchange: core::marker::PhantomData::<K>,
_encryptor: core::marker::PhantomData::<E>,
};
Expand Down Expand Up @@ -191,6 +195,8 @@ struct NetworkBase<T: Pload, K: Kex, E: Enc> {
/// Current topology
/// Bool determines who is responsible for initiating connection
current_topology: HashMap<PeerId, bool>,
/// Duration after which terminate connection with idle peer
idle_timeout: Duration,
/// Key exchange used by network
_key_exchange: core::marker::PhantomData<K>,
/// Encryptor used by the network
Expand Down Expand Up @@ -277,6 +283,7 @@ impl<T: Pload, K: Kex, E: Enc> NetworkBase<T, K, E> {
self.key_pair.clone(),
Connection::new(conn_id, stream),
service_message_sender,
self.idle_timeout,
);
}

Expand Down Expand Up @@ -340,6 +347,7 @@ impl<T: Pload, K: Kex, E: Enc> NetworkBase<T, K, E> {
self.key_pair.clone(),
conn_id,
service_message_sender,
self.idle_timeout,
);
}

Expand All @@ -365,6 +373,8 @@ impl<T: Pload, K: Kex, E: Enc> NetworkBase<T, K, E> {
disambiguator,
}: Connected<T>,
) {
self.connecting_peers.remove(&connection_id);

if !self.current_topology.contains_key(&peer_id) {
iroha_logger::warn!(%peer_id, topology=?self.current_topology, "Peer not present in topology is trying to connect");
return;
Expand Down Expand Up @@ -394,7 +404,6 @@ impl<T: Pload, K: Kex, E: Enc> NetworkBase<T, K, E> {
};
let _ = peer_message_sender.send(self.peer_message_sender.clone());
self.peers.insert(peer_id.public_key.clone(), ref_peer);
self.connecting_peers.remove(&connection_id);
Self::add_online_peer(&self.online_peers_sender, peer_id);
}

Expand Down
78 changes: 68 additions & 10 deletions p2p/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use tokio::{
TcpStream,
},
sync::{mpsc, oneshot},
time::Duration,
};

use crate::{boilerplate::*, CryptographicError, Error};
Expand All @@ -38,6 +39,7 @@ pub mod handles {
key_pair: KeyPair,
connection_id: ConnectionId,
service_message_sender: mpsc::Sender<ServiceMessage<T>>,
idle_timeout: Duration,
) {
let peer = state::Connecting {
peer_addr,
Expand All @@ -47,6 +49,7 @@ pub mod handles {
let peer = RunPeerArgs {
peer,
service_message_sender,
idle_timeout,
};
tokio::task::spawn(run::run::<T, K, E, _>(peer).in_current_span());
}
Expand All @@ -57,6 +60,7 @@ pub mod handles {
key_pair: KeyPair,
connection: Connection,
service_message_sender: mpsc::Sender<ServiceMessage<T>>,
idle_timeout: Duration,
) {
let peer = state::ConnectedFrom {
peer_addr,
Expand All @@ -66,6 +70,7 @@ pub mod handles {
let peer = RunPeerArgs {
peer,
service_message_sender,
idle_timeout,
};
tokio::task::spawn(run::run::<T, K, E, _>(peer).in_current_span());
}
Expand All @@ -92,6 +97,8 @@ mod run {
//! Module with peer [`run`] function.
use iroha_logger::prelude::*;
use parity_scale_codec::Decode;
use tokio::time::Instant;

use super::{
cryptographer::Cryptographer,
Expand All @@ -108,6 +115,7 @@ mod run {
RunPeerArgs {
peer,
service_message_sender,
idle_timeout,
}: RunPeerArgs<T, P>,
) {
let conn_id = peer.connection_id();
Expand All @@ -118,11 +126,15 @@ mod run {
// Insure proper termination from every execution path.
async {
// Try to do handshake process
let peer = match peer.handshake().await {
Ok(ready) => ready,
Err(error) => {
let peer = match tokio::time::timeout(idle_timeout, peer.handshake()).await {
Ok(Ok(ready)) => ready,
Ok(Err(error)) => {
iroha_logger::error!(%error, "Failure during handshake.");
return;
},
Err(_) => {
iroha_logger::error!(timeout=?idle_timeout, "Other peer has been idle during handshake");
return;
}
};

Expand Down Expand Up @@ -175,8 +187,28 @@ mod run {
let mut message_reader = MessageReader::new(read, cryptographer.clone());
let mut message_sender = MessageSender::new(write, cryptographer);

let mut idle_interval = tokio::time::interval_at(Instant::now() + idle_timeout, idle_timeout);
let mut ping_interval = tokio::time::interval_at(Instant::now() + idle_timeout / 2, idle_timeout / 2);

loop {
tokio::select! {
_ = ping_interval.tick() => {
iroha_logger::trace!(
ping_period=?ping_interval.period(),
"The connection has been idle, pinging to check if it's alive"
);
if let Err(error) = message_sender.send_message(Message::<T>::Ping).await {
iroha_logger::error!(%error, "Failed to send ping to peer.");
break;
}
}
_ = idle_interval.tick() => {
iroha_logger::error!(
timeout=?idle_interval.period(),
"Didn't receive anything from the peer within given timeout, abandoning this connection"
);
break;
}
msg = post_receiver.recv() => {
let Some(msg) = msg else {
iroha_logger::debug!("Peer handle dropped.");
Expand All @@ -187,7 +219,7 @@ mod run {
if post_receiver_len > 100 {
iroha_logger::warn!(size=post_receiver_len, "Peer post messages are pilling up");
}
if let Err(error) = message_sender.send_message(msg).await {
if let Err(error) = message_sender.send_message(Message::Data(msg)).await {
iroha_logger::error!(%error, "Failed to send message to peer.");
break;
}
Expand All @@ -206,12 +238,29 @@ mod run {
break;
}
};
iroha_logger::trace!("Received peer message");
let peer_message = PeerMessage(peer_id.clone(), msg);
if peer_message_sender.send(peer_message).await.is_err() {
iroha_logger::error!("Network dropped peer message channel.");
break;
}
match msg {
Message::Ping => {
iroha_logger::trace!("Received peer ping");
if let Err(error) = message_sender.send_message(Message::<T>::Pong).await {
iroha_logger::error!(%error, "Failed to send message to peer.");
break;
}
},
Message::Pong => {
iroha_logger::trace!("Received peer pong");
}
Message::Data(msg) => {
iroha_logger::trace!("Received peer message");
let peer_message = PeerMessage(peer_id.clone(), msg);
if peer_message_sender.send(peer_message).await.is_err() {
iroha_logger::error!("Network dropped peer message channel.");
break;
}
}
};
// Reset idle and ping timeout as peer received message from another peer
idle_interval.reset();
ping_interval.reset();
}
else => break,
}
Expand All @@ -229,6 +278,7 @@ mod run {
pub(super) struct RunPeerArgs<T: Pload, P> {
pub peer: P,
pub service_message_sender: mpsc::Sender<ServiceMessage<T>>,
pub idle_timeout: Duration,
}

/// Trait for peer stages that might be used as starting point for peer's [`run`] function.
Expand Down Expand Up @@ -365,6 +415,14 @@ mod run {
Ok(())
}
}

/// Either message or ping
#[derive(Encode, Decode, Clone, Debug)]
enum Message<T> {
Data(T),
Ping,
Pong,
}
}

mod state {
Expand Down

0 comments on commit 901395b

Please sign in to comment.