Skip to content

Commit

Permalink
[fix] #4267: Introduce p2p idle timeout
Browse files Browse the repository at this point in the history
Signed-off-by: Shanin Roman <[email protected]>
  • Loading branch information
Erigara committed Apr 3, 2024
1 parent 172769c commit ff9458f
Show file tree
Hide file tree
Showing 13 changed files with 147 additions and 37 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 4 additions & 6 deletions cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,12 +203,10 @@ impl Iroha {
genesis: Option<GenesisNetwork>,
logger: LoggerHandle,
) -> Result<Self> {
let network = IrohaNetwork::start(
config.common.p2p_address.clone(),
config.common.key_pair.clone(),
)
.await
.wrap_err("Unable to start P2P-network")?;
let network =
IrohaNetwork::start(config.common.key_pair.clone(), config.common.p2p.clone())
.await
.wrap_err("Unable to start P2P-network")?;

let (events_sender, _) = broadcast::channel(10000);
let world = World::with(
Expand Down
12 changes: 10 additions & 2 deletions config/src/parameters/actual.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,24 @@ impl Root {
pub struct Common {
pub chain_id: ChainId,
pub key_pair: KeyPair,
pub p2p_address: SocketAddr,
pub p2p: P2p,
}

impl Common {
/// Construct an id of this peer
pub fn peer_id(&self) -> PeerId {
PeerId::new(self.p2p_address.clone(), self.key_pair.public_key().clone())
PeerId::new(self.p2p.address.clone(), self.key_pair.public_key().clone())
}
}

/// Common options shared between multiple places
#[allow(missing_docs)]
#[derive(Debug, Clone)]
pub struct P2p {
pub address: SocketAddr,
pub idle_timeout: Duration,
}

/// Parsed genesis configuration
#[derive(Debug, Clone)]
pub enum Genesis {
Expand Down
2 changes: 2 additions & 0 deletions config/src/parameters/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ pub mod network {

pub const DEFAULT_MAX_TRANSACTIONS_PER_GOSSIP: NonZeroU32 = nonzero!(500u32);
pub const DEFAULT_MAX_BLOCKS_PER_GOSSIP: NonZeroU32 = nonzero!(4u32);

pub const DEFAULT_IDLE_TIMEOUT: Duration = Duration::from_secs(60);
}

pub mod snapshot {
Expand Down
16 changes: 11 additions & 5 deletions config/src/parameters/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ impl Root {
}
}

let (p2p_address, block_sync, transaction_gossiper) = self.network.parse();
let (network, block_sync, transaction_gossiper) = self.network.parse();

let logger = self.logger;
let queue = self.queue;
Expand Down Expand Up @@ -216,7 +216,7 @@ impl Root {

let chain_wide = self.chain_wide.parse();

if p2p_address == torii.address {
if network.address == torii.address {
emitter.emit(eyre!(
"`iroha.p2p_address` and `torii.address` should not be the same"
))
Expand All @@ -227,7 +227,7 @@ impl Root {
let peer = actual::Common {
chain_id: self.chain_id,
key_pair: key_pair.unwrap(),
p2p_address,
p2p: network,
};
let telemetry = telemetry.unwrap();
let genesis = genesis.unwrap();
Expand Down Expand Up @@ -457,20 +457,26 @@ pub struct Network {
pub block_gossip_period: Duration,
pub transaction_gossip_max_size: NonZeroU32,
pub transaction_gossip_period: Duration,
/// Duration of time after which connection with peer is terminated if peer is idle
pub idle_timeout: Duration,
}

impl Network {
fn parse(self) -> (SocketAddr, actual::BlockSync, actual::TransactionGossiper) {
fn parse(self) -> (actual::P2p, actual::BlockSync, actual::TransactionGossiper) {
let Self {
address,
block_gossip_max_size,
block_gossip_period,
transaction_gossip_max_size,
transaction_gossip_period,
idle_timeout,
} = self;

(
address,
actual::P2p {
address,
idle_timeout,
},
actual::BlockSync {
gossip_period: block_gossip_period,
gossip_max_size: block_gossip_max_size,
Expand Down
5 changes: 5 additions & 0 deletions config/src/parameters/user/boilerplate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ pub struct NetworkPartial {
pub block_gossip_period: UserField<HumanDuration>,
pub transaction_gossip_max_size: UserField<NonZeroU32>,
pub transaction_gossip_period: UserField<HumanDuration>,
pub idle_timeout: UserField<HumanDuration>,
}

impl UnwrapPartial for NetworkPartial {
Expand Down Expand Up @@ -445,6 +446,10 @@ impl UnwrapPartial for NetworkPartial {
.block_gossip_max_size
.get()
.unwrap_or(DEFAULT_MAX_BLOCKS_PER_GOSSIP),
idle_timeout: self
.idle_timeout
.map(HumanDuration::get)
.unwrap_or(DEFAULT_IDLE_TIMEOUT),
})
}
}
Expand Down
1 change: 1 addition & 0 deletions configs/peer.template.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
# block_gossip_max_size = 4
# transaction_gossip_period = "1s"
# transaction_gossip_max_size = 500
# idle_timeout = "60s"

[torii]
# address =
Expand Down
Binary file modified configs/swarm/executor.wasm
Binary file not shown.
7 changes: 5 additions & 2 deletions core/test_network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use iroha_client::{
config::Config as ClientConfig,
data_model::{isi::Instruction, peer::Peer as DataModelPeer, prelude::*, query::Query, Level},
};
use iroha_config::parameters::actual::Root as Config;
use iroha_config::parameters::actual::{P2p, Root as Config};
pub use iroha_core::state::StateReadOnly;
use iroha_crypto::KeyPair;
use iroha_data_model::{query::QueryOutputBox, ChainId};
Expand Down Expand Up @@ -412,7 +412,10 @@ impl Peer {
Config {
common: Common {
key_pair: self.key_pair.clone(),
p2p_address: self.p2p_address.clone(),
p2p: P2p {
address: self.p2p_address.clone(),
..config.common.p2p
},
..config.common
},
torii: Torii {
Expand Down
1 change: 1 addition & 0 deletions p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ iroha_logger = { workspace = true }
iroha_crypto = { workspace = true, default-features = true }
iroha_data_model = { workspace = true, default-features = true, features = ["transparent_api"] }
iroha_primitives = { workspace = true }
iroha_config = { workspace = true }
iroha_config_base = { workspace = true }
iroha_data_model_derive = { workspace = true }

Expand Down
17 changes: 15 additions & 2 deletions p2p/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{
};

use futures::{stream::FuturesUnordered, StreamExt};
use iroha_config::parameters::actual::P2p;
use iroha_crypto::{KeyPair, PublicKey};
use iroha_data_model::prelude::PeerId;
use iroha_logger::prelude::*;
Expand Down Expand Up @@ -67,7 +68,13 @@ impl<T: Pload, K: Kex + Sync, E: Enc + Sync> NetworkBaseHandle<T, K, E> {
/// # Errors
/// - If binding to address fail
#[log(skip(key_pair))]
pub async fn start(listen_addr: SocketAddr, key_pair: KeyPair) -> Result<Self, Error> {
pub async fn start(
key_pair: KeyPair,
P2p {
address: listen_addr,
idle_timeout,
}: P2p,
) -> Result<Self, Error> {
let listener = TcpListener::bind(&listen_addr.to_string()).await?;
iroha_logger::info!("Network bound to listener");
let (online_peers_sender, online_peers_receiver) = watch::channel(HashSet::new());
Expand Down Expand Up @@ -95,6 +102,7 @@ impl<T: Pload, K: Kex + Sync, E: Enc + Sync> NetworkBaseHandle<T, K, E> {
service_message_sender,
current_conn_id: 0,
current_topology: HashMap::new(),
idle_timeout,
_key_exchange: core::marker::PhantomData::<K>,
_encryptor: core::marker::PhantomData::<E>,
};
Expand Down Expand Up @@ -192,6 +200,8 @@ struct NetworkBase<T: Pload, K: Kex, E: Enc> {
/// Current topology
/// Bool determines who is responsible for initiating connection
current_topology: HashMap<PeerId, bool>,
/// Duration after which terminate connection with idle peer
idle_timeout: Duration,
/// Key exchange used by network
_key_exchange: core::marker::PhantomData<K>,
/// Encryptor used by the network
Expand Down Expand Up @@ -278,6 +288,7 @@ impl<T: Pload, K: Kex, E: Enc> NetworkBase<T, K, E> {
self.key_pair.clone(),
Connection::new(conn_id, stream),
service_message_sender,
self.idle_timeout,
);
}

Expand Down Expand Up @@ -341,6 +352,7 @@ impl<T: Pload, K: Kex, E: Enc> NetworkBase<T, K, E> {
self.key_pair.clone(),
conn_id,
service_message_sender,
self.idle_timeout,
);
}

Expand All @@ -366,6 +378,8 @@ impl<T: Pload, K: Kex, E: Enc> NetworkBase<T, K, E> {
disambiguator,
}: Connected<T>,
) {
self.connecting_peers.remove(&connection_id);

if !self.current_topology.contains_key(&peer_id) {
iroha_logger::warn!(%peer_id, topology=?self.current_topology, "Peer not present in topology is trying to connect");
return;
Expand Down Expand Up @@ -395,7 +409,6 @@ impl<T: Pload, K: Kex, E: Enc> NetworkBase<T, K, E> {
};
let _ = peer_message_sender.send(self.peer_message_sender.clone());
self.peers.insert(peer_id.public_key().clone(), ref_peer);
self.connecting_peers.remove(&connection_id);
Self::add_online_peer(&self.online_peers_sender, peer_id);
}

Expand Down
Loading

0 comments on commit ff9458f

Please sign in to comment.