diff --git a/Cargo.lock b/Cargo.lock index 06fa8f6a1f3..b09ce7122a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3122,6 +3122,7 @@ dependencies = [ "derive_more", "displaydoc", "futures", + "iroha_config", "iroha_config_base", "iroha_crypto", "iroha_data_model", diff --git a/cli/src/lib.rs b/cli/src/lib.rs index 9854535d50a..0787ea2207c 100644 --- a/cli/src/lib.rs +++ b/cli/src/lib.rs @@ -203,12 +203,9 @@ impl Iroha { genesis: Option, logger: LoggerHandle, ) -> Result { - let network = IrohaNetwork::start( - config.common.p2p_address.clone(), - config.common.key_pair.clone(), - ) - .await - .wrap_err("Unable to start P2P-network")?; + let network = IrohaNetwork::start(config.common.key_pair.clone(), config.network.clone()) + .await + .wrap_err("Unable to start P2P-network")?; let (events_sender, _) = broadcast::channel(10000); let world = World::with( diff --git a/config/src/parameters/actual.rs b/config/src/parameters/actual.rs index 6b10b3d870b..c9dc0973ab6 100644 --- a/config/src/parameters/actual.rs +++ b/config/src/parameters/actual.rs @@ -31,6 +31,7 @@ use crate::{ #[allow(missing_docs)] pub struct Root { pub common: Common, + pub network: Network, pub genesis: Genesis, pub torii: Torii, pub kura: Kura, @@ -71,16 +72,24 @@ impl Root { pub struct Common { pub chain_id: ChainId, pub key_pair: KeyPair, - pub p2p_address: SocketAddr, + pub peer_id: PeerId, } impl Common { /// Construct an id of this peer pub fn peer_id(&self) -> PeerId { - PeerId::new(self.p2p_address.clone(), self.key_pair.public_key().clone()) + self.peer_id.clone() } } +/// Network options +#[allow(missing_docs)] +#[derive(Debug, Clone)] +pub struct Network { + pub address: SocketAddr, + pub idle_timeout: Duration, +} + /// Parsed genesis configuration #[derive(Debug, Clone)] pub enum Genesis { diff --git a/config/src/parameters/defaults.rs b/config/src/parameters/defaults.rs index d81c08b4ac9..4b42ac46c90 100644 --- a/config/src/parameters/defaults.rs +++ b/config/src/parameters/defaults.rs @@ -33,6 +33,8 @@ pub mod network { pub const DEFAULT_MAX_TRANSACTIONS_PER_GOSSIP: NonZeroU32 = nonzero!(500u32); pub const DEFAULT_MAX_BLOCKS_PER_GOSSIP: NonZeroU32 = nonzero!(4u32); + + pub const DEFAULT_IDLE_TIMEOUT: Duration = Duration::from_secs(60); } pub mod snapshot { diff --git a/config/src/parameters/user.rs b/config/src/parameters/user.rs index 5afae534472..35774e882a6 100644 --- a/config/src/parameters/user.rs +++ b/config/src/parameters/user.rs @@ -186,7 +186,7 @@ impl Root { } } - let (p2p_address, block_sync, transaction_gossiper) = self.network.parse(); + let (network, block_sync, transaction_gossiper) = self.network.parse(); let logger = self.logger; let queue = self.queue; @@ -216,7 +216,7 @@ impl Root { let chain_wide = self.chain_wide.parse(); - if p2p_address == torii.address { + if network.address == torii.address { emitter.emit(eyre!( "`iroha.p2p_address` and `torii.address` should not be the same" )) @@ -224,10 +224,13 @@ impl Root { emitter.finish()?; + let key_pair = key_pair.unwrap(); + let peer_id = PeerId::new(network.address.clone(), key_pair.public_key().clone()); + let peer = actual::Common { chain_id: self.chain_id, - key_pair: key_pair.unwrap(), - p2p_address, + key_pair, + peer_id, }; let telemetry = telemetry.unwrap(); let genesis = genesis.unwrap(); @@ -239,6 +242,7 @@ impl Root { Ok(actual::Root { common: peer, + network, genesis, torii, kura, @@ -457,20 +461,32 @@ pub struct Network { pub block_gossip_period: Duration, pub transaction_gossip_max_size: NonZeroU32, pub transaction_gossip_period: Duration, + /// Duration of time after which connection with peer is terminated if peer is idle + pub idle_timeout: Duration, } impl Network { - fn parse(self) -> (SocketAddr, actual::BlockSync, actual::TransactionGossiper) { + fn parse( + self, + ) -> ( + actual::Network, + actual::BlockSync, + actual::TransactionGossiper, + ) { let Self { address, block_gossip_max_size, block_gossip_period, transaction_gossip_max_size, transaction_gossip_period, + idle_timeout, } = self; ( - address, + actual::Network { + address, + idle_timeout, + }, actual::BlockSync { gossip_period: block_gossip_period, gossip_max_size: block_gossip_max_size, diff --git a/config/src/parameters/user/boilerplate.rs b/config/src/parameters/user/boilerplate.rs index b2863772d71..07ef9aa4a16 100644 --- a/config/src/parameters/user/boilerplate.rs +++ b/config/src/parameters/user/boilerplate.rs @@ -417,6 +417,7 @@ pub struct NetworkPartial { pub block_gossip_period: UserField, pub transaction_gossip_max_size: UserField, pub transaction_gossip_period: UserField, + pub idle_timeout: UserField, } impl UnwrapPartial for NetworkPartial { @@ -445,6 +446,10 @@ impl UnwrapPartial for NetworkPartial { .block_gossip_max_size .get() .unwrap_or(DEFAULT_MAX_BLOCKS_PER_GOSSIP), + idle_timeout: self + .idle_timeout + .map(HumanDuration::get) + .unwrap_or(DEFAULT_IDLE_TIMEOUT), }) } } diff --git a/configs/peer.template.toml b/configs/peer.template.toml index 2c8b88a7616..cce2aab5990 100644 --- a/configs/peer.template.toml +++ b/configs/peer.template.toml @@ -24,6 +24,7 @@ # block_gossip_max_size = 4 # transaction_gossip_period = "1s" # transaction_gossip_max_size = 500 +# idle_timeout = "60s" [torii] # address = diff --git a/configs/swarm/executor.wasm b/configs/swarm/executor.wasm index 1abb8d63df4..46579c359df 100644 Binary files a/configs/swarm/executor.wasm and b/configs/swarm/executor.wasm differ diff --git a/core/test_network/src/lib.rs b/core/test_network/src/lib.rs index df2843afa06..2c966af2b23 100644 --- a/core/test_network/src/lib.rs +++ b/core/test_network/src/lib.rs @@ -407,14 +407,18 @@ impl Drop for Peer { impl Peer { /// Returns per peer config with all addresses, keys, and id set up. fn get_config(&self, config: Config) -> Config { - use iroha_config::parameters::actual::{Common, Torii}; + use iroha_config::parameters::actual::{Common, Network, Torii}; Config { common: Common { key_pair: self.key_pair.clone(), - p2p_address: self.p2p_address.clone(), + peer_id: PeerId::new(self.p2p_address.clone(), self.key_pair.public_key().clone()), ..config.common }, + network: Network { + address: self.p2p_address.clone(), + ..config.network + }, torii: Torii { address: self.api_address.clone(), ..config.torii diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index 65d52302204..e5449023aa6 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -16,6 +16,7 @@ iroha_logger = { workspace = true } iroha_crypto = { workspace = true, default-features = true } iroha_data_model = { workspace = true, default-features = true, features = ["transparent_api"] } iroha_primitives = { workspace = true } +iroha_config = { workspace = true } iroha_config_base = { workspace = true } iroha_data_model_derive = { workspace = true } diff --git a/p2p/src/network.rs b/p2p/src/network.rs index c867746b435..ab781dcd626 100644 --- a/p2p/src/network.rs +++ b/p2p/src/network.rs @@ -6,6 +6,7 @@ use std::{ }; use futures::{stream::FuturesUnordered, StreamExt}; +use iroha_config::parameters::actual::Network as Config; use iroha_crypto::{KeyPair, PublicKey}; use iroha_data_model::prelude::PeerId; use iroha_logger::prelude::*; @@ -67,7 +68,13 @@ impl NetworkBaseHandle { /// # Errors /// - If binding to address fail #[log(skip(key_pair))] - pub async fn start(listen_addr: SocketAddr, key_pair: KeyPair) -> Result { + pub async fn start( + key_pair: KeyPair, + Config { + address: listen_addr, + idle_timeout, + }: Config, + ) -> Result { let listener = TcpListener::bind(&listen_addr.to_string()).await?; iroha_logger::info!("Network bound to listener"); let (online_peers_sender, online_peers_receiver) = watch::channel(HashSet::new()); @@ -95,6 +102,7 @@ impl NetworkBaseHandle { service_message_sender, current_conn_id: 0, current_topology: HashMap::new(), + idle_timeout, _key_exchange: core::marker::PhantomData::, _encryptor: core::marker::PhantomData::, }; @@ -192,6 +200,8 @@ struct NetworkBase { /// Current topology /// Bool determines who is responsible for initiating connection current_topology: HashMap, + /// Duration after which terminate connection with idle peer + idle_timeout: Duration, /// Key exchange used by network _key_exchange: core::marker::PhantomData, /// Encryptor used by the network @@ -278,6 +288,7 @@ impl NetworkBase { self.key_pair.clone(), Connection::new(conn_id, stream), service_message_sender, + self.idle_timeout, ); } @@ -341,6 +352,7 @@ impl NetworkBase { self.key_pair.clone(), conn_id, service_message_sender, + self.idle_timeout, ); } @@ -366,6 +378,8 @@ impl NetworkBase { disambiguator, }: Connected, ) { + self.connecting_peers.remove(&connection_id); + if !self.current_topology.contains_key(&peer_id) { iroha_logger::warn!(%peer_id, topology=?self.current_topology, "Peer not present in topology is trying to connect"); return; @@ -395,7 +409,6 @@ impl NetworkBase { }; let _ = peer_message_sender.send(self.peer_message_sender.clone()); self.peers.insert(peer_id.public_key().clone(), ref_peer); - self.connecting_peers.remove(&connection_id); Self::add_online_peer(&self.online_peers_sender, peer_id); } diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index fd8551cb969..bfa8af93b80 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -12,6 +12,7 @@ use tokio::{ TcpStream, }, sync::{mpsc, oneshot}, + time::Duration, }; use crate::{boilerplate::*, Error}; @@ -38,6 +39,7 @@ pub mod handles { key_pair: KeyPair, connection_id: ConnectionId, service_message_sender: mpsc::Sender>, + idle_timeout: Duration, ) { let peer = state::Connecting { peer_addr, @@ -47,6 +49,7 @@ pub mod handles { let peer = RunPeerArgs { peer, service_message_sender, + idle_timeout, }; tokio::task::spawn(run::run::(peer).in_current_span()); } @@ -57,6 +60,7 @@ pub mod handles { key_pair: KeyPair, connection: Connection, service_message_sender: mpsc::Sender>, + idle_timeout: Duration, ) { let peer = state::ConnectedFrom { peer_addr, @@ -66,6 +70,7 @@ pub mod handles { let peer = RunPeerArgs { peer, service_message_sender, + idle_timeout, }; tokio::task::spawn(run::run::(peer).in_current_span()); } @@ -92,6 +97,8 @@ mod run { //! Module with peer [`run`] function. use iroha_logger::prelude::*; + use parity_scale_codec::Decode; + use tokio::time::Instant; use super::{ cryptographer::Cryptographer, @@ -108,6 +115,7 @@ mod run { RunPeerArgs { peer, service_message_sender, + idle_timeout, }: RunPeerArgs, ) { let conn_id = peer.connection_id(); @@ -118,11 +126,15 @@ mod run { // Insure proper termination from every execution path. async { // Try to do handshake process - let peer = match peer.handshake().await { - Ok(ready) => ready, - Err(error) => { + let peer = match tokio::time::timeout(idle_timeout, peer.handshake()).await { + Ok(Ok(ready)) => ready, + Ok(Err(error)) => { iroha_logger::error!(%error, "Failure during handshake."); return; + }, + Err(_) => { + iroha_logger::error!(timeout=?idle_timeout, "Other peer has been idle during handshake"); + return; } }; @@ -175,8 +187,28 @@ mod run { let mut message_reader = MessageReader::new(read, cryptographer.clone()); let mut message_sender = MessageSender::new(write, cryptographer); + let mut idle_interval = tokio::time::interval_at(Instant::now() + idle_timeout, idle_timeout); + let mut ping_interval = tokio::time::interval_at(Instant::now() + idle_timeout / 2, idle_timeout / 2); + loop { tokio::select! { + _ = ping_interval.tick() => { + iroha_logger::trace!( + ping_period=?ping_interval.period(), + "The connection has been idle, pinging to check if it's alive" + ); + if let Err(error) = message_sender.send_message(Message::::Ping).await { + iroha_logger::error!(%error, "Failed to send ping to peer."); + break; + } + } + _ = idle_interval.tick() => { + iroha_logger::error!( + timeout=?idle_interval.period(), + "Didn't receive anything from the peer within given timeout, abandoning this connection" + ); + break; + } msg = post_receiver.recv() => { let Some(msg) = msg else { iroha_logger::debug!("Peer handle dropped."); @@ -187,7 +219,7 @@ mod run { if post_receiver_len > 100 { iroha_logger::warn!(size=post_receiver_len, "Peer post messages are pilling up"); } - if let Err(error) = message_sender.send_message(msg).await { + if let Err(error) = message_sender.send_message(Message::Data(msg)).await { iroha_logger::error!(%error, "Failed to send message to peer."); break; } @@ -206,12 +238,29 @@ mod run { break; } }; - iroha_logger::trace!("Received peer message"); - let peer_message = PeerMessage(peer_id.clone(), msg); - if peer_message_sender.send(peer_message).await.is_err() { - iroha_logger::error!("Network dropped peer message channel."); - break; - } + match msg { + Message::Ping => { + iroha_logger::trace!("Received peer ping"); + if let Err(error) = message_sender.send_message(Message::::Pong).await { + iroha_logger::error!(%error, "Failed to send message to peer."); + break; + } + }, + Message::Pong => { + iroha_logger::trace!("Received peer pong"); + } + Message::Data(msg) => { + iroha_logger::trace!("Received peer message"); + let peer_message = PeerMessage(peer_id.clone(), msg); + if peer_message_sender.send(peer_message).await.is_err() { + iroha_logger::error!("Network dropped peer message channel."); + break; + } + } + }; + // Reset idle and ping timeout as peer received message from another peer + idle_interval.reset(); + ping_interval.reset(); } else => break, } @@ -229,6 +278,7 @@ mod run { pub(super) struct RunPeerArgs { pub peer: P, pub service_message_sender: mpsc::Sender>, + pub idle_timeout: Duration, } /// Trait for peer stages that might be used as starting point for peer's [`run`] function. @@ -365,6 +415,14 @@ mod run { Ok(()) } } + + /// Either message or ping + #[derive(Encode, Decode, Clone, Debug)] + enum Message { + Data(T), + Ping, + Pong, + } } mod state { diff --git a/p2p/tests/integration/p2p.rs b/p2p/tests/integration/p2p.rs index 9f1ec520c87..74279167f5e 100644 --- a/p2p/tests/integration/p2p.rs +++ b/p2p/tests/integration/p2p.rs @@ -8,6 +8,7 @@ use std::{ }; use futures::{prelude::*, stream::FuturesUnordered, task::AtomicWaker}; +use iroha_config::parameters::actual::Network as Config; use iroha_crypto::KeyPair; use iroha_data_model::prelude::PeerId; use iroha_logger::{prelude::*, test_logger}; @@ -38,9 +39,12 @@ async fn network_create() { let address = socket_addr!(127.0.0.1:12_000); let key_pair = KeyPair::random(); let public_key = key_pair.public_key().clone(); - let network = NetworkHandle::start(address.clone(), key_pair) - .await - .unwrap(); + let idle_timeout = Duration::from_secs(60); + let config = Config { + address: address.clone(), + idle_timeout, + }; + let network = NetworkHandle::start(key_pair, config).await.unwrap(); tokio::time::sleep(delay).await; info!("Connecting to peer..."); @@ -139,6 +143,7 @@ impl TestActor { #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn two_networks() { let delay = Duration::from_millis(300); + let idle_timeout = Duration::from_secs(60); setup_logger(); let key_pair1 = KeyPair::random(); let public_key1 = key_pair1.public_key().clone(); @@ -146,15 +151,19 @@ async fn two_networks() { let public_key2 = key_pair2.public_key().clone(); info!("Starting first network..."); let address1 = socket_addr!(127.0.0.1:12_005); - let mut network1 = NetworkHandle::start(address1.clone(), key_pair1) - .await - .unwrap(); + let config1 = Config { + address: address1.clone(), + idle_timeout, + }; + let mut network1 = NetworkHandle::start(key_pair1, config1).await.unwrap(); info!("Starting second network..."); let address2 = socket_addr!(127.0.0.1:12_010); - let network2 = NetworkHandle::start(address2.clone(), key_pair2) - .await - .unwrap(); + let config2 = Config { + address: address2.clone(), + idle_timeout, + }; + let network2 = NetworkHandle::start(key_pair2, config2).await.unwrap(); let mut messages2 = WaitForN::new(1); let actor2 = TestActor::start(messages2.clone()); @@ -287,7 +296,12 @@ async fn start_network( let actor = TestActor::start(messages); let PeerId { address, .. } = peer.clone(); - let mut network = NetworkHandle::start(address, key_pair).await.unwrap(); + let idle_timeout = Duration::from_secs(60); + let config = Config { + address, + idle_timeout, + }; + let mut network = NetworkHandle::start(key_pair, config).await.unwrap(); network.subscribe_to_peers_messages(actor); let _ = barrier.wait().await;