Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] #4267: Introduce p2p idle timeout #4398

Merged
merged 1 commit into from
Apr 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 3 additions & 6 deletions cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,12 +203,9 @@ impl Iroha {
genesis: Option<GenesisNetwork>,
logger: LoggerHandle,
) -> Result<Self> {
let network = IrohaNetwork::start(
config.common.p2p_address.clone(),
config.common.key_pair.clone(),
)
.await
.wrap_err("Unable to start P2P-network")?;
let network = IrohaNetwork::start(config.common.key_pair.clone(), config.network.clone())
.await
.wrap_err("Unable to start P2P-network")?;

let (events_sender, _) = broadcast::channel(10000);
let world = World::with(
Expand Down
13 changes: 11 additions & 2 deletions config/src/parameters/actual.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::{
#[allow(missing_docs)]
pub struct Root {
pub common: Common,
pub network: Network,
pub genesis: Genesis,
pub torii: Torii,
pub kura: Kura,
Expand Down Expand Up @@ -71,16 +72,24 @@ impl Root {
pub struct Common {
pub chain_id: ChainId,
pub key_pair: KeyPair,
pub p2p_address: SocketAddr,
pub peer_id: PeerId,
}

impl Common {
/// Construct an id of this peer
pub fn peer_id(&self) -> PeerId {
PeerId::new(self.p2p_address.clone(), self.key_pair.public_key().clone())
self.peer_id.clone()
}
}

/// Network options
#[allow(missing_docs)]
#[derive(Debug, Clone)]
pub struct Network {
pub address: SocketAddr,
pub idle_timeout: Duration,
}

/// Parsed genesis configuration
#[derive(Debug, Clone)]
pub enum Genesis {
Expand Down
2 changes: 2 additions & 0 deletions config/src/parameters/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ pub mod network {

pub const DEFAULT_MAX_TRANSACTIONS_PER_GOSSIP: NonZeroU32 = nonzero!(500u32);
pub const DEFAULT_MAX_BLOCKS_PER_GOSSIP: NonZeroU32 = nonzero!(4u32);

pub const DEFAULT_IDLE_TIMEOUT: Duration = Duration::from_secs(60);
}

pub mod snapshot {
Expand Down
28 changes: 22 additions & 6 deletions config/src/parameters/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ impl Root {
}
}

let (p2p_address, block_sync, transaction_gossiper) = self.network.parse();
let (network, block_sync, transaction_gossiper) = self.network.parse();

let logger = self.logger;
let queue = self.queue;
Expand Down Expand Up @@ -216,18 +216,21 @@ impl Root {

let chain_wide = self.chain_wide.parse();

if p2p_address == torii.address {
if network.address == torii.address {
emitter.emit(eyre!(
"`iroha.p2p_address` and `torii.address` should not be the same"
))
}

emitter.finish()?;

let key_pair = key_pair.unwrap();
let peer_id = PeerId::new(network.address.clone(), key_pair.public_key().clone());

let peer = actual::Common {
chain_id: self.chain_id,
key_pair: key_pair.unwrap(),
p2p_address,
key_pair,
peer_id,
};
let telemetry = telemetry.unwrap();
let genesis = genesis.unwrap();
Expand All @@ -239,6 +242,7 @@ impl Root {

Ok(actual::Root {
common: peer,
network,
genesis,
torii,
kura,
Expand Down Expand Up @@ -457,20 +461,32 @@ pub struct Network {
pub block_gossip_period: Duration,
pub transaction_gossip_max_size: NonZeroU32,
pub transaction_gossip_period: Duration,
/// Duration of time after which connection with peer is terminated if peer is idle
pub idle_timeout: Duration,
}

impl Network {
fn parse(self) -> (SocketAddr, actual::BlockSync, actual::TransactionGossiper) {
fn parse(
self,
) -> (
actual::Network,
actual::BlockSync,
actual::TransactionGossiper,
) {
let Self {
address,
block_gossip_max_size,
block_gossip_period,
transaction_gossip_max_size,
transaction_gossip_period,
idle_timeout,
} = self;

(
address,
actual::Network {
address,
idle_timeout,
},
actual::BlockSync {
gossip_period: block_gossip_period,
gossip_max_size: block_gossip_max_size,
Expand Down
5 changes: 5 additions & 0 deletions config/src/parameters/user/boilerplate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ pub struct NetworkPartial {
pub block_gossip_period: UserField<HumanDuration>,
pub transaction_gossip_max_size: UserField<NonZeroU32>,
pub transaction_gossip_period: UserField<HumanDuration>,
pub idle_timeout: UserField<HumanDuration>,
}

impl UnwrapPartial for NetworkPartial {
Expand Down Expand Up @@ -445,6 +446,10 @@ impl UnwrapPartial for NetworkPartial {
.block_gossip_max_size
.get()
.unwrap_or(DEFAULT_MAX_BLOCKS_PER_GOSSIP),
idle_timeout: self
.idle_timeout
.map(HumanDuration::get)
.unwrap_or(DEFAULT_IDLE_TIMEOUT),
})
}
}
Expand Down
1 change: 1 addition & 0 deletions configs/peer.template.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
# block_gossip_max_size = 4
# transaction_gossip_period = "1s"
# transaction_gossip_max_size = 500
# idle_timeout = "60s"

[torii]
# address =
Expand Down
Binary file modified configs/swarm/executor.wasm
Binary file not shown.
8 changes: 6 additions & 2 deletions core/test_network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,14 +407,18 @@ impl Drop for Peer {
impl Peer {
/// Returns per peer config with all addresses, keys, and id set up.
fn get_config(&self, config: Config) -> Config {
use iroha_config::parameters::actual::{Common, Torii};
use iroha_config::parameters::actual::{Common, Network, Torii};

Config {
common: Common {
key_pair: self.key_pair.clone(),
p2p_address: self.p2p_address.clone(),
peer_id: PeerId::new(self.p2p_address.clone(), self.key_pair.public_key().clone()),
..config.common
},
network: Network {
address: self.p2p_address.clone(),
..config.network
},
torii: Torii {
address: self.api_address.clone(),
..config.torii
Expand Down
1 change: 1 addition & 0 deletions p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ iroha_logger = { workspace = true }
iroha_crypto = { workspace = true, default-features = true }
iroha_data_model = { workspace = true, default-features = true, features = ["transparent_api"] }
iroha_primitives = { workspace = true }
iroha_config = { workspace = true }
iroha_config_base = { workspace = true }
iroha_data_model_derive = { workspace = true }

Expand Down
17 changes: 15 additions & 2 deletions p2p/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{
};

use futures::{stream::FuturesUnordered, StreamExt};
use iroha_config::parameters::actual::Network as Config;
use iroha_crypto::{KeyPair, PublicKey};
use iroha_data_model::prelude::PeerId;
use iroha_logger::prelude::*;
Expand Down Expand Up @@ -67,7 +68,13 @@ impl<T: Pload, K: Kex + Sync, E: Enc + Sync> NetworkBaseHandle<T, K, E> {
/// # Errors
/// - If binding to address fail
#[log(skip(key_pair))]
pub async fn start(listen_addr: SocketAddr, key_pair: KeyPair) -> Result<Self, Error> {
pub async fn start(
key_pair: KeyPair,
Config {
address: listen_addr,
idle_timeout,
}: Config,
) -> Result<Self, Error> {
let listener = TcpListener::bind(&listen_addr.to_string()).await?;
iroha_logger::info!("Network bound to listener");
let (online_peers_sender, online_peers_receiver) = watch::channel(HashSet::new());
Expand Down Expand Up @@ -95,6 +102,7 @@ impl<T: Pload, K: Kex + Sync, E: Enc + Sync> NetworkBaseHandle<T, K, E> {
service_message_sender,
current_conn_id: 0,
current_topology: HashMap::new(),
idle_timeout,
_key_exchange: core::marker::PhantomData::<K>,
_encryptor: core::marker::PhantomData::<E>,
};
Expand Down Expand Up @@ -192,6 +200,8 @@ struct NetworkBase<T: Pload, K: Kex, E: Enc> {
/// Current topology
/// Bool determines who is responsible for initiating connection
current_topology: HashMap<PeerId, bool>,
/// Duration after which terminate connection with idle peer
idle_timeout: Duration,
/// Key exchange used by network
_key_exchange: core::marker::PhantomData<K>,
/// Encryptor used by the network
Expand Down Expand Up @@ -278,6 +288,7 @@ impl<T: Pload, K: Kex, E: Enc> NetworkBase<T, K, E> {
self.key_pair.clone(),
Connection::new(conn_id, stream),
service_message_sender,
self.idle_timeout,
);
}

Expand Down Expand Up @@ -341,6 +352,7 @@ impl<T: Pload, K: Kex, E: Enc> NetworkBase<T, K, E> {
self.key_pair.clone(),
conn_id,
service_message_sender,
self.idle_timeout,
);
}

Expand All @@ -366,6 +378,8 @@ impl<T: Pload, K: Kex, E: Enc> NetworkBase<T, K, E> {
disambiguator,
}: Connected<T>,
) {
self.connecting_peers.remove(&connection_id);

if !self.current_topology.contains_key(&peer_id) {
iroha_logger::warn!(%peer_id, topology=?self.current_topology, "Peer not present in topology is trying to connect");
return;
Expand Down Expand Up @@ -395,7 +409,6 @@ impl<T: Pload, K: Kex, E: Enc> NetworkBase<T, K, E> {
};
let _ = peer_message_sender.send(self.peer_message_sender.clone());
self.peers.insert(peer_id.public_key().clone(), ref_peer);
self.connecting_peers.remove(&connection_id);
Self::add_online_peer(&self.online_peers_sender, peer_id);
}

Expand Down
Loading
Loading