diff --git a/network/src/ip_limit.rs b/network/src/ip_limit.rs new file mode 100644 index 0000000000..e6e0d2099c --- /dev/null +++ b/network/src/ip_limit.rs @@ -0,0 +1,148 @@ +// Copyright 2019 Conflux Foundation. All rights reserved. +// Conflux is free software and distributed under GNU General Public License. +// See http://www.gnu.org/licenses/ + +use crate::node_table::NodeId; +use std::{ + collections::{HashMap, HashSet}, + hash::Hash, + net::IpAddr, +}; + +pub type SessionIpLimit = IpLimit; +pub type NodeIpLimit = IpLimit; + +/// IP address limitation for sessions or nodes. +pub struct IpLimit { + quota: usize, // 0 presents unlimited + ip_to_keys: HashMap>, +} + +impl IpLimit { + pub fn new(quota: usize) -> Self { + IpLimit { + quota, + ip_to_keys: HashMap::new(), + } + } + + #[inline] + pub fn get_quota(&self) -> usize { self.quota } + + #[inline] + pub fn is_enabled(&self) -> bool { self.quota > 0 } + + /// Check if the specified IP address is allowed. + pub fn is_ip_allowed(&self, ip: &IpAddr) -> bool { + if !self.is_enabled() { + return true; + } + + match self.ip_to_keys.get(ip) { + Some(keys) => keys.len() < self.quota, + None => true, + } + } + + /// Validate IP address when adding a new node. + pub fn on_add(&mut self, ip: IpAddr, key: KEY) -> bool { + if !self.is_enabled() { + return true; + } + + if !self.is_ip_allowed(&ip) { + return false; + } + + if let Some(keys) = self.ip_to_keys.get(&ip) { + if keys.contains(&key) { + return false; + } + } + + self.ip_to_keys + .entry(ip) + .or_insert_with(|| HashSet::new()) + .insert(key) + } + + /// Update the number of nodes for the specified IP address when deleting a + /// node. + pub fn on_delete(&mut self, ip: &IpAddr, key: &KEY) -> bool { + if !self.is_enabled() { + return true; + } + + let keys = match self.ip_to_keys.get_mut(ip) { + Some(keys) => keys, + None => return false, + }; + + if !keys.remove(key) { + return false; + } + + if keys.is_empty() { + self.ip_to_keys.remove(ip); + } + + true + } + + pub fn get_keys(&self, ip: &IpAddr) -> Option<&HashSet> { + self.ip_to_keys.get(ip) + } +} + +#[cfg(test)] +mod tests { + use super::SessionIpLimit; + use std::{net::IpAddr, str::FromStr}; + + fn new_ip(ip: &str) -> IpAddr { IpAddr::from_str(ip).unwrap() } + + #[test] + fn test_enabled() { + assert_eq!(SessionIpLimit::new(0).is_enabled(), false); + assert_eq!(SessionIpLimit::new(1).is_enabled(), true); + assert_eq!(SessionIpLimit::new(4).is_enabled(), true); + + let mut limit = SessionIpLimit::new(0); + let ip = new_ip("127.0.0.1"); + assert_eq!(limit.on_add(ip, 1), true); + assert_eq!(limit.on_add(ip, 2), true); + assert_eq!(limit.on_add(ip, 3), true); + } + + #[test] + fn test_on_add() { + let mut limit = SessionIpLimit::new(2); + let ip = new_ip("127.0.0.1"); + + assert_eq!(limit.is_ip_allowed(&ip), true); + assert_eq!(limit.on_add(ip, 1), true); + + assert_eq!(limit.is_ip_allowed(&ip), true); + assert_eq!(limit.on_add(ip, 1), false); // duplicated key + assert_eq!(limit.on_add(ip, 2), true); + + assert_eq!(limit.is_ip_allowed(&ip), false); + assert_eq!(limit.on_add(ip, 3), false); + } + + #[test] + fn test_on_delete() { + let mut limit = SessionIpLimit::new(2); + let ip = new_ip("127.0.0.1"); + + assert_eq!(limit.on_add(ip, 1), true); + assert_eq!(limit.on_add(ip, 2), true); + assert_eq!(limit.get_keys(&ip).unwrap().len(), 2); + + assert_eq!(limit.on_delete(&ip, &3), false); // invalid key + assert_eq!(limit.on_delete(&ip, &2), true); + assert_eq!(limit.get_keys(&ip).unwrap().len(), 1); + assert_eq!(limit.on_delete(&ip, &1), true); + assert_eq!(limit.get_keys(&ip), None); + } +} diff --git a/network/src/lib.rs b/network/src/lib.rs index a25e57848f..c13c1655af 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -39,6 +39,7 @@ pub type PeerId = usize; mod connection; mod discovery; mod error; +mod ip_limit; mod ip_utils; mod node_database; pub mod node_table; diff --git a/network/src/node_database.rs b/network/src/node_database.rs index 8d6800bdf2..22fa87d9e3 100644 --- a/network/src/node_database.rs +++ b/network/src/node_database.rs @@ -3,14 +3,83 @@ // See http://www.gnu.org/licenses/ use crate::{ + ip_limit::NodeIpLimit, node_table::{Node, NodeContact, NodeEntry, NodeId, NodeTable}, IpFilter, }; use io::StreamToken; -use std::{collections::HashMap, net::IpAddr, time::Duration}; +use std::{ + cmp::Ordering, + net::IpAddr, + time::{Duration, SystemTime}, +}; /// Node database maintains all P2P nodes in trusted and untrusted node tables, -/// and support to limit the number of nodes for the same IP address. +/// and supports to limit the number of nodes for the same IP address. +/// +/// # Insert a node +/// +/// There are 4 scenarios to insert a node into database: +/// 1. Receive the "hello" handshaking message from ingress TCP connection, +/// and add the node with `StreamToken` as untrusted if not exists in database. +/// Otherwise, overwrite the existing node in trusted or untrusted table, +/// including endpoint, last contact and connection information. +/// 2. Receive a "ping" message from UDP discovery, and add the node as +/// untrusted if not exists in database. Otherwise, overwrite the existing +/// node in trusted or untrusted table. +/// 3. Receive the "pong" message from UDP discovery, and add the node as +/// untrusted if not exists in database. Otherwise, overwrite the existing +/// node in trusted or untrusted table. +/// 4. RPC explicitly add a trusted node. If the node is an existing +/// untrusted one, promote it to trusted. +/// +/// # Update node information +/// +/// ## note_success +/// When actively connect to a sampled trusted node, updates the node last +/// contact time. +/// +/// ## note_failure +/// Mark the node's last contact to failure for any error, e.g. +/// - UDP request timeout. +/// - Failed to create TCP connection/session. +/// - Failed to communicate due to invalid protocol message. +/// +/// # Promote/Demote +/// +/// ## Promote +/// Periodically promote untrusted nodes from ingress TCP connection with +/// configured timespan. +/// +/// ## Demote +/// Demote a node to untrusted when failed to handle protocol messages. +/// +/// # IP limitation +/// +/// Attacker could easily simulate a large amount of malicious nodes of +/// different node IDs and same IP address. To avoid such kind of attack, user +/// could limits the number of nodes for one IP address. By default, only 1 node +/// allowed for a IP address, and has to replace an existing old node in +/// following 2 scenarios. +/// +/// ## Scenario 1: add new node with existing IP address +/// For example, "node_1" with "IP_1" already in database, and to add a new node +/// "node_2" with the same address "IP_1". Due to default IP limitation (1 node +/// per IP), "node_1" will be removed, and then add "node_2" with "IP_1". +/// +/// ## Scenario 2: update node with existing IP address +/// For example, "node_1" with "IP_1" and "node_2" with "IP_2" already in +/// database, and to update "node_2" with existing address "IP_1". Due to +/// default IP limitation (1 node per IP), "node_1" will be removed, and then +/// update "node_2" with "IP_1". Besides, "IP_2" never exists in database +/// anymore. +/// +/// ## Remove node with priority +/// If multiple nodes allowed for 1 IP address, the node with minimum priority +/// will be removed in above 2 scenarios. The priority is defined as below: +/// 1. untrusted node < trusted node +/// 2. Node last contact status: unknown < failure < success +/// 3. Node last contact time: the earlier the smaller pub struct NodeDatabase { trusted_nodes: NodeTable, untrusted_nodes: NodeTable, @@ -23,8 +92,8 @@ impl NodeDatabase { let untrusted_nodes = NodeTable::new(path.clone(), false); let mut ip_limit = NodeIpLimit::new(nodes_per_ip); - ip_limit.init(&trusted_nodes); - ip_limit.init(&untrusted_nodes); + NodeDatabase::init(&mut ip_limit, &trusted_nodes); + NodeDatabase::init(&mut ip_limit, &untrusted_nodes); NodeDatabase { trusted_nodes, @@ -33,120 +102,102 @@ impl NodeDatabase { } } - /// add or update a node with the specified `entry` and `stream_token`. + /// Add a new untrusted node if not exists. Otherwise, update the existing + /// node with specified `entry` and `stream_token`. pub fn insert_with_token( &mut self, entry: NodeEntry, stream_token: StreamToken, - ) -> InsertResult { - if self.trusted_nodes.contains(&entry.id) { - self.trusted_nodes.note_success( - &entry.id, - true, - Some(stream_token), - ); - return InsertResult::Updated; - } - - match self - .ip_limit - .validate_insertion(&self.untrusted_nodes, &entry) - { - result @ InsertResult::Added | result @ InsertResult::Updated => { - let mut node = Node::new(entry.id, entry.endpoint); - node.last_contact = Some(NodeContact::success()); - node.last_connected = Some(NodeContact::success()); - node.stream_token = Some(stream_token); - - // overwrite endpoint of untrusted node. - self.untrusted_nodes.add_node(node, false); - - result - } - result @ _ => result, + ) { + let mut node = Node::new(entry.id, entry.endpoint); + node.last_contact = Some(NodeContact::success()); + node.last_connected = Some(NodeContact::success()); + node.stream_token = Some(stream_token); + + let ip = node.endpoint.address.ip(); + + if let Some(old_node) = self.trusted_nodes.get(&node.id) { + let old_ip = old_node.endpoint.address.ip(); + self.update_ip_limit(&node.id, old_ip, ip); + self.trusted_nodes.add_node(node, false); + } else if let Some(old_node) = self.untrusted_nodes.get(&node.id) { + let old_ip = old_node.endpoint.address.ip(); + self.update_ip_limit(&node.id, old_ip, ip); + self.untrusted_nodes.add_node(node, false); + } else { + self.force_new_ip_limit(&node.id, ip); + self.untrusted_nodes.add_node(node, false); } } - /// Add or update a node with the specified `entry`. - pub fn insert(&mut self, entry: NodeEntry) -> InsertResult { - if self.trusted_nodes.contains(&entry.id) { - self.trusted_nodes.note_success(&entry.id, false, None); - return InsertResult::Updated; - } - - match self - .ip_limit - .validate_insertion(&self.untrusted_nodes, &entry) - { - result @ InsertResult::Added | result @ InsertResult::Updated => { - let mut node = Node::new(entry.id, entry.endpoint); - node.last_contact = Some(NodeContact::success()); - self.untrusted_nodes.update_last_contact(node); - result - } - result @ _ => result, + /// Add a new untrusted node if not exists. Otherwise, update the existing + /// node with the specified `entry`. If node exists, it will update its + /// last contact information. + pub fn insert(&mut self, entry: NodeEntry) { + let mut node = Node::new(entry.id, entry.endpoint); + node.last_contact = Some(NodeContact::success()); + + let ip = node.endpoint.address.ip(); + + if let Some(old_node) = self.trusted_nodes.get(&node.id) { + let old_ip = old_node.endpoint.address.ip(); + self.update_ip_limit(&node.id, old_ip, ip); + self.trusted_nodes.update_last_contact(node); + } else if let Some(old_node) = self.untrusted_nodes.get(&node.id) { + let old_ip = old_node.endpoint.address.ip(); + self.update_ip_limit(&node.id, old_ip, ip); + self.untrusted_nodes.update_last_contact(node); + } else { + self.force_new_ip_limit(&node.id, ip); + self.untrusted_nodes.add_node(node, false); } } - /// Add or update a node with the specified `entry`, and promote the node to - /// trusted if it is untrusted. - pub fn insert_with_promotion(&mut self, entry: NodeEntry) -> InsertResult { - if self.trusted_nodes.contains(&entry.id) { - self.trusted_nodes.note_success(&entry.id, false, None); - return InsertResult::Updated; - } - - match self - .ip_limit - .validate_insertion(&self.untrusted_nodes, &entry) + /// Add a new trusted node if not exists. Otherwise, update the existing + /// node with the specified `entry`, and promote the node to trusted if it + /// is untrusted. + pub fn insert_with_promotion(&mut self, entry: NodeEntry) { + let mut node = Node::new(entry.id, entry.endpoint); + node.last_contact = Some(NodeContact::success()); + + let ip = node.endpoint.address.ip(); + + if let Some(old_node) = self.trusted_nodes.get(&node.id) { + let old_ip = old_node.endpoint.address.ip(); + self.update_ip_limit(&node.id, old_ip, ip); + self.trusted_nodes.update_last_contact(node); + } else if let Some(old_node) = + self.untrusted_nodes.remove_with_id(&node.id) { - result @ InsertResult::Added | result @ InsertResult::Updated => { - let mut node = Node::new(entry.id, entry.endpoint); - node.last_contact = Some(NodeContact::success()); - - if let Some(old_node) = - self.untrusted_nodes.remove_with_id(&entry.id) - { - node.last_connected = old_node.last_connected; - node.stream_token = old_node.stream_token; - } - - self.trusted_nodes.add_node(node, false); - - result - } - result @ _ => result, + node.last_connected = old_node.last_connected; + node.stream_token = old_node.stream_token; + let old_ip = old_node.endpoint.address.ip(); + self.update_ip_limit(&node.id, old_ip, ip); + self.trusted_nodes.add_node(node, false); + } else { + self.force_new_ip_limit(&node.id, ip); + self.trusted_nodes.add_node(node, false); } } - /// Add a new trusted node if not exists, or promote an existing untrusted + /// Add a new trusted node if not exists, or promote the existing untrusted /// node. - pub fn insert_trusted(&mut self, entry: NodeEntry) -> Option { + pub fn insert_trusted(&mut self, entry: NodeEntry) { if self.trusted_nodes.contains(&entry.id) { - return None; + return; } let mut node = Node::new(entry.id.clone(), entry.endpoint.clone()); - - match self - .ip_limit - .validate_insertion(&self.untrusted_nodes, &entry) - { - InsertResult::Added => { - self.trusted_nodes.add_node(node, false); - Some(InsertResult::Added) - } - InsertResult::Updated => { - if let Some(old_node) = - self.untrusted_nodes.remove_with_id(&entry.id) - { - node.last_contact = old_node.last_contact; - node.last_connected = old_node.last_connected; - node.stream_token = old_node.stream_token; - } - - self.trusted_nodes.add_node(node, false); - Some(InsertResult::Updated) - } - result @ _ => Some(result), + let ip = node.endpoint.address.ip(); + + if let Some(old_node) = self.untrusted_nodes.remove_with_id(&node.id) { + node.last_contact = old_node.last_contact; + node.last_connected = old_node.last_connected; + node.stream_token = old_node.stream_token; + let old_ip = old_node.endpoint.address.ip(); + self.update_ip_limit(&node.id, old_ip, ip); + self.trusted_nodes.add_node(node, false); + } else { + self.force_new_ip_limit(&node.id, ip); + self.trusted_nodes.add_node(node, false); } } @@ -227,6 +278,7 @@ impl NodeDatabase { } } + /// Demote the specified node to untrusted if it is trusted. pub fn demote(&mut self, node_id: &NodeId) { if let Some(removed_trusted_node) = self.trusted_nodes.remove_with_id(node_id) @@ -245,155 +297,145 @@ impl NodeDatabase { match node { None => None, Some(n) => { - self.ip_limit.on_delete(n.endpoint.address.ip()); + assert!(self.ip_limit.on_delete(&n.endpoint.address.ip(), id)); Some(n) } } } -} -#[derive(Copy, Clone, Eq, PartialEq, Debug)] -pub enum InsertResult { - Added, - Updated, - // the number of nodes reaches the maximum value for one IP address. - IpLimited, -} - -/// IP address limitation for P2P nodes. -pub struct NodeIpLimit { - nodes_per_ip: usize, // 0 presents unlimited - ip_to_nodes: HashMap, -} - -impl NodeIpLimit { - pub fn new(nodes_per_ip: usize) -> Self { - debug!("NodeIpLimit::new: nodes_per_ip = {}", nodes_per_ip); - NodeIpLimit { - nodes_per_ip, - ip_to_nodes: HashMap::new(), - } - } - - /// Initialize with give node table, and will not restrict the number - /// of nodes per IP address. - fn init(&mut self, table: &NodeTable) { - if !self.is_enabled() { + fn init(ip_limit: &mut NodeIpLimit, table: &NodeTable) { + if !ip_limit.is_enabled() { return; } table.visit(|id| { - if let Some(node) = table.get(id) { - let ip = node.endpoint.address.ip(); - let num = self.ip_to_nodes.entry(ip).or_insert(0); - *num += 1; + let node = table.get(id).expect("Node should exist during visit"); + let ip = node.endpoint.address.ip(); - if *num > self.nodes_per_ip { - warn!("NodeIpLimit::init: too many nodes added, actual = {}, limited = {}", *num, self.nodes_per_ip); - } - } else { - error!("NodeIpLimit::init: node not found when visit table"); + if !ip_limit.on_add(ip, node.id) { + warn!("node not added into database, ip = {:?}, id = {:?}, quota = {}", ip, node.id, ip_limit.get_quota()); } true }); } - fn is_enabled(&self) -> bool { self.nodes_per_ip > 0 } - - /// Check if the specified IP address is allowed. - pub fn is_ip_allowed(&self, ip: &IpAddr) -> bool { - if !self.is_enabled() { - return true; - } - - match self.ip_to_nodes.get(ip) { - Some(num) => *num < self.nodes_per_ip, - None => true, + /// Update the IP-NodeId mapping before update a node if its IP address + /// changed. + fn update_ip_limit( + &mut self, node_id: &NodeId, old_ip: IpAddr, new_ip: IpAddr, + ) { + if old_ip != new_ip { + self.force_new_ip_limit(node_id, new_ip); + assert!(self.ip_limit.on_delete(&old_ip, node_id)); } } - /// Validate IP address when adding a new node. - pub fn on_add(&mut self, ip: IpAddr) -> bool { - if !self.is_enabled() { - return true; - } - - let num_nodes = self.ip_to_nodes.entry(ip).or_insert(0); - if *num_nodes < self.nodes_per_ip { - *num_nodes += 1; - true - } else { - false + /// Add a new IP-NodeId mapping. If IP limitation reached, remove the worst + /// node of the same IP address. + fn force_new_ip_limit(&mut self, node_id: &NodeId, ip: IpAddr) { + if !self.ip_limit.on_add(ip, node_id.clone()) { + assert_eq!(self.remove_worst_by_ip(ip).is_some(), true); + assert_eq!(self.ip_limit.on_add(ip, node_id.clone()), true); } } - /// Validate IP address when updating an existing node. - fn on_update(&mut self, old_ip: IpAddr, new_ip: IpAddr) -> bool { - if !self.is_enabled() { - return true; - } - - if old_ip == new_ip { - return true; - } + /// Remove the worst node of specified IP address. The worst node has the + /// minimum priority. + fn remove_worst_by_ip(&mut self, ip: IpAddr) -> Option { + let mut min_priority = NodePriority::MAX; + let mut min_node = None; + + for id in self.ip_limit.get_keys(&ip)? { + let mut cur_node = None; + let cur_priority = if let Some(node) = self.untrusted_nodes.get(id) + { + cur_node = Some(node); + NodePriority::new(false, node) + } else if let Some(node) = self.trusted_nodes.get(id) { + cur_node = Some(node); + NodePriority::new(true, node) + } else { + NodePriority::MAX + }; - if !self.on_add(new_ip) { - return false; + if cur_priority < min_priority { + min_priority = cur_priority; + min_node = cur_node; + } } - self.on_delete(old_ip); - - true + let node_id = min_node?.id.clone(); + self.remove(&node_id) } +} - /// Update the number of nodes for the specified IP address when deleting a - /// node. - pub fn on_delete(&mut self, ip: IpAddr) { - if !self.is_enabled() { - return; - } +/// NodePriority defines the priority to remove when IP limitation reached. The +/// concrete definitions are as following: +/// - untrusted (0) < trusted (10) +/// - last_contact: unknown (0) < failure (1) < success (2) +/// - contact time +/// +/// Node with minimum priority will be removed when IP limitation reached. +struct NodePriority { + priority: usize, + contact_time: SystemTime, +} - if let Some(num) = self.ip_to_nodes.get_mut(&ip) { - if *num <= 1 { - self.ip_to_nodes.remove(&ip); - } else { - *num -= 1; +impl NodePriority { + const MAX: NodePriority = NodePriority { + priority: 100, + contact_time: SystemTime::UNIX_EPOCH, + }; + + fn new(trusted: bool, node: &Node) -> Self { + let mut priority = if trusted { 10 } else { 0 }; + let mut contact_time = SystemTime::UNIX_EPOCH; + + if let Some(contact) = node.last_contact { + match contact { + NodeContact::Failure(t) => { + priority += 1; + contact_time = t; + } + NodeContact::Success(t) => { + priority += 2; + contact_time = t; + } } - } else { - error!("NodeIpLimit::on_delete: ip not found"); } - } - fn validate_insertion( - &mut self, table: &NodeTable, entry: &NodeEntry, - ) -> InsertResult { - let new_ip = entry.endpoint.address.ip(); + NodePriority { + priority, + contact_time, + } + } +} - match table.get(&entry.id) { - Some(old_node) => { - let old_ip = old_node.endpoint.address.ip(); +impl PartialOrd for NodePriority { + fn partial_cmp(&self, other: &NodePriority) -> Option { + Some( + self.priority + .cmp(&other.priority) + .then_with(|| self.contact_time.cmp(&other.contact_time)), + ) + } +} - if !self.on_update(old_ip, new_ip) { - InsertResult::IpLimited - } else { - InsertResult::Updated - } - } - None => { - if !self.on_add(new_ip) { - InsertResult::IpLimited - } else { - InsertResult::Added - } - } - } +impl PartialEq for NodePriority { + fn eq(&self, other: &NodePriority) -> bool { + self.priority == other.priority + && self.contact_time == other.contact_time } } #[cfg(test)] mod tests { - use crate::node_table::{Node, NodeEndpoint, NodeEntry, NodeId}; + use super::NodeDatabase; + use crate::{ + ip_limit::NodeIpLimit, + node_table::{Node, NodeEndpoint, NodeEntry, NodeId, NodeTable}, + }; use std::{net::IpAddr, str::FromStr}; fn new_node(addr: &str) -> Node { @@ -402,276 +444,181 @@ mod tests { fn new_ip(ip: &str) -> IpAddr { IpAddr::from_str(ip).unwrap() } - fn new_entry(id: Option, addr: &str) -> NodeEntry { - let id = id.or_else(|| Some(NodeId::random())).unwrap(); - let endpoint = NodeEndpoint::from_str(addr).unwrap(); - NodeEntry { id, endpoint } - } - - #[cfg(test)] - mod ip_limit_tests { - use super::{ - super::{InsertResult, NodeIpLimit}, - new_entry, new_ip, new_node, - }; - use crate::node_table::NodeTable; - - #[test] - fn test_enabled() { - assert_eq!(NodeIpLimit::new(0).is_enabled(), false); - assert_eq!(NodeIpLimit::new(1).is_enabled(), true); - assert_eq!(NodeIpLimit::new(4).is_enabled(), true); + fn new_entry(addr: &str) -> NodeEntry { + NodeEntry { + id: NodeId::random(), + endpoint: NodeEndpoint::from_str(addr).unwrap(), } + } - #[test] - fn test_init() { - let mut table = NodeTable::new(None, true); - table.add_node(new_node("127.0.0.1:777"), false); - table.add_node(new_node("127.0.0.1:888"), false); - table.add_node(new_node("192.168.0.100:777"), false); - - // not enabled - let mut limit = NodeIpLimit::new(0); - limit.init(&table); - assert_eq!(limit.ip_to_nodes.len(), 0); - - // enabled - let mut limit = NodeIpLimit::new(1); - limit.init(&table); - assert_eq!(limit.ip_to_nodes.len(), 2); - assert_eq!(limit.ip_to_nodes[&new_ip("127.0.0.1")], 2); - assert_eq!(limit.ip_to_nodes[&new_ip("192.168.0.100")], 1); - } + #[test] + fn test_insert_with_token_added() { + let mut db = NodeDatabase::new(None, 1); - #[test] - fn test_on_add() { - let mut limit = NodeIpLimit::new(2); - assert_eq!(limit.on_add(new_ip("127.0.0.1")), true); - assert_eq!(limit.on_add(new_ip("127.0.0.1")), true); - assert_eq!(limit.on_add(new_ip("127.0.0.1")), false); - assert_eq!(limit.on_add(new_ip("127.0.0.1")), false); - } + let entry = new_entry("127.0.0.1:999"); + db.insert_with_token(entry.clone(), 5); - #[test] - fn test_on_update() { - let mut limit = NodeIpLimit::new(1); - let ip1 = new_ip("127.0.0.1"); - let ip2 = new_ip("127.0.0.2"); - assert_eq!(limit.on_add(ip1), true); - assert_eq!(limit.on_add(ip2), true); - - // same ip allowed - assert_eq!(limit.on_update(ip1, ip1), true); - // exist ip not allowed - assert_eq!(limit.on_update(ip1, ip2), false); - - // new ip allowed - let ip3 = new_ip("127.0.0.3"); - assert_eq!(limit.on_update(ip1, ip3), true); - assert_eq!(limit.ip_to_nodes.contains_key(&ip1), false); - assert_eq!(limit.ip_to_nodes.contains_key(&ip3), true); - } + assert_eq!(db.get(&entry.id, true), None); + assert_eq!(db.get(&entry.id, false).unwrap().stream_token, Some(5)); + } - #[test] - fn test_on_delete() { - let mut limit = NodeIpLimit::new(2); - assert_eq!(limit.on_add(new_ip("127.0.0.1")), true); - assert_eq!(limit.on_add(new_ip("127.0.0.1")), true); - assert_eq!(limit.ip_to_nodes[&new_ip("127.0.0.1")], 2); - limit.on_delete(new_ip("127.0.0.1")); - assert_eq!(limit.ip_to_nodes[&new_ip("127.0.0.1")], 1); - limit.on_delete(new_ip("127.0.0.1")); - assert_eq!( - limit.ip_to_nodes.contains_key(&new_ip("127.0.0.1")), - false - ); - } + #[test] + fn test_insert_with_token_added_ip_exists() { + let mut db = NodeDatabase::new(None, 1); + + // add a node + let entry1 = new_entry("127.0.0.1:999"); + let ip = new_ip("127.0.0.1"); + db.insert_with_token(entry1.clone(), 5); + assert_eq!(db.ip_limit.get_keys(&ip).unwrap().len(), 1); + + // add new node with old IP address, previous node will be replaced. + let entry2 = new_entry("127.0.0.1:999"); + db.insert_with_token(entry2.clone(), 9); + assert_eq!(db.get(&entry1.id, false), None); // old node was repalced + let node = db.get(&entry2.id, false).unwrap(); + assert_eq!(node.endpoint, entry2.endpoint); + assert_eq!(node.stream_token, Some(9)); + assert_eq!(db.ip_limit.get_keys(&ip).unwrap().len(), 1); + } - #[test] - fn test_validate_insertion() { - let mut table = NodeTable::new(None, true); - let node = new_node("127.0.0.1:777"); - table.add_node(node.clone(), false); - - let mut limit = NodeIpLimit::new(1); - limit.init(&table); - - // new node id of same ip - let entry = new_entry(None, "127.0.0.1:999"); - assert_eq!( - limit.validate_insertion(&table, &entry), - InsertResult::IpLimited - ); - - // new node id of new ip - let entry = new_entry(None, "127.0.0.2:999"); - assert_eq!( - limit.validate_insertion(&table, &entry), - InsertResult::Added - ); - - // same node id of same ip - let entry = new_entry(Some(node.id.clone()), "127.0.0.1:777"); - assert_eq!( - limit.validate_insertion(&table, &entry), - InsertResult::Updated - ); - - // same node id of exist ip - let entry = new_entry(Some(node.id.clone()), "127.0.0.2:777"); - assert_eq!( - limit.validate_insertion(&table, &entry), - InsertResult::IpLimited - ); - - // same node id of new ip - let entry = new_entry(Some(node.id.clone()), "127.0.0.3:777"); - assert_eq!( - limit.validate_insertion(&table, &entry), - InsertResult::Updated - ); - } + #[test] + fn test_insert_with_token_updated_trusted() { + let mut db = NodeDatabase::new(None, 1); + + // add trusted node, whose token is None + let entry = new_entry("127.0.0.1:999"); + db.insert_trusted(entry.clone()); + assert_eq!(db.get(&entry.id, true).unwrap().stream_token, None); + + // update node with token 3 + db.insert_with_token(entry.clone(), 3); + assert_eq!(db.get(&entry.id, true).unwrap().stream_token, Some(3)); } - #[cfg(test)] - mod node_database_tests { - use super::{ - super::{InsertResult, NodeDatabase}, - new_entry, - }; - use crate::node_table::NodeId; - - #[test] - fn test_insert_with_token() { - let mut db = NodeDatabase::new(None, 1); - - // add a trusted node - let entry = new_entry(None, "127.0.0.1:999"); - assert_eq!( - db.insert_trusted(entry.clone()), - Some(InsertResult::Added) - ); - - // update trusted node - assert_eq!( - db.insert_with_token(entry.clone(), 3), - InsertResult::Updated - ); - let node = db.get(&entry.id, true); - assert_eq!(node.is_some(), true); - assert_eq!(node.unwrap().stream_token, Some(3)); - - // add untrusted node - let entry = new_entry(None, "127.0.0.2:999"); - assert_eq!( - db.insert_with_token(entry.clone(), 5), - InsertResult::Added - ); - assert_eq!(db.get(&entry.id, true), None); - assert_eq!(db.get(&entry.id, false).unwrap().stream_token, Some(5)); - - // update untrusted node, change endpoint and stream token - let entry = new_entry(Some(entry.id), "127.0.0.2:888"); - assert_eq!( - db.insert_with_token(entry.clone(), 6), - InsertResult::Updated - ); - assert_eq!(db.get(&entry.id, true), None); - let node = db.get(&entry.id, false).unwrap(); - assert_eq!(node.endpoint, entry.endpoint); // endpoint updated - assert_eq!(node.stream_token, Some(6)); // stream token updated - } + #[test] + fn test_insert_with_token_updated_untrusted() { + let mut db = NodeDatabase::new(None, 1); - #[test] - fn test_insert_with_promotion() { - let mut db = NodeDatabase::new(None, 1); - - // add untrusted node - let entry = new_entry(None, "127.0.0.1:999"); - assert_eq!(db.insert(entry.clone()), InsertResult::Added); - assert_eq!(db.get(&entry.id, true), None); - assert_eq!(db.get(&entry.id, false).is_some(), true); - - // update node and promote - assert_eq!( - db.insert_with_promotion(entry.clone()), - InsertResult::Updated - ); - assert_eq!(db.get(&entry.id, true).is_some(), true); - } + let entry = new_entry("127.0.0.1:999"); + db.insert_with_token(entry.clone(), 5); - #[test] - fn test_insert_trusted() { - let mut db = NodeDatabase::new(None, 1); - - // new added - let entry = new_entry(None, "127.0.0.1:999"); - assert_eq!( - db.insert_trusted(entry.clone()), - Some(InsertResult::Added) - ); - assert_eq!(db.get(&entry.id, true).is_some(), true); - - // already exists - assert_eq!(db.insert_trusted(entry.clone()), None); - - // prepare untrusted node to promote - let entry = new_entry(None, "127.0.0.2:999"); - assert_eq!(db.insert(entry.clone()), InsertResult::Added); - assert_eq!(db.get(&entry.id, true), None); - assert_eq!(db.get(&entry.id, false).is_some(), true); - - // add trusted node to promote - assert_eq!( - db.insert_trusted(entry.clone()), - Some(InsertResult::Updated) - ); - assert_eq!(db.get(&entry.id, true).is_some(), true); - } + // update node with new token + db.insert_with_token(entry.clone(), 8); + assert_eq!(db.get(&entry.id, true), None); + assert_eq!(db.get(&entry.id, false).unwrap().stream_token, Some(8)); + } - #[test] - fn test_remove() { - let mut db = NodeDatabase::new(None, 1); + #[test] + fn test_insert_with_token_updated_new_ip() { + let mut db = NodeDatabase::new(None, 1); - // add trusted node - let entry1 = new_entry(None, "127.0.0.1:999"); - assert_eq!( - db.insert_trusted(entry1.clone()), - Some(InsertResult::Added) - ); + let entry1 = new_entry("127.0.0.1:999"); + db.insert_with_token(entry1.clone(), 5); - // add untrusted node - let entry2 = new_entry(None, "127.0.0.2:999"); - assert_eq!( - db.insert_with_token(entry2.clone(), 9), - InsertResult::Added - ); + // update node with new ip and token + let mut entry2 = new_entry("127.0.0.2:999"); + entry2.id = entry1.id; + db.insert_with_token(entry2.clone(), 8); + let node = db.get(&entry1.id, false).unwrap(); + assert_eq!(node.endpoint, entry2.endpoint); + assert_eq!(node.stream_token, Some(8)); + } - assert_eq!(db.ip_limit.ip_to_nodes.len(), 2); + #[test] + fn test_insert_with_token_updated_ip_exists() { + let mut db = NodeDatabase::new(None, 1); + + // add node1 + let entry1 = new_entry("127.0.0.1:999"); + let ip1 = new_ip("127.0.0.1"); + db.insert_with_token(entry1.clone(), 3); + assert_eq!(db.ip_limit.get_keys(&ip1).unwrap().len(), 1); + + // add node2 + let entry2 = new_entry("127.0.0.2:999"); + let ip2 = new_ip("127.0.0.2"); + db.insert_with_token(entry2.clone(), 4); + assert_eq!(db.ip_limit.get_keys(&ip2).unwrap().len(), 1); + + // update node2's IP address to ip1, node1 will be removed + let mut entry = new_entry("127.0.0.1:999"); + entry.id = entry2.id.clone(); + db.insert_with_token(entry.clone(), 5); + + // node1 removed + assert_eq!(db.get(&entry1.id, false), None); + + // node2 updated + let node = db.get(&entry.id, false).unwrap(); + assert_eq!(node.endpoint, entry.endpoint); + assert_eq!(node.stream_token, Some(5)); + + // check ip_limits + assert_eq!(db.ip_limit.get_keys(&ip1).unwrap().len(), 1); + assert_eq!(db.ip_limit.get_keys(&ip2), None); + } - // delete nodes - assert_eq!(db.remove(&NodeId::random()), None); - assert_eq!(db.remove(&entry1.id).is_some(), true); - assert_eq!(db.remove(&entry2.id).is_some(), true); + #[test] + fn test_demote() { + let mut db = NodeDatabase::new(None, 1); - assert_eq!(db.ip_limit.ip_to_nodes.len(), 0); - } + // add a trusted node + let entry = new_entry("127.0.0.1:999"); + db.insert_trusted(entry.clone()); + assert_eq!(db.get(&entry.id, true).is_some(), true); - #[test] - fn test_demote() { - let mut db = NodeDatabase::new(None, 1); - - // add a trusted node - let entry = new_entry(None, "127.0.0.1:999"); - assert_eq!( - db.insert_trusted(entry.clone()), - Some(InsertResult::Added) - ); - - // demote the trusted node to untrusted - db.demote(&entry.id); - assert_eq!(db.get(&entry.id, true), None); - assert!(db.get(&entry.id, false).is_some()); - } + // demote the trusted node to untrusted + db.demote(&entry.id); + assert_eq!(db.get(&entry.id, true), None); + assert!(db.get(&entry.id, false).is_some()); + } + + #[test] + fn test_remove() { + let mut db = NodeDatabase::new(None, 1); + + // add trusted node + let entry1 = new_entry("127.0.0.1:999"); + db.insert_trusted(entry1.clone()); + assert_eq!(db.get(&entry1.id, true).is_some(), true); + + // add untrusted node + let entry2 = new_entry("127.0.0.2:999"); + db.insert_with_token(entry2.clone(), 9); + assert_eq!(db.get(&entry2.id, false).is_some(), true); + + // delete nodes + assert_eq!(db.remove(&NodeId::random()), None); + assert_eq!(db.remove(&entry1.id).unwrap().endpoint, entry1.endpoint); + assert_eq!(db.remove(&entry2.id).unwrap().endpoint, entry2.endpoint); + + assert_eq!(db.get(&entry1.id, true), None); + assert_eq!(db.get(&entry2.id, false), None); + } + + #[test] + fn test_init() { + let mut table = NodeTable::new(None, true); + table.add_node(new_node("127.0.0.1:777"), false); + table.add_node(new_node("127.0.0.1:888"), false); + table.add_node(new_node("192.168.0.100:777"), false); + + // not enabled + let mut limit = NodeIpLimit::new(0); + NodeDatabase::init(&mut limit, &table); + assert_eq!(limit.get_keys(&new_ip("127.0.0.1")), None); + assert_eq!(limit.get_keys(&new_ip("192.168.0.100")), None); + + // enabled with enough quota + let mut limit = NodeIpLimit::new(2); + NodeDatabase::init(&mut limit, &table); + assert_eq!(limit.get_keys(&new_ip("127.0.0.1")).unwrap().len(), 2); + assert_eq!(limit.get_keys(&new_ip("192.168.0.100")).unwrap().len(), 1); + + // enabled with less quota + let mut limit = NodeIpLimit::new(1); + NodeDatabase::init(&mut limit, &table); + assert_eq!(limit.get_keys(&new_ip("127.0.0.1")).unwrap().len(), 1); } } diff --git a/network/src/node_table.rs b/network/src/node_table.rs index 7ec0c54b19..c4ffde949b 100644 --- a/network/src/node_table.rs +++ b/network/src/node_table.rs @@ -454,7 +454,8 @@ impl NodeTable { node_ids } - // If node exists, update last contact, insert otherwise + // If node exists, update last contact, insert otherwise. + // Endpoint will be updated if node exists. pub fn update_last_contact(&mut self, node: Node) { let mut _index = NodeReputationIndex::default(); let mut exist = false; @@ -472,12 +473,14 @@ impl NodeTable { // check whether the node position will change if target_node_rep == _index.0 { - self.node_reputation_table[_index.0][_index.1].last_contact = - node.last_contact; + let old_node = &mut self.node_reputation_table[_index.0][_index.1]; + old_node.last_contact = node.last_contact; + old_node.endpoint = node.endpoint; } else { let mut removed_node = self.remove_from_reputation_level(&_index).unwrap(); removed_node.last_contact = node.last_contact; + removed_node.endpoint = node.endpoint; self.add_to_reputation_level(target_node_rep, removed_node); } } diff --git a/network/src/session.rs b/network/src/session.rs index f769126836..2894db7b0c 100644 --- a/network/src/session.rs +++ b/network/src/session.rs @@ -8,7 +8,6 @@ use crate::{ SendQueueStatus, MAX_PAYLOAD_SIZE, }, hash::keccak, - node_database::InsertResult, node_table::{NodeEndpoint, NodeEntry, NodeId}, service::NetworkServiceInner, Capability, DisconnectReason, Error, ErrorKind, ProtocolId, @@ -325,11 +324,7 @@ impl Session { return Err(self.disconnect(io, DisconnectReason::IpLimited)); } else { debug!("Received valid endpoint {:?}, session = {:?}", entry, self); - if host.node_db.write().insert_with_token(entry, self.token()) - == InsertResult::IpLimited - { - return Err(self.disconnect(io, DisconnectReason::IpLimited)); - } + host.node_db.write().insert_with_token(entry, self.token()); } self.send_ping(io)?; diff --git a/network/src/session_manager.rs b/network/src/session_manager.rs index 3c34a8e1a3..1e98abb2b4 100644 --- a/network/src/session_manager.rs +++ b/network/src/session_manager.rs @@ -3,8 +3,8 @@ // See http://www.gnu.org/licenses/ use crate::{ - node_database::NodeIpLimit, node_table::NodeId, - service::NetworkServiceInner, session::Session, NetworkIoMessage, + ip_limit::SessionIpLimit, node_table::NodeId, service::NetworkServiceInner, + session::Session, NetworkIoMessage, }; use io::IoContext; use mio::net::TcpStream; @@ -18,7 +18,7 @@ use std::{collections::HashMap, net::SocketAddr, sync::Arc}; pub struct SessionManager { sessions: RwLock>, usize>>, node_id_index: RwLock>, - ip_limit: RwLock, + ip_limit: RwLock, } impl SessionManager { @@ -26,7 +26,7 @@ impl SessionManager { SessionManager { sessions: RwLock::new(Slab::new_starting_at(offset, capacity)), node_id_index: RwLock::new(HashMap::new()), - ip_limit: RwLock::new(NodeIpLimit::new(nodes_per_ip)), + ip_limit: RwLock::new(SessionIpLimit::new(nodes_per_ip)), } } @@ -128,7 +128,7 @@ impl SessionManager { node_id_index.insert(node_id.clone(), index); } - ip_limit.on_add(ip); + assert!(ip_limit.on_add(ip, index)); debug!("SessionManager.create: leave"); @@ -145,7 +145,10 @@ impl SessionManager { self.node_id_index.write().remove(node_id); } - self.ip_limit.write().on_delete(session.address().ip()); + assert!(self + .ip_limit + .write() + .on_delete(&session.address().ip(), &session.token())); debug!("SessionManager.remove: session removed"); } diff --git a/test/peer_test.py b/test/peer_test.py index 4846d486d2..65335f4dc7 100755 --- a/test/peer_test.py +++ b/test/peer_test.py @@ -1,80 +1,40 @@ #!/usr/bin/env python3 import datetime -import time -from rlp.sedes import Binary, BigEndianInt -from conflux import utils -from conflux.utils import encode_hex, bytes_to_int, get_nodeid -from test_framework.blocktools import create_block from test_framework.test_framework import ConfluxTestFramework -from test_framework.mininode import * -from test_framework.test_node import TestNode -from test_framework.util import * - -class IpLimitedNode(P2PInterface): - disconnect_reason = None - - def on_disconnect(self, disconnect): - self.close() - self.disconnect_reason = disconnect.reason +from test_framework.util import wait_until class AutoDiscovery(ConfluxTestFramework): def set_test_params(self): self.setup_clean_chain = True - # node 0: boot node - # node 1,2,3: nodes with IP limitation disabled - # node 4: node with IP limitation enabled - self.num_nodes = 5 + self.num_nodes = 4 self.conf_parameters = { "discovery_fast_refresh_timeout_ms": "200", "discovery_round_timeout_ms": "100", "discovery_housekeeping_timeout_ms": "200", } - def discovery_args(self): - return ["--enable-discovery", "true", "--node-table-timeout", "1", "--node-table-promotion-timeout", "1"] - def setup_network(self): self.add_nodes(self.num_nodes) # init boot node: 0 self.bootnode = self.nodes[0] - extra_args = self.discovery_args() + extra_args = ["--enable-discovery", "true", "--node-table-timeout", "1", "--node-table-promotion-timeout", "1"] self.start_node(0, extra_args = extra_args) self.bootnode_id = "cfxnode://{}@{}:{}".format(self.bootnode.key[2:], self.bootnode.ip, self.bootnode.port) # init nodes: 1, 2, 3 (4 is used later) extra_args.extend(["--bootnodes", self.bootnode_id]) self.start_time = datetime.datetime.now() - for i in range(1, self.num_nodes - 1): + for i in range(1, self.num_nodes): self.start_node(i, extra_args=extra_args) def run_test(self): # nodes 0,1,2,3 will auto discover each other self.log.info("Test AutoDiscovery") - wait_until(lambda: [len(i.getpeerinfo()) for i in self.nodes[0:-1]].count(self.num_nodes - 2) == self.num_nodes - 1) + wait_until(lambda: [len(i.getpeerinfo()) for i in self.nodes].count(self.num_nodes - 1) == self.num_nodes) sec = (datetime.datetime.now() - self.start_time).total_seconds() - # assert_greater_than_or_equal(sec, 15) self.log.info("Passed after running %.2f seconds" % sec) - self.test_ip_limit() - - def test_ip_limit(self): - self.log.info("Test node number limitation per IP") - - # start node with IP limitation enabled - self.ip_limited_node = self.nodes[self.num_nodes - 1] - self.start_node(self.num_nodes - 1, extra_args=("--p2p-nodes-per-ip", "1")) - - # add a dummy peer to ensure IP used in underlying node table. - self.ip_limited_node.addnode(self.bootnode.key, "127.0.0.1:33333") - - # create a P2P connection, and should be refused because of IP limited during handshake - p2p = IpLimitedNode() - self.ip_limited_node.add_p2p_connection(p2p) - network_thread_start() - wait_until(lambda: p2p.disconnect_reason == 3 or p2p.state == "closed", timeout=3) - - if __name__ == "__main__": AutoDiscovery().main()