Skip to content

Commit

Permalink
[fix] #4267: Introduce p2p idle timeout
Browse files Browse the repository at this point in the history
Signed-off-by: Shanin Roman <[email protected]>
  • Loading branch information
Erigara committed Mar 29, 2024
1 parent aa7a3bf commit 0bf8164
Show file tree
Hide file tree
Showing 9 changed files with 99 additions and 18 deletions.
1 change: 1 addition & 0 deletions cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ impl Iroha {
let network = IrohaNetwork::start(
config.common.p2p_address.clone(),
config.common.key_pair.clone(),
config.common.idle_timeout,
)
.await
.wrap_err("Unable to start P2P-network")?;
Expand Down
1 change: 1 addition & 0 deletions config/src/parameters/actual.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ pub struct Common {
pub chain_id: ChainId,
pub key_pair: KeyPair,
pub p2p_address: SocketAddr,
pub idle_timeout: Duration,
}

impl Common {
Expand Down
2 changes: 2 additions & 0 deletions config/src/parameters/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ pub mod network {

pub const DEFAULT_MAX_TRANSACTIONS_PER_GOSSIP: NonZeroU32 = nonzero!(500u32);
pub const DEFAULT_MAX_BLOCKS_PER_GOSSIP: NonZeroU32 = nonzero!(4u32);

pub const DEFAULT_IDLE_TIMEOUT: Duration = Duration::from_secs(60);
}

pub mod snapshot {
Expand Down
16 changes: 14 additions & 2 deletions config/src/parameters/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ impl Root {
}
}

let (p2p_address, block_sync, transaction_gossiper) = self.network.parse();
let (p2p_address, idle_timeout, block_sync, transaction_gossiper) = self.network.parse();

let logger = self.logger;
let queue = self.queue;
Expand Down Expand Up @@ -228,6 +228,7 @@ impl Root {
chain_id: self.chain_id,
key_pair: key_pair.unwrap(),
p2p_address,
idle_timeout,
};
let telemetry = telemetry.unwrap();
let genesis = genesis.unwrap();
Expand Down Expand Up @@ -457,20 +458,31 @@ pub struct Network {
pub block_gossip_period: Duration,
pub transaction_gossip_max_size: NonZeroU32,
pub transaction_gossip_period: Duration,
/// Duration of time after which connection with peer is terminated if peer is idle
pub idle_timeout: Duration,
}

impl Network {
fn parse(self) -> (SocketAddr, actual::BlockSync, actual::TransactionGossiper) {
fn parse(
self,
) -> (
SocketAddr,
Duration,
actual::BlockSync,
actual::TransactionGossiper,
) {
let Self {
address,
block_gossip_max_size,
block_gossip_period,
transaction_gossip_max_size,
transaction_gossip_period,
idle_timeout,
} = self;

(
address,
idle_timeout,
actual::BlockSync {
gossip_period: block_gossip_period,
gossip_max_size: block_gossip_max_size,
Expand Down
5 changes: 5 additions & 0 deletions config/src/parameters/user/boilerplate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ pub struct NetworkPartial {
pub block_gossip_period: UserField<HumanDuration>,
pub transaction_gossip_max_size: UserField<NonZeroU32>,
pub transaction_gossip_period: UserField<HumanDuration>,
pub idle_timeout: UserField<HumanDuration>,
}

impl UnwrapPartial for NetworkPartial {
Expand Down Expand Up @@ -445,6 +446,10 @@ impl UnwrapPartial for NetworkPartial {
.block_gossip_max_size
.get()
.unwrap_or(DEFAULT_MAX_BLOCKS_PER_GOSSIP),
idle_timeout: self
.idle_timeout
.map(HumanDuration::get)
.unwrap_or(DEFAULT_IDLE_TIMEOUT),
})
}
}
Expand Down
Binary file modified configs/swarm/executor.wasm
Binary file not shown.
14 changes: 12 additions & 2 deletions p2p/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,11 @@ impl<T: Pload, K: Kex + Sync, E: Enc + Sync> NetworkBaseHandle<T, K, E> {
/// # Errors
/// - If binding to address fail
#[log(skip(key_pair))]
pub async fn start(listen_addr: SocketAddr, key_pair: KeyPair) -> Result<Self, Error> {
pub async fn start(
listen_addr: SocketAddr,
key_pair: KeyPair,
idle_timeout: Duration,
) -> Result<Self, Error> {
let listener = TcpListener::bind(&listen_addr.to_string()).await?;
iroha_logger::info!("Network bound to listener");
let (online_peers_sender, online_peers_receiver) = watch::channel(HashSet::new());
Expand Down Expand Up @@ -95,6 +99,7 @@ impl<T: Pload, K: Kex + Sync, E: Enc + Sync> NetworkBaseHandle<T, K, E> {
service_message_sender,
current_conn_id: 0,
current_topology: HashMap::new(),
idle_timeout,
_key_exchange: core::marker::PhantomData::<K>,
_encryptor: core::marker::PhantomData::<E>,
};
Expand Down Expand Up @@ -192,6 +197,8 @@ struct NetworkBase<T: Pload, K: Kex, E: Enc> {
/// Current topology
/// Bool determines who is responsible for initiating connection
current_topology: HashMap<PeerId, bool>,
/// Duration after which terminate connection with idle peer
idle_timeout: Duration,
/// Key exchange used by network
_key_exchange: core::marker::PhantomData<K>,
/// Encryptor used by the network
Expand Down Expand Up @@ -278,6 +285,7 @@ impl<T: Pload, K: Kex, E: Enc> NetworkBase<T, K, E> {
self.key_pair.clone(),
Connection::new(conn_id, stream),
service_message_sender,
self.idle_timeout,
);
}

Expand Down Expand Up @@ -341,6 +349,7 @@ impl<T: Pload, K: Kex, E: Enc> NetworkBase<T, K, E> {
self.key_pair.clone(),
conn_id,
service_message_sender,
self.idle_timeout,
);
}

Expand All @@ -366,6 +375,8 @@ impl<T: Pload, K: Kex, E: Enc> NetworkBase<T, K, E> {
disambiguator,
}: Connected<T>,
) {
self.connecting_peers.remove(&connection_id);

if !self.current_topology.contains_key(&peer_id) {
iroha_logger::warn!(%peer_id, topology=?self.current_topology, "Peer not present in topology is trying to connect");
return;
Expand Down Expand Up @@ -395,7 +406,6 @@ impl<T: Pload, K: Kex, E: Enc> NetworkBase<T, K, E> {
};
let _ = peer_message_sender.send(self.peer_message_sender.clone());
self.peers.insert(peer_id.public_key().clone(), ref_peer);
self.connecting_peers.remove(&connection_id);
Self::add_online_peer(&self.online_peers_sender, peer_id);
}

Expand Down
65 changes: 55 additions & 10 deletions p2p/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use tokio::{
TcpStream,
},
sync::{mpsc, oneshot},
time::Duration,
};

use crate::{boilerplate::*, Error};
Expand All @@ -38,6 +39,7 @@ pub mod handles {
key_pair: KeyPair,
connection_id: ConnectionId,
service_message_sender: mpsc::Sender<ServiceMessage<T>>,
idle_timeout: Duration,
) {
let peer = state::Connecting {
peer_addr,
Expand All @@ -47,6 +49,7 @@ pub mod handles {
let peer = RunPeerArgs {
peer,
service_message_sender,
idle_timeout,
};
tokio::task::spawn(run::run::<T, K, E, _>(peer).in_current_span());
}
Expand All @@ -57,6 +60,7 @@ pub mod handles {
key_pair: KeyPair,
connection: Connection,
service_message_sender: mpsc::Sender<ServiceMessage<T>>,
idle_timeout: Duration,
) {
let peer = state::ConnectedFrom {
peer_addr,
Expand All @@ -66,6 +70,7 @@ pub mod handles {
let peer = RunPeerArgs {
peer,
service_message_sender,
idle_timeout,
};
tokio::task::spawn(run::run::<T, K, E, _>(peer).in_current_span());
}
Expand All @@ -92,6 +97,8 @@ mod run {
//! Module with peer [`run`] function.
use iroha_logger::prelude::*;
use parity_scale_codec::Decode;
use tokio::time::Instant;

use super::{
cryptographer::Cryptographer,
Expand All @@ -108,6 +115,7 @@ mod run {
RunPeerArgs {
peer,
service_message_sender,
idle_timeout,
}: RunPeerArgs<T, P>,
) {
let conn_id = peer.connection_id();
Expand All @@ -118,11 +126,15 @@ mod run {
// Insure proper termination from every execution path.
async {
// Try to do handshake process
let peer = match peer.handshake().await {
Ok(ready) => ready,
Err(error) => {
let peer = match tokio::time::timeout(idle_timeout, peer.handshake()).await {
Ok(Ok(ready)) => ready,
Ok(Err(error)) => {
iroha_logger::error!(%error, "Failure during handshake.");
return;
},
Err(error) => {
iroha_logger::error!(%error, "Other peer is idle during handshake");
return;
}
};

Expand Down Expand Up @@ -175,8 +187,22 @@ mod run {
let mut message_reader = MessageReader::new(read, cryptographer.clone());
let mut message_sender = MessageSender::new(write, cryptographer);

let mut idle_interval = tokio::time::interval_at(Instant::now() + idle_timeout, idle_timeout);
let mut ping_interval = tokio::time::interval_at(Instant::now() + idle_timeout / 2, idle_timeout / 2);

loop {
tokio::select! {
_ = ping_interval.tick() => {
iroha_logger::trace!("Sending ping");
if let Err(error) = message_sender.send_message(Message::<T>::Ping).await {
iroha_logger::error!(%error, "Failed to send ping to peer.");
break;
}
}
_ = idle_interval.tick() => {
iroha_logger::error!(timeout=idle_timeout.as_secs(), "Connection with peer is idle");
break;
}
msg = post_receiver.recv() => {
let Some(msg) = msg else {
iroha_logger::debug!("Peer handle dropped.");
Expand All @@ -187,10 +213,12 @@ mod run {
if post_receiver_len > 100 {
iroha_logger::warn!(size=post_receiver_len, "Peer post messages are pilling up");
}
if let Err(error) = message_sender.send_message(msg).await {
if let Err(error) = message_sender.send_message(Message::Message(msg)).await {
iroha_logger::error!(%error, "Failed to send message to peer.");
break;
}
// Reset ping timeout as peer send another message
ping_interval.reset();
}
msg = message_reader.read_message() => {
let msg = match msg {
Expand All @@ -206,12 +234,21 @@ mod run {
break;
}
};
iroha_logger::trace!("Received peer message");
let peer_message = PeerMessage(peer_id.clone(), msg);
if peer_message_sender.send(peer_message).await.is_err() {
iroha_logger::error!("Network dropped peer message channel.");
break;
}
match msg {
Message::Ping => {
iroha_logger::trace!("Received peer ping");
},
Message::Message(msg) => {
iroha_logger::trace!("Received peer message");
let peer_message = PeerMessage(peer_id.clone(), msg);
if peer_message_sender.send(peer_message).await.is_err() {
iroha_logger::error!("Network dropped peer message channel.");
break;
}
}
};
// Reset idle timeout as we received message from peer
idle_interval.reset();
}
else => break,
}
Expand All @@ -229,6 +266,7 @@ mod run {
pub(super) struct RunPeerArgs<T: Pload, P> {
pub peer: P,
pub service_message_sender: mpsc::Sender<ServiceMessage<T>>,
pub idle_timeout: Duration,
}

/// Trait for peer stages that might be used as starting point for peer's [`run`] function.
Expand Down Expand Up @@ -365,6 +403,13 @@ mod run {
Ok(())
}
}

/// Either message or ping
#[derive(Encode, Decode, Clone, Debug)]
enum Message<T> {
Message(T),
Ping,
}
}

mod state {
Expand Down
13 changes: 9 additions & 4 deletions p2p/tests/integration/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ async fn network_create() {
let address = socket_addr!(127.0.0.1:12_000);
let key_pair = KeyPair::random();
let public_key = key_pair.public_key().clone();
let network = NetworkHandle::start(address.clone(), key_pair)
let idle_timeout = Duration::from_secs(60);
let network = NetworkHandle::start(address.clone(), key_pair, idle_timeout)
.await
.unwrap();
tokio::time::sleep(delay).await;
Expand Down Expand Up @@ -139,20 +140,21 @@ impl TestActor {
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn two_networks() {
let delay = Duration::from_millis(300);
let idle_timeout = Duration::from_secs(60);
setup_logger();
let key_pair1 = KeyPair::random();
let public_key1 = key_pair1.public_key().clone();
let key_pair2 = KeyPair::random().clone();
let public_key2 = key_pair2.public_key().clone();
info!("Starting first network...");
let address1 = socket_addr!(127.0.0.1:12_005);
let mut network1 = NetworkHandle::start(address1.clone(), key_pair1)
let mut network1 = NetworkHandle::start(address1.clone(), key_pair1, idle_timeout)
.await
.unwrap();

info!("Starting second network...");
let address2 = socket_addr!(127.0.0.1:12_010);
let network2 = NetworkHandle::start(address2.clone(), key_pair2)
let network2 = NetworkHandle::start(address2.clone(), key_pair2, idle_timeout)
.await
.unwrap();

Expand Down Expand Up @@ -287,7 +289,10 @@ async fn start_network(
let actor = TestActor::start(messages);

let PeerId { address, .. } = peer.clone();
let mut network = NetworkHandle::start(address, key_pair).await.unwrap();
let idle_timeout = Duration::from_secs(60);
let mut network = NetworkHandle::start(address, key_pair, idle_timeout)
.await
.unwrap();
network.subscribe_to_peers_messages(actor);

let _ = barrier.wait().await;
Expand Down

0 comments on commit 0bf8164

Please sign in to comment.