Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add proxy support for ckb #4733

Draft
wants to merge 11 commits into
base: develop
Choose a base branch
from
647 changes: 536 additions & 111 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ members = [
"util/occupied-capacity/macros",
"util/fixed-hash/macros",
"util/logger-service",
"util/onion",
"util/runtime",
"util/stop-handler",
"util/metrics",
Expand Down
1 change: 1 addition & 0 deletions ckb-bin/src/subcommand/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use ckb_build_info::Version;
use ckb_launcher::Launcher;
use ckb_logger::info;
use ckb_logger::warn;
use ckb_network::multiaddr::Multiaddr;
use ckb_resource::{Resource, TemplateContext};

use ckb_stop_handler::{broadcast_exit_signals, wait_all_ckb_services_exit};
Expand Down
6 changes: 3 additions & 3 deletions network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ serde_json = "1.0"
bloom-filters = "0.1"
ckb-spawn = { path = "../util/spawn", version = "= 0.121.0-pre" }
bitflags = "1.0"
p2p = { version = "0.6.2", package = "tentacle", default-features = false }
p2p = { git="https://github.com/eval-exec/tentacle.git", branch="exec/proxy-and-onion", package = "tentacle", default-features = false }

[target.'cfg(not(target_family = "wasm"))'.dependencies]
p2p = { version = "0.6.2", package = "tentacle", default-features = false, features = [
p2p = { git="https://github.com/eval-exec/tentacle.git", branch="exec/proxy-and-onion", package = "tentacle", default-features = false, features = [
"upnp",
"parking_lot",
"openssl-vendored",
Expand All @@ -48,7 +48,7 @@ p2p = { version = "0.6.2", package = "tentacle", default-features = false, featu
socket2 = "0.5"

[target.'cfg(target_family = "wasm")'.dependencies]
p2p = { version = "0.6.2", package = "tentacle", default-features = false, features = [
p2p = { git="https://github.com/eval-exec/tentacle.git", branch="exec/proxy-and-onion", package = "tentacle", default-features = false, features = [
"wasm-timer",
] }
idb = "0.6"
Expand Down
76 changes: 76 additions & 0 deletions network/src/address.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
use p2p::multiaddr::{MultiAddr, Protocol};

#[derive(Default, Clone, Debug)]
pub struct NetworkAddresses {
pub regular_addresses: Vec<MultiAddr>,

// onion addresses can't be solved by multiaddr_to_socketaddr or socketaddr_to_multiaddr
pub onion_addresses: Vec<MultiAddr>,
}

impl NetworkAddresses {
pub fn push(&mut self, address: MultiAddr) {
if address
.iter()
.any(|proto| matches!(proto, Protocol::Onion3(_)))
{
self.onion_addresses.push(address);
} else {
self.regular_addresses.push(address);
}
}

// contains
pub fn contains(&self, address: &MultiAddr) -> bool {
self.regular_addresses.contains(address) || self.onion_addresses.contains(address)
}

// len
pub fn len(&self) -> usize {
self.regular_addresses.len() + self.onion_addresses.len()
}
}

// implement iter() for NetworkAddresses, don't take ownership
impl<'a> IntoIterator for &'a NetworkAddresses {
type Item = &'a MultiAddr;
type IntoIter =
std::iter::Chain<std::slice::Iter<'a, MultiAddr>, std::slice::Iter<'a, MultiAddr>>;

fn into_iter(self) -> Self::IntoIter {
self.regular_addresses
.iter()
.chain(self.onion_addresses.iter())
}
}

// convert Vec<MultiAddr> to NetworkAddresses
impl From<Vec<MultiAddr>> for NetworkAddresses {
fn from(addresses: Vec<MultiAddr>) -> Self {
let mut regular_addresses = Vec::new();
let mut onion_addresses = Vec::new();
for address in addresses {
if address
.iter()
.any(|proto| matches!(proto, Protocol::Onion3(_)))
{
onion_addresses.push(address);
} else {
regular_addresses.push(address);
}
}
NetworkAddresses {
regular_addresses,
onion_addresses,
}
}
}

// convert NetworkAddresses to Vec<MultiAddr>
impl From<NetworkAddresses> for Vec<MultiAddr> {
fn from(addresses: NetworkAddresses) -> Self {
let mut result = addresses.regular_addresses;
result.extend(addresses.onion_addresses);
result
}
}
2 changes: 2 additions & 0 deletions network/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub enum Error {
Dial(String),
/// Peer store error
PeerStore(PeerStoreError),
/// Config error
Config(String),
}

/// Error from tentacle
Expand Down
2 changes: 2 additions & 0 deletions network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//! And implemented several basic protocols: identify, discovery, ping, feeler, disconnect_message
//!

mod address;
mod behaviour;
/// compress module
pub mod compress;
Expand All @@ -16,6 +17,7 @@ mod peer;
pub mod peer_registry;
pub mod peer_store;
mod protocols;
mod proxy;
mod services;

#[cfg(test)]
Expand Down
47 changes: 39 additions & 8 deletions network/src/network.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//! Global state struct and start function
use crate::address::NetworkAddresses;
use crate::errors::Error;
#[cfg(not(target_family = "wasm"))]
use crate::errors::P2PError;
Expand All @@ -19,7 +20,7 @@ use crate::services::{
dump_peer_store::DumpPeerStoreService, outbound_peer::OutboundPeerService,
protocol_type_checker::ProtocolTypeCheckerService,
};
use crate::{Behaviour, CKBProtocol, Peer, PeerIndex, ProtocolId, ServiceControl};
use crate::{proxy, Behaviour, CKBProtocol, Peer, PeerIndex, ProtocolId, ServiceControl};
use ckb_app_config::{default_support_all_protocols, NetworkConfig, SupportProtocol};
use ckb_logger::{debug, error, info, trace, warn};
use ckb_spawn::Spawn;
Expand Down Expand Up @@ -73,7 +74,7 @@ pub struct NetworkState {
pub(crate) peer_registry: RwLock<PeerRegistry>,
pub(crate) peer_store: Mutex<PeerStore>,
/// Node listened addresses
pub(crate) listened_addrs: RwLock<Vec<Multiaddr>>,
pub(crate) listened_addrs: RwLock<NetworkAddresses>,
dialing_addrs: RwLock<HashMap<PeerId, Instant>>,
/// Node public addresses,
/// includes manually public addrs and remote peer observed addrs
Expand Down Expand Up @@ -121,6 +122,11 @@ impl NetworkState {
let peer_store = Mutex::new(PeerStore::load_from_dir_or_default(
config.peer_store_path(),
));
info!("Loaded the peer store.");
if let Some(ref proxy_url) = config.proxy_config.proxy_url {
proxy::check_proxy_url(proxy_url).map_err(|reason| Error::Config(reason))?;
}

let bootnodes = config.bootnodes();

let peer_registry = PeerRegistry::new(
Expand All @@ -137,7 +143,7 @@ impl NetworkState {
peer_registry: RwLock::new(peer_registry),
dialing_addrs: RwLock::new(HashMap::default()),
public_addrs: RwLock::new(public_addrs),
listened_addrs: RwLock::new(Vec::new()),
listened_addrs: RwLock::new(NetworkAddresses::default()),
pending_observed_addrs: RwLock::new(HashSet::default()),
local_private_key,
local_peer_id,
Expand Down Expand Up @@ -344,6 +350,12 @@ impl NetworkState {
.collect()
}

/// After onion service created,
/// ckb use this method to add onion address to public_addr
pub fn add_public_addr(&self, addr: Multiaddr) {
self.public_addrs.write().insert(addr);
}

pub(crate) fn connection_status(&self) -> ConnectionStatus {
self.peer_registry.read().connection_status()
}
Expand All @@ -360,7 +372,7 @@ impl NetworkState {
None
}
})
.chain(listened_addrs.iter().map(|addr| (addr.to_owned(), 1)))
.chain(listened_addrs.into_iter().map(|addr| (addr.to_owned(), 1)))
.map(|(addr, score)| (addr.to_string(), score))
.collect()
}
Expand Down Expand Up @@ -991,6 +1003,11 @@ impl NetworkService {
if init.is_ready() {
break;
}
let proxy_config_enable = config.proxy_config.proxy_url.is_some();
service_builder = service_builder
.tcp_proxy_config(config.proxy_config.proxy_url.clone())
.tcp_onion_config(config.onion_config.onion_server.clone());

match find_type(multi_addr) {
TransportType::Tcp => {
// only bind once
Expand All @@ -999,7 +1016,7 @@ impl NetworkService {
}
if let Some(addr) = multiaddr_to_socketaddr(multi_addr) {
let domain = socket2::Domain::for_address(addr);
let bind_fn = move |socket: p2p::service::TcpSocket| {
let bind_fn = move |socket: p2p::service::TcpSocket, context: p2p::service::TransformerContext| {
let socket_ref = socket2::SockRef::from(&socket);
#[cfg(all(
unix,
Expand All @@ -1009,7 +1026,9 @@ impl NetworkService {
socket_ref.set_reuse_port(true)?;
socket_ref.set_reuse_address(true)?;
if socket_ref.domain()? == domain {
socket_ref.bind(&addr.into())?;
if !(proxy_config_enable && matches!(context.state, p2p::service::SocketState::Dial)) {
socket_ref.bind(&addr.into())?;
}
}
Ok(socket)
};
Expand All @@ -1024,7 +1043,7 @@ impl NetworkService {
}
if let Some(addr) = multiaddr_to_socketaddr(multi_addr) {
let domain = socket2::Domain::for_address(addr);
let bind_fn = move |socket: p2p::service::TcpSocket| {
let bind_fn = move |socket: p2p::service::TcpSocket, context: p2p::service::TransformerContext| {
let socket_ref = socket2::SockRef::from(&socket);
#[cfg(all(
unix,
Expand All @@ -1034,7 +1053,14 @@ impl NetworkService {
socket_ref.set_reuse_port(true)?;
socket_ref.set_reuse_address(true)?;
if socket_ref.domain()? == domain {
socket_ref.bind(&addr.into())?;
if !(proxy_config_enable
&& matches!(
context.state,
p2p::service::SocketState::Dial
))
{
socket_ref.bind(&addr.into())?;
}
}
Ok(socket)
};
Expand Down Expand Up @@ -1309,6 +1335,11 @@ impl NetworkController {
self.network_state.add_node(&self.p2p_control, address)
}

/// Add a public_addr to NetworkState.public_addrs
pub fn add_public_addr(&self, public_addr: Multiaddr) {
self.network_state.add_public_addr(public_addr)
}

/// Disconnect session with peer id
pub fn remove_node(&self, peer_id: &PeerId) {
if let Some(session_id) = self
Expand Down
5 changes: 3 additions & 2 deletions network/src/peer.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::address::NetworkAddresses;
use crate::network_group::Group;
use crate::{
multiaddr::Multiaddr, protocols::identify::Flags, ProtocolId, ProtocolVersion, SessionType,
Expand All @@ -21,7 +22,7 @@ pub struct Peer {
/// Peer address
pub connected_addr: Multiaddr,
/// Peer listen addresses
pub listened_addrs: Vec<Multiaddr>,
pub listened_addrs: NetworkAddresses,
/// Peer info from identify protocol message
pub identify_info: Option<PeerIdentifyInfo>,
/// Ping/Pong message last received time
Expand Down Expand Up @@ -54,7 +55,7 @@ impl Peer {
) -> Self {
Peer {
connected_addr,
listened_addrs: Vec::new(),
listened_addrs: NetworkAddresses::default(),
identify_info: None,
ping_rtt: None,
last_ping_protocol_message_received_at: None,
Expand Down
2 changes: 1 addition & 1 deletion network/src/protocols/identify/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ impl Callback for IdentifyCallback {
);
let flags = self.network_state.with_peer_registry_mut(|reg| {
if let Some(peer) = reg.get_peer_mut(session.id) {
peer.listened_addrs = addrs.clone();
peer.listened_addrs = addrs.clone().into();
peer.identify_info
.as_ref()
.map(|a| a.flags)
Expand Down
8 changes: 7 additions & 1 deletion network/src/protocols/identify/protocol.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use ckb_logger::warn;
use p2p::{bytes::Bytes, multiaddr::Multiaddr};

use ckb_types::{packed, prelude::*};
Expand Down Expand Up @@ -69,7 +70,12 @@ impl<'a> IdentifyMessage<'a> {
Multiaddr::try_from(reader.observed_addr().bytes().raw_data().to_vec()).ok()?;
let mut listen_addrs = Vec::with_capacity(reader.listen_addrs().len());
for addr in reader.listen_addrs().iter() {
listen_addrs.push(Multiaddr::try_from(addr.bytes().raw_data().to_vec()).ok()?)
match Multiaddr::try_from(addr.bytes().raw_data().to_vec()) {
Ok(multi_addr) => {
listen_addrs.push(multi_addr);
}
Err(err) => warn!("failed to decode listen_addr to MultiAddr: {}", err),
}
}

Some(IdentifyMessage {
Expand Down
30 changes: 30 additions & 0 deletions network/src/proxy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use ckb_app_config::Url;

pub(crate) fn check_proxy_url(proxy_url: &str) -> Result<(), String> {
let parsed_url = Url::parse(proxy_url).map_err(|e| e.to_string())?;
if parsed_url.host_str().is_none() {
return Err(format!("missing host in proxy url: {}", proxy_url));
}
let scheme = parsed_url.scheme();
if scheme.ne("socks5") {
return Err(format!("CKB doesn't support proxy scheme: {}", scheme));
}
Ok(())
}

#[test]
fn parse_socks5_url() {
let result = Url::parse("socks5://username:password@localhost:1080");
assert!(result.is_ok());
let parsed_url = result.unwrap();
dbg!(&parsed_url);
assert_eq!(parsed_url.scheme(), "socks5");
// username
assert_eq!(parsed_url.username(), "username");
// password
assert_eq!(parsed_url.password(), Some("password"));
// host
assert_eq!(parsed_url.host_str(), Some("localhost"));
// port
assert_eq!(parsed_url.port(), Some(1080));
}
9 changes: 7 additions & 2 deletions rpc/src/module/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ use ckb_jsonrpc_types::{
BannedAddr, LocalNode, LocalNodeProtocol, NodeAddress, PeerSyncState, RemoteNode,
RemoteNodeProtocol, SyncState, Timestamp,
};
use ckb_network::{extract_peer_id, multiaddr::Multiaddr, NetworkController};
use ckb_network::{
extract_peer_id,
multiaddr::{MultiAddr, Multiaddr},
NetworkController,
};
use ckb_sync::SyncShared;
use ckb_systemtime::unix_time_as_millis;
use ckb_types::prelude::{Pack, Unpack};
Expand Down Expand Up @@ -587,7 +591,8 @@ impl NetRpc for NetRpcImpl {
.iter()
.map(|(peer_index, peer)| {
let mut addresses = vec![&peer.connected_addr];
addresses.extend(peer.listened_addrs.iter());
let listened_addrs: Vec<MultiAddr> = peer.listened_addrs.clone().into();
addresses.extend(listened_addrs.iter());

let node_addresses = addresses
.iter()
Expand Down
1 change: 1 addition & 0 deletions test/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,7 @@ fn all_specs() -> Vec<Box<dyn Spec>> {
Box::new(CheckVmBExtension),
Box::new(RandomlyKill),
Box::new(SyncChurn),
Box::new(TorService),
];
specs.shuffle(&mut thread_rng());
specs
Expand Down
Loading
Loading