Skip to content

Commit

Permalink
feat: add transport type to outbound service
Browse files Browse the repository at this point in the history
  • Loading branch information
driftluo committed Jan 15, 2025
1 parent 24cf5f0 commit 5e07ec2
Show file tree
Hide file tree
Showing 15 changed files with 172 additions and 108 deletions.
3 changes: 2 additions & 1 deletion benches/benches/benchmarks/overall.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use ckb_chain::{start_chain_services, ChainController};
use ckb_chain_spec::consensus::{ConsensusBuilder, ProposalWindow};
use ckb_dao_utils::genesis_dao_data;
use ckb_jsonrpc_types::JsonBytes;
use ckb_network::{Flags, NetworkController, NetworkService, NetworkState};
use ckb_network::{network::TransportType, Flags, NetworkController, NetworkService, NetworkState};
use ckb_shared::{Shared, SharedBuilder};
use ckb_store::ChainStore;
use ckb_types::{
Expand Down Expand Up @@ -77,6 +77,7 @@ fn dummy_network(shared: &Shared) -> NetworkController {
"test".to_string(),
Flags::COMPATIBILITY,
),
TransportType::Tcp,
)
.start(shared.async_handle())
.expect("Start network service failed")
Expand Down
3 changes: 2 additions & 1 deletion chain/src/tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use ckb_app_config::{BlockAssemblerConfig, NetworkConfig};
use ckb_chain_spec::consensus::{Consensus, ConsensusBuilder};
use ckb_dao_utils::genesis_dao_data;
use ckb_jsonrpc_types::ScriptHashType;
use ckb_network::{Flags, NetworkController, NetworkService, NetworkState};
use ckb_network::{network::TransportType, Flags, NetworkController, NetworkService, NetworkState};
use ckb_shared::{Shared, SharedBuilder};
use ckb_store::ChainStore;
use ckb_test_chain_utils::{always_success_cell, create_always_success_tx};
Expand Down Expand Up @@ -123,6 +123,7 @@ pub(crate) fn dummy_network(shared: &Shared) -> NetworkController {
"test".to_string(),
Flags::COMPATIBILITY,
),
TransportType::Tcp,
)
.start(shared.async_handle())
.expect("Start network service failed")
Expand Down
87 changes: 59 additions & 28 deletions network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,20 @@ impl NetworkState {
.iter()
.chain(config.public_addresses.iter())
.cloned()
.filter_map(|mut addr| {
multiaddr_to_socketaddr(&addr)
.filter(|addr| is_reachable(addr.ip()))
.and({
if extract_peer_id(&addr).is_none() {
addr.push(Protocol::P2P(Cow::Borrowed(local_peer_id.as_bytes())));
}
Some(addr)
})
.filter_map(|mut addr| match multiaddr_to_socketaddr(&addr) {
Some(socket_addr) if is_reachable(socket_addr.ip()) => {
if extract_peer_id(&addr).is_none() {
addr.push(Protocol::P2P(Cow::Borrowed(local_peer_id.as_bytes())));
}
Some(addr)
}
None => {
if extract_peer_id(&addr).is_none() {
addr.push(Protocol::P2P(Cow::Borrowed(local_peer_id.as_bytes())));
}
Some(addr)
}
Some(_) => None,
})
.collect();
info!("Loading the peer store. This process may take a few seconds to complete.");
Expand Down Expand Up @@ -158,15 +163,20 @@ impl NetworkState {
.iter()
.chain(config.public_addresses.iter())
.cloned()
.filter_map(|mut addr| {
multiaddr_to_socketaddr(&addr)
.filter(|addr| is_reachable(addr.ip()))
.and({
if extract_peer_id(&addr).is_none() {
addr.push(Protocol::P2P(Cow::Borrowed(local_peer_id.as_bytes())));
}
Some(addr)
})
.filter_map(|mut addr| match multiaddr_to_socketaddr(&addr) {
Some(socket_addr) if is_reachable(socket_addr.ip()) => {
if extract_peer_id(&addr).is_none() {
addr.push(Protocol::P2P(Cow::Borrowed(local_peer_id.as_bytes())));
}
Some(addr)
}
None => {
if extract_peer_id(&addr).is_none() {
addr.push(Protocol::P2P(Cow::Borrowed(local_peer_id.as_bytes())));
}
Some(addr)
}
Some(_) => None,
})
.collect();
info!("Loading the peer store. This process may take a few seconds to complete.");
Expand Down Expand Up @@ -831,6 +841,7 @@ impl NetworkService {
required_protocol_ids: Vec<ProtocolId>,
// name, version, flags
identify_announce: (String, String, Flags),
transport_type: TransportType,
) -> Self {
let config = &network_state.config;

Expand Down Expand Up @@ -1017,7 +1028,7 @@ impl NetworkService {
service_builder = service_builder.tcp_config(bind_fn);
}
}
TransportType::Ws => {
TransportType::Ws | TransportType::Wss => {
// only bind once
if matches!(init, BindType::Ws) {
continue;
Expand Down Expand Up @@ -1074,6 +1085,7 @@ impl NetworkService {
Arc::clone(&network_state),
p2p_service.control().to_owned().into(),
Duration::from_secs(config.connect_outbound_interval_secs),
transport_type,
);
bg_services.push(Box::pin(outbound_peer_service) as Pin<Box<_>>);
};
Expand Down Expand Up @@ -1520,19 +1532,38 @@ pub(crate) async fn async_disconnect_with_message(
control.disconnect(peer_index).await
}

/// Transport type on ckb
#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord)]
pub(crate) enum TransportType {
pub enum TransportType {
/// Tcp
Tcp,
/// Ws
Ws,
/// Wss only on wasm
Wss,
}

pub(crate) fn find_type(addr: &Multiaddr) -> TransportType {
if addr
.iter()
.any(|proto| matches!(proto, Protocol::Ws | Protocol::Wss))
{
TransportType::Ws
} else {
TransportType::Tcp
impl<'a> From<TransportType> for p2p::multiaddr::Protocol<'a> {
fn from(value: TransportType) -> Self {
match value {
TransportType::Ws => Protocol::Ws,
TransportType::Wss => Protocol::Wss,
_ => unreachable!(),
}
}
}

pub(crate) fn find_type(addr: &Multiaddr) -> TransportType {
let mut iter = addr.iter();

iter.find_map(|proto| {
if let Protocol::Ws = proto {
Some(TransportType::Ws)
} else if let Protocol::Wss = proto {
Some(TransportType::Wss)
} else {
None
}
})
.unwrap_or(TransportType::Tcp)
}
104 changes: 50 additions & 54 deletions network/src/peer_store/addr_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,41 +3,38 @@ use crate::peer_store::types::AddrInfo;
use p2p::{multiaddr::Multiaddr, utils::multiaddr_to_socketaddr};
use rand::Rng;
use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;

/// Address manager
#[derive(Default)]
pub struct AddrManager {
next_id: u64,
addr_to_id: HashMap<SocketAddr, u64>,
addr_to_id: HashMap<Multiaddr, u64>,
id_to_info: HashMap<u64, AddrInfo>,
random_ids: Vec<u64>,
}

impl AddrManager {
/// Add an address information to address manager
pub fn add(&mut self, mut addr_info: AddrInfo) {
if let Some(key) = multiaddr_to_socketaddr(&addr_info.addr) {
if let Some(&id) = self.addr_to_id.get(&key) {
let (exist_last_connected_at_ms, random_id_pos) = {
let info = self.id_to_info.get(&id).expect("must exists");
(info.last_connected_at_ms, info.random_id_pos)
};
// Get time earlier than record time, return directly
if addr_info.last_connected_at_ms >= exist_last_connected_at_ms {
addr_info.random_id_pos = random_id_pos;
self.id_to_info.insert(id, addr_info);
}
return;
if let Some(&id) = self.addr_to_id.get(&addr_info.addr) {
let (exist_last_connected_at_ms, random_id_pos) = {
let info = self.id_to_info.get(&id).expect("must exists");
(info.last_connected_at_ms, info.random_id_pos)
};
// Get time earlier than record time, return directly
if addr_info.last_connected_at_ms >= exist_last_connected_at_ms {
addr_info.random_id_pos = random_id_pos;
self.id_to_info.insert(id, addr_info);
}

let id = self.next_id;
self.addr_to_id.insert(key, id);
addr_info.random_id_pos = self.random_ids.len();
self.id_to_info.insert(id, addr_info);
self.random_ids.push(id);
self.next_id += 1;
return;
}

let id = self.next_id;
self.addr_to_id.insert(addr_info.addr.clone(), id);
addr_info.random_id_pos = self.random_ids.len();
self.id_to_info.insert(id, addr_info);
self.random_ids.push(id);
self.next_id += 1;
}

/// Randomly return addrs that worth to try or connect.
Expand All @@ -55,23 +52,30 @@ impl AddrManager {
let j = rng.gen_range(i..self.random_ids.len());
self.swap_random_id(j, i);
let addr_info: AddrInfo = self.id_to_info[&self.random_ids[i]].to_owned();
if let Some(socket_addr) = multiaddr_to_socketaddr(&addr_info.addr) {
let ip = socket_addr.ip();
let is_unique_ip = !duplicate_ips.contains(&ip);
// A trick to make our tests work
// TODO remove this after fix the network tests.
let is_test_ip = ip.is_unspecified() || ip.is_loopback();
if (is_test_ip || is_unique_ip)
&& addr_info.is_connectable(now_ms)
&& filter(&addr_info)
{
duplicate_ips.insert(ip);
addr_infos.push(addr_info);
match multiaddr_to_socketaddr(&addr_info.addr) {
Some(socket_addr) => {
let ip = socket_addr.ip();
let is_unique_ip = !duplicate_ips.contains(&ip);
// A trick to make our tests work
// TODO remove this after fix the network tests.
let is_test_ip = ip.is_unspecified() || ip.is_loopback();
if (is_test_ip || is_unique_ip)
&& addr_info.is_connectable(now_ms)
&& filter(&addr_info)
{
duplicate_ips.insert(ip);
addr_infos.push(addr_info);
}
}
if addr_infos.len() == count {
break;
None => {
if addr_info.is_connectable(now_ms) && filter(&addr_info) {
addr_infos.push(addr_info);
}
}
}
if addr_infos.len() == count {
break;
}
}
addr_infos
}
Expand All @@ -88,34 +92,26 @@ impl AddrManager {

/// Remove an address by ip and port
pub fn remove(&mut self, addr: &Multiaddr) -> Option<AddrInfo> {
multiaddr_to_socketaddr(addr).and_then(|addr| {
self.addr_to_id.remove(&addr).and_then(|id| {
let random_id_pos = self.id_to_info.get(&id).expect("exists").random_id_pos;
// swap with last index, then remove the last index
self.swap_random_id(random_id_pos, self.random_ids.len() - 1);
self.random_ids.pop();
self.id_to_info.remove(&id)
})
self.addr_to_id.remove(addr).and_then(|id| {
let random_id_pos = self.id_to_info.get(&id).expect("exists").random_id_pos;
// swap with last index, then remove the last index
self.swap_random_id(random_id_pos, self.random_ids.len() - 1);
self.random_ids.pop();
self.id_to_info.remove(&id)
})
}

/// Get an address information by ip and port
pub fn get(&self, addr: &Multiaddr) -> Option<&AddrInfo> {
multiaddr_to_socketaddr(addr).and_then(|addr| {
self.addr_to_id
.get(&addr)
.and_then(|id| self.id_to_info.get(id))
})
self.addr_to_id
.get(addr)
.and_then(|id| self.id_to_info.get(id))
}

/// Get a mutable address information by ip and port
pub fn get_mut(&mut self, addr: &Multiaddr) -> Option<&mut AddrInfo> {
if let Some(addr) = multiaddr_to_socketaddr(addr) {
if let Some(id) = self.addr_to_id.get(&addr) {
self.id_to_info.get_mut(id)
} else {
None
}
if let Some(id) = self.addr_to_id.get(addr) {
self.id_to_info.get_mut(id)
} else {
None
}
Expand Down
1 change: 0 additions & 1 deletion network/src/peer_store/peer_store_impl.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::network::{find_type, TransportType};
use crate::{
errors::{PeerStoreError, Result},
extract_peer_id, multiaddr_to_socketaddr,
Expand Down
8 changes: 4 additions & 4 deletions network/src/protocols/discovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,10 +325,10 @@ impl AddressManager for DiscoveryAddressManager {

fn is_valid_addr(&self, addr: &Multiaddr) -> bool {
if !self.discovery_local_address {
let local_or_invalid = multiaddr_to_socketaddr(addr)
.map(|socket_addr| !is_reachable(socket_addr.ip()))
.unwrap_or(true);
!local_or_invalid
match multiaddr_to_socketaddr(addr) {
Some(socket_addr) => is_reachable(socket_addr.ip()),
None => true,
}
} else {
true
}
Expand Down
7 changes: 3 additions & 4 deletions network/src/protocols/identify/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,9 @@ impl<T: Callback> IdentifyProtocol<T> {
let global_ip_only = self.global_ip_only;
let reachable_addrs = listens
.into_iter()
.filter(|addr| {
multiaddr_to_socketaddr(addr)
.map(|socket_addr| !global_ip_only || is_reachable(socket_addr.ip()))
.unwrap_or(false)
.filter(|addr| match multiaddr_to_socketaddr(addr) {
Some(socket_addr) => !global_ip_only || is_reachable(socket_addr.ip()),
None => true,
})
.collect::<Vec<_>>();
self.callback
Expand Down
Loading

0 comments on commit 5e07ec2

Please sign in to comment.