From d7fd868dca93255dbd12ca84505a2a4c4fee34c8 Mon Sep 17 00:00:00 2001 From: driftluo Date: Wed, 15 Jan 2025 15:04:41 +0800 Subject: [PATCH] feat: add transport type to outbound service --- benches/benches/benchmarks/overall.rs | 3 +- chain/src/tests/util.rs | 3 +- network/src/network.rs | 37 +++++-- network/src/peer_store/addr_manager.rs | 104 +++++++++--------- network/src/peer_store/peer_store_impl.rs | 1 - network/src/peer_store/types.rs | 1 + network/src/services/outbound_peer.rs | 29 ++++- network/src/tests/mod.rs | 2 +- network/src/tests/peer_store.rs | 18 +++ rpc/src/tests/setup.rs | 3 +- sync/src/relayer/tests/helper.rs | 7 +- util/launcher/src/lib.rs | 5 +- .../src/tests/utils/chain.rs | 3 +- 13 files changed, 137 insertions(+), 79 deletions(-) diff --git a/benches/benches/benchmarks/overall.rs b/benches/benches/benchmarks/overall.rs index 103cab0893a..17b8c91ce86 100644 --- a/benches/benches/benchmarks/overall.rs +++ b/benches/benches/benchmarks/overall.rs @@ -5,7 +5,7 @@ use ckb_chain::{start_chain_services, ChainController}; use ckb_chain_spec::consensus::{ConsensusBuilder, ProposalWindow}; use ckb_dao_utils::genesis_dao_data; use ckb_jsonrpc_types::JsonBytes; -use ckb_network::{Flags, NetworkController, NetworkService, NetworkState}; +use ckb_network::{network::TransportType, Flags, NetworkController, NetworkService, NetworkState}; use ckb_shared::{Shared, SharedBuilder}; use ckb_store::ChainStore; use ckb_types::{ @@ -77,6 +77,7 @@ fn dummy_network(shared: &Shared) -> NetworkController { "test".to_string(), Flags::COMPATIBILITY, ), + TransportType::Tcp, ) .start(shared.async_handle()) .expect("Start network service failed") diff --git a/chain/src/tests/util.rs b/chain/src/tests/util.rs index f29cd97ad72..2d1cde059fb 100644 --- a/chain/src/tests/util.rs +++ b/chain/src/tests/util.rs @@ -4,7 +4,7 @@ use ckb_app_config::{BlockAssemblerConfig, NetworkConfig}; use ckb_chain_spec::consensus::{Consensus, ConsensusBuilder}; use ckb_dao_utils::genesis_dao_data; use ckb_jsonrpc_types::ScriptHashType; -use ckb_network::{Flags, NetworkController, NetworkService, NetworkState}; +use ckb_network::{network::TransportType, Flags, NetworkController, NetworkService, NetworkState}; use ckb_shared::{Shared, SharedBuilder}; use ckb_store::ChainStore; use ckb_test_chain_utils::{always_success_cell, create_always_success_tx}; @@ -123,6 +123,7 @@ pub(crate) fn dummy_network(shared: &Shared) -> NetworkController { "test".to_string(), Flags::COMPATIBILITY, ), + TransportType::Tcp, ) .start(shared.async_handle()) .expect("Start network service failed") diff --git a/network/src/network.rs b/network/src/network.rs index d51fe87916f..630d82a1986 100644 --- a/network/src/network.rs +++ b/network/src/network.rs @@ -831,6 +831,7 @@ impl NetworkService { required_protocol_ids: Vec, // name, version, flags identify_announce: (String, String, Flags), + transport_type: TransportType, ) -> Self { let config = &network_state.config; @@ -1017,7 +1018,7 @@ impl NetworkService { service_builder = service_builder.tcp_config(bind_fn); } } - TransportType::Ws => { + TransportType::Ws | TransportType::Wss => { // only bind once if matches!(init, BindType::Ws) { continue; @@ -1074,6 +1075,7 @@ impl NetworkService { Arc::clone(&network_state), p2p_service.control().to_owned().into(), Duration::from_secs(config.connect_outbound_interval_secs), + transport_type, ); bg_services.push(Box::pin(outbound_peer_service) as Pin>); }; @@ -1521,18 +1523,33 @@ pub(crate) async fn async_disconnect_with_message( } #[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord)] -pub(crate) enum TransportType { +pub enum TransportType { Tcp, Ws, + Wss, } -pub(crate) fn find_type(addr: &Multiaddr) -> TransportType { - if addr - .iter() - .any(|proto| matches!(proto, Protocol::Ws | Protocol::Wss)) - { - TransportType::Ws - } else { - TransportType::Tcp +impl<'a> From for p2p::multiaddr::Protocol<'a> { + fn from(value: TransportType) -> Self { + match value { + TransportType::Ws => Protocol::Ws, + TransportType::Wss => Protocol::Wss, + _ => unreachable!(), + } } } + +pub(crate) fn find_type(addr: &Multiaddr) -> TransportType { + let mut iter = addr.iter(); + + iter.find_map(|proto| { + if let Protocol::Ws = proto { + Some(TransportType::Ws) + } else if let Protocol::Wss = proto { + Some(TransportType::Wss) + } else { + None + } + }) + .unwrap_or(TransportType::Tcp) +} diff --git a/network/src/peer_store/addr_manager.rs b/network/src/peer_store/addr_manager.rs index b4b5179ba3b..a94ee35a7b4 100644 --- a/network/src/peer_store/addr_manager.rs +++ b/network/src/peer_store/addr_manager.rs @@ -3,13 +3,12 @@ use crate::peer_store::types::AddrInfo; use p2p::{multiaddr::Multiaddr, utils::multiaddr_to_socketaddr}; use rand::Rng; use std::collections::{HashMap, HashSet}; -use std::net::SocketAddr; /// Address manager #[derive(Default)] pub struct AddrManager { next_id: u64, - addr_to_id: HashMap, + addr_to_id: HashMap, id_to_info: HashMap, random_ids: Vec, } @@ -17,27 +16,25 @@ pub struct AddrManager { impl AddrManager { /// Add an address information to address manager pub fn add(&mut self, mut addr_info: AddrInfo) { - if let Some(key) = multiaddr_to_socketaddr(&addr_info.addr) { - if let Some(&id) = self.addr_to_id.get(&key) { - let (exist_last_connected_at_ms, random_id_pos) = { - let info = self.id_to_info.get(&id).expect("must exists"); - (info.last_connected_at_ms, info.random_id_pos) - }; - // Get time earlier than record time, return directly - if addr_info.last_connected_at_ms >= exist_last_connected_at_ms { - addr_info.random_id_pos = random_id_pos; - self.id_to_info.insert(id, addr_info); - } - return; + if let Some(&id) = self.addr_to_id.get(&addr_info.addr) { + let (exist_last_connected_at_ms, random_id_pos) = { + let info = self.id_to_info.get(&id).expect("must exists"); + (info.last_connected_at_ms, info.random_id_pos) + }; + // Get time earlier than record time, return directly + if addr_info.last_connected_at_ms >= exist_last_connected_at_ms { + addr_info.random_id_pos = random_id_pos; + self.id_to_info.insert(id, addr_info); } - - let id = self.next_id; - self.addr_to_id.insert(key, id); - addr_info.random_id_pos = self.random_ids.len(); - self.id_to_info.insert(id, addr_info); - self.random_ids.push(id); - self.next_id += 1; + return; } + + let id = self.next_id; + self.addr_to_id.insert(addr_info.addr.clone(), id); + addr_info.random_id_pos = self.random_ids.len(); + self.id_to_info.insert(id, addr_info); + self.random_ids.push(id); + self.next_id += 1; } /// Randomly return addrs that worth to try or connect. @@ -55,23 +52,30 @@ impl AddrManager { let j = rng.gen_range(i..self.random_ids.len()); self.swap_random_id(j, i); let addr_info: AddrInfo = self.id_to_info[&self.random_ids[i]].to_owned(); - if let Some(socket_addr) = multiaddr_to_socketaddr(&addr_info.addr) { - let ip = socket_addr.ip(); - let is_unique_ip = !duplicate_ips.contains(&ip); - // A trick to make our tests work - // TODO remove this after fix the network tests. - let is_test_ip = ip.is_unspecified() || ip.is_loopback(); - if (is_test_ip || is_unique_ip) - && addr_info.is_connectable(now_ms) - && filter(&addr_info) - { - duplicate_ips.insert(ip); - addr_infos.push(addr_info); + match multiaddr_to_socketaddr(&addr_info.addr) { + Some(socket_addr) => { + let ip = socket_addr.ip(); + let is_unique_ip = !duplicate_ips.contains(&ip); + // A trick to make our tests work + // TODO remove this after fix the network tests. + let is_test_ip = ip.is_unspecified() || ip.is_loopback(); + if (is_test_ip || is_unique_ip) + && addr_info.is_connectable(now_ms) + && filter(&addr_info) + { + duplicate_ips.insert(ip); + addr_infos.push(addr_info); + } } - if addr_infos.len() == count { - break; + None => { + if addr_info.is_connectable(now_ms) && filter(&addr_info) { + addr_infos.push(addr_info); + } } } + if addr_infos.len() == count { + break; + } } addr_infos } @@ -88,34 +92,26 @@ impl AddrManager { /// Remove an address by ip and port pub fn remove(&mut self, addr: &Multiaddr) -> Option { - multiaddr_to_socketaddr(addr).and_then(|addr| { - self.addr_to_id.remove(&addr).and_then(|id| { - let random_id_pos = self.id_to_info.get(&id).expect("exists").random_id_pos; - // swap with last index, then remove the last index - self.swap_random_id(random_id_pos, self.random_ids.len() - 1); - self.random_ids.pop(); - self.id_to_info.remove(&id) - }) + self.addr_to_id.remove(&addr).and_then(|id| { + let random_id_pos = self.id_to_info.get(&id).expect("exists").random_id_pos; + // swap with last index, then remove the last index + self.swap_random_id(random_id_pos, self.random_ids.len() - 1); + self.random_ids.pop(); + self.id_to_info.remove(&id) }) } /// Get an address information by ip and port pub fn get(&self, addr: &Multiaddr) -> Option<&AddrInfo> { - multiaddr_to_socketaddr(addr).and_then(|addr| { - self.addr_to_id - .get(&addr) - .and_then(|id| self.id_to_info.get(id)) - }) + self.addr_to_id + .get(&addr) + .and_then(|id| self.id_to_info.get(id)) } /// Get a mutable address information by ip and port pub fn get_mut(&mut self, addr: &Multiaddr) -> Option<&mut AddrInfo> { - if let Some(addr) = multiaddr_to_socketaddr(addr) { - if let Some(id) = self.addr_to_id.get(&addr) { - self.id_to_info.get_mut(id) - } else { - None - } + if let Some(id) = self.addr_to_id.get(&addr) { + self.id_to_info.get_mut(id) } else { None } diff --git a/network/src/peer_store/peer_store_impl.rs b/network/src/peer_store/peer_store_impl.rs index 3169cb45f79..a8078b88775 100644 --- a/network/src/peer_store/peer_store_impl.rs +++ b/network/src/peer_store/peer_store_impl.rs @@ -1,4 +1,3 @@ -use crate::network::{find_type, TransportType}; use crate::{ errors::{PeerStoreError, Result}, extract_peer_id, multiaddr_to_socketaddr, diff --git a/network/src/peer_store/types.rs b/network/src/peer_store/types.rs index 2db34c306bb..ef089bdb633 100644 --- a/network/src/peer_store/types.rs +++ b/network/src/peer_store/types.rs @@ -66,6 +66,7 @@ impl AddrInfo { addr: addr .iter() .filter_map(|p| { + let p = p; if matches!( p, Protocol::Ws | Protocol::Wss | Protocol::Memory(_) | Protocol::Tls(_) diff --git a/network/src/services/outbound_peer.rs b/network/src/services/outbound_peer.rs index 5a59d766845..a9ddccf2073 100644 --- a/network/src/services/outbound_peer.rs +++ b/network/src/services/outbound_peer.rs @@ -1,4 +1,5 @@ use crate::{ + network::TransportType, peer_store::{types::AddrInfo, PeerStore}, NetworkState, }; @@ -27,6 +28,7 @@ pub struct OutboundPeerService { interval: Option, try_connect_interval: Duration, try_identify_count: u8, + transport_type: TransportType, } impl OutboundPeerService { @@ -34,6 +36,7 @@ impl OutboundPeerService { network_state: Arc, p2p_control: ServiceControl, try_connect_interval: Duration, + _transport_type: TransportType, ) -> Self { OutboundPeerService { network_state, @@ -41,6 +44,10 @@ impl OutboundPeerService { interval: None, try_connect_interval, try_identify_count: 0, + #[cfg(not(target_family = "wasm"))] + transport_type: TransportType::Tcp, + #[cfg(target_family = "wasm")] + transport_type: _transport_type, } } @@ -63,8 +70,15 @@ impl OutboundPeerService { attempt_peers, ); - for addr in attempt_peers.into_iter().map(|info| info.addr) { - self.network_state.dial_feeler(&self.p2p_control, addr); + for mut addr in attempt_peers.into_iter().map(|info| info.addr) { + self.network_state.dial_feeler(&self.p2p_control, { + if !matches!(self.transport_type, TransportType::Tcp) { + addr.push(self.transport_type.into()); + addr + } else { + addr + } + }); } } @@ -132,8 +146,15 @@ impl OutboundPeerService { Box::new(attempt_peers.into_iter().map(|info| info.addr)) }; - for addr in peers { - self.network_state.dial_identify(&self.p2p_control, addr); + for mut addr in peers { + self.network_state.dial_identify(&self.p2p_control, { + if !matches!(self.transport_type, TransportType::Tcp) { + addr.push(self.transport_type.into()); + addr + } else { + addr + } + }); } } diff --git a/network/src/tests/mod.rs b/network/src/tests/mod.rs index dde66545f05..d40867f40f2 100644 --- a/network/src/tests/mod.rs +++ b/network/src/tests/mod.rs @@ -20,7 +20,7 @@ fn random_addr_v6() -> crate::multiaddr::Multiaddr { multi_addr.push(crate::multiaddr::Protocol::Tcp(43)); multi_addr.push(crate::multiaddr::Protocol::P2P( - crate::PeerId::random().to_base58().into_bytes().into(), + crate::PeerId::random().into_bytes().into(), )); multi_addr } diff --git a/network/src/tests/peer_store.rs b/network/src/tests/peer_store.rs index e23db5c4129..85881b278ee 100644 --- a/network/src/tests/peer_store.rs +++ b/network/src/tests/peer_store.rs @@ -603,3 +603,21 @@ fn test_only_tcp_store() { addr }); } + +#[test] +fn test_support_dns_store() { + let mut peer_store = PeerStore::default(); + let addr: Multiaddr = format!( + "/dns4/www.abc.com/tcp/{}/p2p/{}", + rand::random::(), + crate::PeerId::random().to_base58() + ) + .parse() + .unwrap(); + + peer_store + .add_addr(addr.clone(), Flags::COMPATIBILITY) + .unwrap(); + assert_eq!(peer_store.fetch_addrs_to_feeler(2).len(), 1); + assert_eq!(peer_store.fetch_addrs_to_feeler(1)[0].addr, addr); +} diff --git a/rpc/src/tests/setup.rs b/rpc/src/tests/setup.rs index 888186fe57a..4afea1fa6d9 100644 --- a/rpc/src/tests/setup.rs +++ b/rpc/src/tests/setup.rs @@ -9,7 +9,7 @@ use ckb_chain::start_chain_services; use ckb_chain_spec::consensus::{Consensus, ConsensusBuilder}; use ckb_chain_spec::versionbits::{ActiveMode, Deployment, DeploymentPos}; use ckb_dao_utils::genesis_dao_data; -use ckb_network::{Flags, NetworkService, NetworkState}; +use ckb_network::{network::TransportType, Flags, NetworkService, NetworkState}; use ckb_network_alert::alert_relayer::AlertRelayer; use ckb_notify::NotifyService; use ckb_shared::SharedBuilder; @@ -112,6 +112,7 @@ pub(crate) fn setup_rpc_test_suite(height: u64, consensus: Option) -> "0.1.0".to_string(), Flags::COMPATIBILITY, ), + TransportType::Tcp, ) .start(shared.async_handle()) .expect("Start network service failed") diff --git a/sync/src/relayer/tests/helper.rs b/sync/src/relayer/tests/helper.rs index a11ceb1fe82..ed872ac550e 100644 --- a/sync/src/relayer/tests/helper.rs +++ b/sync/src/relayer/tests/helper.rs @@ -5,9 +5,9 @@ use ckb_chain_spec::consensus::{build_genesis_epoch_ext, ConsensusBuilder}; use ckb_dao::DaoCalculator; use ckb_dao_utils::genesis_dao_data; use ckb_network::{ - async_trait, bytes::Bytes as P2pBytes, Behaviour, CKBProtocolContext, Error, Flags, - NetworkController, NetworkService, NetworkState, Peer, PeerIndex, ProtocolId, SupportProtocols, - TargetSession, + async_trait, bytes::Bytes as P2pBytes, network::TransportType, Behaviour, CKBProtocolContext, + Error, Flags, NetworkController, NetworkService, NetworkState, Peer, PeerIndex, ProtocolId, + SupportProtocols, TargetSession, }; use ckb_reward_calculator::RewardCalculator; use ckb_shared::{Shared, SharedBuilder, Snapshot}; @@ -127,6 +127,7 @@ pub(crate) fn dummy_network(shared: &Shared) -> NetworkController { "test".to_string(), Flags::COMPATIBILITY, ), + TransportType::Tcp, ) .start(shared.async_handle()) .expect("Start network service failed") diff --git a/util/launcher/src/lib.rs b/util/launcher/src/lib.rs index 87e87a2a2f5..8bf7105b162 100644 --- a/util/launcher/src/lib.rs +++ b/util/launcher/src/lib.rs @@ -15,8 +15,8 @@ use ckb_light_client_protocol_server::LightClientProtocol; use ckb_logger::info; use ckb_logger::internal::warn; use ckb_network::{ - observe_listen_port_occupancy, CKBProtocol, Flags, NetworkController, NetworkService, - NetworkState, SupportProtocols, + network::TransportType, observe_listen_port_occupancy, CKBProtocol, Flags, NetworkController, + NetworkService, NetworkState, SupportProtocols, }; use ckb_network_alert::alert_relayer::AlertRelayer; use ckb_resource::Resource; @@ -384,6 +384,7 @@ impl Launcher { self.version.to_string(), flags, ), + TransportType::Tcp, ) .start(shared.async_handle()) .expect("Start network service failed"); diff --git a/util/light-client-protocol-server/src/tests/utils/chain.rs b/util/light-client-protocol-server/src/tests/utils/chain.rs index 03e37e704bf..94777dfa670 100644 --- a/util/light-client-protocol-server/src/tests/utils/chain.rs +++ b/util/light-client-protocol-server/src/tests/utils/chain.rs @@ -8,7 +8,7 @@ use ckb_chain::{start_chain_services, ChainController}; use ckb_chain_spec::consensus::{build_genesis_epoch_ext, ConsensusBuilder}; use ckb_dao_utils::genesis_dao_data; use ckb_jsonrpc_types::ScriptHashType; -use ckb_network::{Flags, NetworkController, NetworkService, NetworkState}; +use ckb_network::{network::TransportType, Flags, NetworkController, NetworkService, NetworkState}; use ckb_shared::{Shared, SharedBuilder}; use ckb_systemtime::unix_time_as_millis; use ckb_test_chain_utils::always_success_cell; @@ -240,6 +240,7 @@ fn dummy_network(shared: &Shared) -> NetworkController { "test".to_string(), Flags::all(), ), + TransportType::Tcp, ) .start(shared.async_handle()) .expect("Start network service failed")