fix: Introduce p2p idle timeout #4728

Merged 1 commit on Jun 13, 2024.
11 changes: 10 additions & 1 deletion p2p/src/network.rs
@@ -26,6 +26,9 @@ use crate::{
unbounded_with_len, Broadcast, Error, NetworkMessage, OnlinePeers, Post, UpdateTopology,
};

/// Timeout after which the connection to an idle peer is dropped
const IDLE_TIMEOUT: Duration = Duration::from_secs(60);

/// [`NetworkBase`] actor handle.
// NOTE: channels are unbounded in order to break communication cycle deadlock.
// Unbounded channels are ok here because messages frequency is either configurable (and relatively low)
@@ -94,6 +97,7 @@ impl<T: Pload, K: Kex + Sync, E: Enc + Sync> NetworkBaseHandle<T, K, E> {
service_message_sender,
current_conn_id: 0,
current_topology: HashMap::new(),
idle_timeout: IDLE_TIMEOUT,
_key_exchange: core::marker::PhantomData::<K>,
_encryptor: core::marker::PhantomData::<E>,
};
@@ -191,6 +195,8 @@ struct NetworkBase<T: Pload, K: Kex, E: Enc> {
/// Current topology
/// Bool determines who is responsible for initiating connection
current_topology: HashMap<PeerId, bool>,
/// Duration after which the connection with an idle peer is terminated
idle_timeout: Duration,
/// Key exchange used by network
_key_exchange: core::marker::PhantomData<K>,
/// Encryptor used by the network
@@ -277,6 +283,7 @@ impl<T: Pload, K: Kex, E: Enc> NetworkBase<T, K, E> {
self.key_pair.clone(),
Connection::new(conn_id, stream),
service_message_sender,
self.idle_timeout,
);
}

@@ -340,6 +347,7 @@ impl<T: Pload, K: Kex, E: Enc> NetworkBase<T, K, E> {
self.key_pair.clone(),
conn_id,
service_message_sender,
self.idle_timeout,
);
}

@@ -365,6 +373,8 @@ impl<T: Pload, K: Kex, E: Enc> NetworkBase<T, K, E> {
disambiguator,
}: Connected<T>,
) {
self.connecting_peers.remove(&connection_id);

if !self.current_topology.contains_key(&peer_id) {
iroha_logger::warn!(%peer_id, topology=?self.current_topology, "Peer not present in topology is trying to connect");
return;
@@ -394,7 +404,6 @@ impl<T: Pload, K: Kex, E: Enc> NetworkBase<T, K, E> {
};
let _ = peer_message_sender.send(self.peer_message_sender.clone());
self.peers.insert(peer_id.public_key.clone(), ref_peer);
self.connecting_peers.remove(&connection_id);
Self::add_online_peer(&self.online_peers_sender, peer_id);
}

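The network.rs side of the patch is plumbing: a single `IDLE_TIMEOUT` constant is stored on `NetworkBase` and handed to every peer task the actor spawns. A minimal sketch of that pattern, assuming a plain tokio setup; `Network` and `spawn_peer` here are illustrative stand-ins, not the crate's actual API:

```rust
use std::time::Duration;
use tokio::task::JoinHandle;

/// Default timeout after which an idle connection is dropped (mirrors `IDLE_TIMEOUT`).
const IDLE_TIMEOUT: Duration = Duration::from_secs(60);

/// Simplified stand-in for `NetworkBase`: it owns the timeout once
/// and hands a copy to every connection task it spawns.
struct Network {
    idle_timeout: Duration,
}

impl Network {
    fn new() -> Self {
        Self { idle_timeout: IDLE_TIMEOUT }
    }

    /// Spawn a per-peer task, passing the shared timeout down by value.
    fn spawn_peer(&self, peer_addr: String) -> JoinHandle<()> {
        let idle_timeout = self.idle_timeout;
        tokio::spawn(async move {
            // The real code runs the handshake and message loop here,
            // bounded by `idle_timeout` (see p2p/src/peer.rs below).
            println!("peer {peer_addr} will be dropped after {idle_timeout:?} of silence");
        })
    }
}

#[tokio::main]
async fn main() {
    let network = Network::new();
    network.spawn_peer("127.0.0.1:1337".to_owned()).await.unwrap();
}
```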
78 changes: 68 additions & 10 deletions p2p/src/peer.rs
@@ -12,6 +12,7 @@ use tokio::{
TcpStream,
},
sync::{mpsc, oneshot},
time::Duration,
};

use crate::{boilerplate::*, CryptographicError, Error};
@@ -38,6 +39,7 @@ pub mod handles {
key_pair: KeyPair,
connection_id: ConnectionId,
service_message_sender: mpsc::Sender<ServiceMessage<T>>,
idle_timeout: Duration,
) {
let peer = state::Connecting {
peer_addr,
@@ -47,6 +49,7 @@
let peer = RunPeerArgs {
peer,
service_message_sender,
idle_timeout,
};
tokio::task::spawn(run::run::<T, K, E, _>(peer).in_current_span());
}
@@ -57,6 +60,7 @@ pub mod handles {
key_pair: KeyPair,
connection: Connection,
service_message_sender: mpsc::Sender<ServiceMessage<T>>,
idle_timeout: Duration,
) {
let peer = state::ConnectedFrom {
peer_addr,
@@ -66,6 +70,7 @@
let peer = RunPeerArgs {
peer,
service_message_sender,
idle_timeout,
};
tokio::task::spawn(run::run::<T, K, E, _>(peer).in_current_span());
}
@@ -92,6 +97,8 @@ mod run {
//! Module with peer [`run`] function.

use iroha_logger::prelude::*;
use parity_scale_codec::Decode;
use tokio::time::Instant;

use super::{
cryptographer::Cryptographer,
@@ -108,6 +115,7 @@ mod run {
RunPeerArgs {
peer,
service_message_sender,
idle_timeout,
}: RunPeerArgs<T, P>,
) {
let conn_id = peer.connection_id();
@@ -118,11 +126,15 @@
// Ensure proper termination from every execution path.
async {
// Try to complete the handshake
let peer = match peer.handshake().await {
Ok(ready) => ready,
Err(error) => {
let peer = match tokio::time::timeout(idle_timeout, peer.handshake()).await {
Ok(Ok(ready)) => ready,
Ok(Err(error)) => {
iroha_logger::error!(%error, "Failure during handshake.");
return;
},
Err(_) => {
iroha_logger::error!(timeout=?idle_timeout, "Other peer has been idle during handshake");
return;
}
};

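Bounding the handshake with `tokio::time::timeout` is the standard tokio idiom; the nested match separates a handshake failure from a timeout. A self-contained sketch under that assumption, with a made-up `handshake` function standing in for the real key exchange:

```rust
use std::time::Duration;
use tokio::time::timeout;

/// Stand-in for the peer handshake; the real one exchanges keys over TCP.
async fn handshake() -> Result<&'static str, &'static str> {
    tokio::time::sleep(Duration::from_millis(10)).await;
    Ok("ready peer")
}

#[tokio::main]
async fn main() {
    let idle_timeout = Duration::from_secs(60);
    // `timeout` returns Err(Elapsed) if the future does not complete in time,
    // otherwise Ok wrapping the future's own Result -- hence the nested match.
    let peer = match timeout(idle_timeout, handshake()).await {
        Ok(Ok(ready)) => ready,
        Ok(Err(error)) => {
            eprintln!("Failure during handshake: {error}");
            return;
        }
        Err(_) => {
            eprintln!("Other peer has been idle during handshake ({idle_timeout:?})");
            return;
        }
    };
    println!("handshake finished: {peer}");
}
```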
@@ -175,8 +187,28 @@ mod run {
let mut message_reader = MessageReader::new(read, cryptographer.clone());
let mut message_sender = MessageSender::new(write, cryptographer);

let mut idle_interval = tokio::time::interval_at(Instant::now() + idle_timeout, idle_timeout);
let mut ping_interval = tokio::time::interval_at(Instant::now() + idle_timeout / 2, idle_timeout / 2);

loop {
tokio::select! {
_ = ping_interval.tick() => {
iroha_logger::trace!(
ping_period=?ping_interval.period(),
"The connection has been idle, pinging to check if it's alive"
);
if let Err(error) = message_sender.send_message(Message::<T>::Ping).await {
iroha_logger::error!(%error, "Failed to send ping to peer.");
break;
}
}
_ = idle_interval.tick() => {
iroha_logger::error!(
timeout=?idle_interval.period(),
"Didn't receive anything from the peer within given timeout, abandoning this connection"
);
break;
}
msg = post_receiver.recv() => {
let Some(msg) = msg else {
iroha_logger::debug!("Peer handle dropped.");
@@ -187,7 +219,7 @@ mod run {
if post_receiver_len > 100 {
iroha_logger::warn!(size=post_receiver_len, "Peer post messages are piling up");
}
if let Err(error) = message_sender.send_message(msg).await {
if let Err(error) = message_sender.send_message(Message::Data(msg)).await {
iroha_logger::error!(%error, "Failed to send message to peer.");
break;
}
@@ -206,12 +238,29 @@ mod run {
break;
}
};
iroha_logger::trace!("Received peer message");
let peer_message = PeerMessage(peer_id.clone(), msg);
if peer_message_sender.send(peer_message).await.is_err() {
iroha_logger::error!("Network dropped peer message channel.");
break;
}
match msg {
Message::Ping => {
iroha_logger::trace!("Received peer ping");
if let Err(error) = message_sender.send_message(Message::<T>::Pong).await {
iroha_logger::error!(%error, "Failed to send message to peer.");
break;
}
},
Message::Pong => {
iroha_logger::trace!("Received peer pong");
}
Message::Data(msg) => {
iroha_logger::trace!("Received peer message");
let peer_message = PeerMessage(peer_id.clone(), msg);
if peer_message_sender.send(peer_message).await.is_err() {
iroha_logger::error!("Network dropped peer message channel.");
break;
}
}
};
// Reset the idle and ping timers, since a message was just received from the peer
idle_interval.reset();
ping_interval.reset();
}
else => break,
}
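The keepalive logic is two staggered timers inside the `select!` loop: after half the idle timeout of silence a ping is sent, after a full idle timeout the connection is abandoned, and any inbound message pushes both timers back. A reduced sketch of just that timer interplay, with an mpsc channel standing in for the peer socket and a deliberately short timeout so it terminates quickly:

```rust
use std::time::Duration;
use tokio::{
    sync::mpsc,
    time::{interval_at, sleep, Instant},
};

#[tokio::main]
async fn main() {
    // Short timeout so the sketch finishes quickly; the patch uses 60 s.
    let idle_timeout = Duration::from_secs(2);
    let (tx, mut rx) = mpsc::channel::<&str>(8);

    // Pretend the remote peer sends one message, then goes silent.
    tokio::spawn(async move {
        sleep(Duration::from_millis(500)).await;
        let _ = tx.send("block proposal").await;
        sleep(Duration::from_secs(10)).await; // keep the channel open
    });

    // First tick only after a full period, as in the patch.
    let mut idle_interval = interval_at(Instant::now() + idle_timeout, idle_timeout);
    let mut ping_interval = interval_at(Instant::now() + idle_timeout / 2, idle_timeout / 2);

    loop {
        tokio::select! {
            _ = ping_interval.tick() => {
                // The real loop sends Message::Ping here.
                println!("idle for {:?}, sending ping", ping_interval.period());
            }
            _ = idle_interval.tick() => {
                println!("nothing received within {:?}, dropping connection", idle_interval.period());
                break;
            }
            msg = rx.recv() => {
                let Some(msg) = msg else { break };
                println!("received {msg:?}");
                // Any inbound traffic counts as liveness: push both timers back.
                idle_interval.reset();
                ping_interval.reset();
            }
        }
    }
}
```

Pinging at half the idle period means a healthy but quiet peer sees traffic well before its own idle timer fires, so two silent-but-alive peers keep each other's connections open.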
@@ -229,6 +278,7 @@ mod run {
pub(super) struct RunPeerArgs<T: Pload, P> {
pub peer: P,
pub service_message_sender: mpsc::Sender<ServiceMessage<T>>,
pub idle_timeout: Duration,
}

/// Trait for peer stages that might be used as starting point for peer's [`run`] function.
@@ -365,6 +415,14 @@ mod run {
Ok(())
}
}

/// Either a data message or a keepalive ping/pong frame
#[derive(Encode, Decode, Clone, Debug)]
enum Message<T> {
Data(T),
Ping,
Pong,
}
}

mod state {
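The new `Message` enum wraps every payload so keepalive frames share the wire format with data frames; SCALE encoding prefixes the variant index as a single byte. A round-trip sketch using `parity-scale-codec` (with its `derive` feature) directly; the `Payload` struct is invented for the example:

```rust
use parity_scale_codec::{Decode, Encode};

/// Example payload; the real `T` is the generic network payload type.
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
struct Payload(Vec<u8>);

/// Wire wrapper: either application data or a keepalive frame,
/// mirroring the enum added in p2p/src/peer.rs.
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
enum Message<T> {
    Data(T),
    Ping,
    Pong,
}

fn main() {
    let ping = Message::<Payload>::Ping;
    let data = Message::Data(Payload(vec![1, 2, 3]));

    // SCALE prefixes the variant index (0 = Data, 1 = Ping, 2 = Pong).
    let ping_bytes = ping.encode();
    let data_bytes = data.encode();
    assert_eq!(ping_bytes, vec![1u8]);

    // Decoding consumes a mutable byte slice.
    let decoded = Message::<Payload>::decode(&mut data_bytes.as_slice()).unwrap();
    assert_eq!(decoded, data);
    println!("ping = {ping_bytes:?}, data roundtrip = {decoded:?}");
}
```

Because `Ping` and `Pong` carry no payload, a keepalive costs a single encoded byte plus whatever framing and encryption the connection already applies.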