diff --git a/Cargo.lock b/Cargo.lock index ee720c40c..3b138be39 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3092,7 +3092,6 @@ dependencies = [ "exit_trust_root", "handlebars", "ipnetwork", - "lazy_static", "lettre", "log", "num256", diff --git a/exit_trust_root/src/client_db.rs b/exit_trust_root/src/client_db.rs index 438aaecf0..5fa6eae2e 100644 --- a/exit_trust_root/src/client_db.rs +++ b/exit_trust_root/src/client_db.rs @@ -9,7 +9,7 @@ use clarity::{ abi::{encode_call, AbiToken}, Address, PrivateKey, Uint256, }; -use std::{net::IpAddr, time::Duration, vec}; +use std::{collections::HashSet, net::IpAddr, time::Duration, vec}; use tokio::time::timeout as future_timeout; use web30::{ client::Web3, @@ -24,7 +24,7 @@ pub async fn get_all_registered_clients( web30: &Web3, requester_address: Address, contract: Address, -) -> Result, Web3Error> { +) -> Result, Web3Error> { let payload = encode_call("getAllRegisteredUsers()", &[])?; let res = web30 .simulate_transaction( @@ -33,7 +33,8 @@ pub async fn get_all_registered_clients( ) .await?; - convert_althea_types_to_web3_error(Identity::decode_array_from_eth_abi(res)) + let val = convert_althea_types_to_web3_error(Identity::decode_array_from_eth_abi(res))?; + Ok(val.into_iter().collect()) } pub async fn get_registered_client_using_wgkey( diff --git a/exit_trust_root/src/register_client_batch_loop.rs b/exit_trust_root/src/register_client_batch_loop.rs index ba3639e64..8fcd0e1b2 100644 --- a/exit_trust_root/src/register_client_batch_loop.rs +++ b/exit_trust_root/src/register_client_batch_loop.rs @@ -21,15 +21,6 @@ pub struct RegistrationRequest { pub const MAX_BATCH_SIZE: usize = 75; -/// Utility function used to easily perform O(1) lookups against the identities list -pub fn get_clients_hashset(input: Vec) -> HashSet { - let mut output = HashSet::new(); - for i in input { - output.insert(i); - } - output -} - /// This function monitors the registration queue lock free queue. It will dequeue any new entries and attempt to register them /// in a batch sent every REGISTRATION_LOOP_SPEED seconds. This function will also check if the user is already registered before attempting to register them pub async fn register_client_batch_loop( @@ -106,7 +97,6 @@ pub async fn register_client_batch_loop( continue; } }; - let all_clients = get_clients_hashset(all_clients); let mut clients_to_register = Vec::new(); for client in list.iter() { diff --git a/integration_tests/src/contract_test.rs b/integration_tests/src/contract_test.rs index bb6f4539a..4c01b9eac 100644 --- a/integration_tests/src/contract_test.rs +++ b/integration_tests/src/contract_test.rs @@ -201,7 +201,7 @@ pub async fn validate_contract_user_functionality(db_addr: Address) { // Try requests when there are no users present let res = get_all_registered_clients(&contact, miner_pub_key, db_addr).await; - assert_eq!(res.unwrap(), vec![]); + assert_eq!(res.unwrap(), HashSet::new()); let res = get_registered_client_using_wgkey(user_1.wg_public_key, miner_pub_key, db_addr, &contact) @@ -251,7 +251,9 @@ pub async fn validate_contract_user_functionality(db_addr: Address) { .await .unwrap(); - assert_eq!(vec![user_1], res); + let mut set = HashSet::new(); + set.insert(user_1); + assert_eq!(set, res); let nonce = contact .eth_get_transaction_count(miner_pub_key) @@ -342,7 +344,14 @@ pub async fn validate_contract_user_functionality(db_addr: Address) { info!("All users are : {:?}", res); - assert_eq!(vec![user_1, user_2, user_3, user_4, user_5, user_6], res); + let mut set = HashSet::new(); + set.insert(user_1); + set.insert(user_2); + set.insert(user_3); + set.insert(user_4); + set.insert(user_5); + set.insert(user_6); + assert_eq!(set, res); info!("Trying to retrive user 1"); let res = diff --git a/integration_tests/src/setup_utils/rita.rs b/integration_tests/src/setup_utils/rita.rs index a2079fec5..62269fc73 100644 --- a/integration_tests/src/setup_utils/rita.rs +++ b/integration_tests/src/setup_utils/rita.rs @@ -36,6 +36,7 @@ use rita_common::rita_loop::{ write_to_disk::{save_to_disk_loop, SettingsOnDisk}, }; use rita_exit::rita_loop::start_rita_exit_list_endpoint; +use rita_exit::ClientListAnIpAssignmentMap; use rita_exit::{ dashboard::start_rita_exit_dashboard, operator_update::update_loop::start_operator_update_loop, @@ -49,6 +50,7 @@ use std::collections::HashSet; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering::Relaxed; use std::time::Instant; +use std::vec; use std::{ collections::HashMap, convert::TryInto, @@ -286,14 +288,21 @@ pub fn spawn_rita_exit( settings::get_rita_exit(), ))); + let client_and_ip_map = Arc::new(RwLock::new(ClientListAnIpAssignmentMap::new( + HashSet::new(), + ))); + let workers = 4; start_core_rita_endpoints(workers as usize); - start_rita_exit_endpoints(workers as usize); - start_rita_exit_list_endpoint(workers as usize); + start_rita_exit_endpoints(client_and_ip_map.clone()); + start_rita_exit_list_endpoint(); start_rita_exit_dashboard(Arc::new(RwLock::new(None))); // this one blocks - start_rita_exit_loop(vec![]); + let system = actix::System::new(); + system.block_on(async { + start_rita_exit_loop(client_and_ip_map).await; + }); }); // wait for the child thread to finish initializing diff --git a/rita_bin/src/contract-util.rs b/rita_bin/src/contract-util.rs index 18062849e..fc22cc55d 100644 --- a/rita_bin/src/contract-util.rs +++ b/rita_bin/src/contract-util.rs @@ -16,7 +16,6 @@ use docopt::Docopt; use exit_trust_root::client_db::add_exits_to_registration_list; use exit_trust_root::client_db::add_users_to_registered_list; use exit_trust_root::client_db::get_all_registered_clients; -use exit_trust_root::register_client_batch_loop::get_clients_hashset; use exit_trust_root::register_client_batch_loop::MAX_BATCH_SIZE; use log::{error, info}; use rita_db_migration::{ @@ -78,7 +77,6 @@ async fn main() { panic!("Failed to get list of already registered clients {:?}", e); } }; - let all_contract_clients = get_clients_hashset(all_contract_clients); let db_conn = get_database_connection(db_url).unwrap(); diff --git a/rita_bin/src/exit.rs b/rita_bin/src/exit.rs index d165e237d..d588bc152 100644 --- a/rita_bin/src/exit.rs +++ b/rita_bin/src/exit.rs @@ -19,6 +19,8 @@ use exit_trust_root::client_db::get_all_registered_clients; #[cfg(feature = "jemalloc")] use jemallocator::Jemalloc; use rita_exit::rita_loop::start_rita_exit_list_endpoint; +use rita_exit::ClientListAnIpAssignmentMap; +use std::collections::HashSet; use std::sync::Arc; use std::sync::RwLock; use std::time::Duration; @@ -158,9 +160,12 @@ async fn main() { check_startup_balance_and_contract(args.flag_fail_on_startup, startup_status).await; let workers = settings.workers; + + let client_and_ip_map = Arc::new(RwLock::new(ClientListAnIpAssignmentMap::new(clients))); + start_core_rita_endpoints(workers as usize); - start_rita_exit_endpoints(workers as usize); - start_rita_exit_list_endpoint(workers as usize); + start_rita_exit_endpoints(client_and_ip_map.clone()); + start_rita_exit_list_endpoint(); start_rita_common_loops(); start_operator_update_loop(); @@ -169,7 +174,7 @@ async fn main() { ))); // this call blocks, transforming this startup thread into the main exit watchdog thread - start_rita_exit_loop(clients); + start_rita_exit_loop(client_and_ip_map).await; } /// This function performs startup integrity checks on the config and system. It checks that we can reach the internet @@ -182,7 +187,7 @@ async fn main() { async fn check_startup_balance_and_contract( fail_on_startup: bool, startup_status: Arc>>, -) -> Vec { +) -> HashSet { let payment_settings = settings::get_rita_common().payment; let our_address = payment_settings.eth_address.expect("No address!"); @@ -214,7 +219,7 @@ async fn check_startup_balance_and_contract( users.unwrap() } -async fn get_registered_users() -> Result, Web3Error> { +async fn get_registered_users() -> Result, Web3Error> { let payment_settings = settings::get_rita_common().payment; let our_address = payment_settings.eth_address.expect("No address!"); let full_node = get_web3_server(); diff --git a/rita_exit/Cargo.toml b/rita_exit/Cargo.toml index e81c5f2df..7eac228d5 100644 --- a/rita_exit/Cargo.toml +++ b/rita_exit/Cargo.toml @@ -16,7 +16,6 @@ babel_monitor = { path = "../babel_monitor" } actix = {workspace = true} awc = {workspace = true} handlebars = "5.1" -lazy_static = "1.5" ipnetwork = "0.20" clarity = {workspace = true} serde = "1.0" diff --git a/rita_exit/src/dashboard.rs b/rita_exit/src/dashboard.rs index ef6c89451..a0c073913 100644 --- a/rita_exit/src/dashboard.rs +++ b/rita_exit/src/dashboard.rs @@ -1,5 +1,5 @@ pub use crate::database::geoip::*; -pub use crate::database::in_memory_database::*; +pub use crate::database::ipddr_assignment::*; use actix::System; use actix_web::web; use actix_web::App; diff --git a/rita_exit/src/database/geoip.rs b/rita_exit/src/database/geoip.rs index e1b71dc39..6ed570999 100644 --- a/rita_exit/src/database/geoip.rs +++ b/rita_exit/src/database/geoip.rs @@ -1,4 +1,4 @@ -use actix::System; +use crate::RitaExitError; use althea_kernel_interface::interface_tools::get_wg_remote_ip; use althea_types::regions::Regions; use babel_monitor::open_babel_stream; @@ -9,9 +9,6 @@ use std::collections::HashMap; use std::net::IpAddr; use std::time::Duration; -use crate::database::RITA_EXIT_STATE; -use crate::RitaExitError; - /// gets the gateway ip for a given mesh IP pub fn get_gateway_ip_single(mesh_ip: IpAddr) -> Result> { let babel_port = settings::get_rita_exit().network.babel_port; @@ -131,7 +128,10 @@ struct CountryDetails { } /// get ISO country code from ip, consults a in memory cache -pub fn get_country(ip: IpAddr) -> Result> { +pub async fn get_country( + geoip_cache: &mut HashMap, + ip: IpAddr, +) -> Result> { trace!("get GeoIP country for {}", ip.to_string()); // if allowed countries is not configured we don't care and will use @@ -170,12 +170,7 @@ pub fn get_country(ip: IpAddr) -> Result> { // we have to turn this option into a string in order to avoid // the borrow checker trying to keep this lock open for a long period - let cache_result = RITA_EXIT_STATE - .read() - .unwrap() - .geoip_cache - .get(&ip) - .copied(); + let cache_result = geoip_cache.get(&ip).copied(); match cache_result { Some(code) => Ok(code), @@ -187,55 +182,51 @@ pub fn get_country(ip: IpAddr) -> Result> { ip.to_string() ); // run in async closure and return the result - let runner = System::new(); - runner.block_on(async move { - let client = awc::Client::new(); - if let Ok(mut res) = client - .get(&geo_ip_url) - .basic_auth(api_user, api_key) - .timeout(Duration::from_secs(1)) - .send() - .await - { - trace!("Got geoip result {:?}", res); - if let Ok(res) = res.json().await { - let value: GeoIpRet = res; - let code = match value.country.iso_code.parse() { - Ok(r) => r, - Err(_) => { - error!( - "Failed to parse geoip response {:?}", - value.country.iso_code - ); - Regions::UnkownRegion - } - }; - trace!("Adding GeoIP value {:?} to cache", code); - RITA_EXIT_STATE - .write() - .unwrap() - .geoip_cache - .insert(ip, code); - trace!("Added to cache, returning"); - Ok(code) - } else { - Err(Box::new(RitaExitError::MiscStringError( - "Failed to deserialize geoip response".to_string(), - ))) - } + let client = awc::Client::new(); + if let Ok(mut res) = client + .get(&geo_ip_url) + .basic_auth(api_user, api_key) + .timeout(Duration::from_secs(1)) + .send() + .await + { + trace!("Got geoip result {:?}", res); + if let Ok(res) = res.json().await { + let value: GeoIpRet = res; + let code = match value.country.iso_code.parse() { + Ok(r) => r, + Err(_) => { + error!( + "Failed to parse geoip response {:?}", + value.country.iso_code + ); + Regions::UnkownRegion + } + }; + trace!("Adding GeoIP value {:?} to cache", code); + geoip_cache.insert(ip, code); + trace!("Added to cache, returning"); + Ok(code) } else { Err(Box::new(RitaExitError::MiscStringError( - "Request failed".to_string(), + "Failed to deserialize geoip response".to_string(), ))) } - }) + } else { + Err(Box::new(RitaExitError::MiscStringError( + "Request failed".to_string(), + ))) + } } } } /// Returns true or false if an ip is confirmed to be inside or outside the region and error /// if an api error is encountered trying to figure that out. -pub fn verify_ip(request_ip: IpAddr) -> Result> { +pub async fn verify_ip( + geoip_cache: &mut HashMap, + request_ip: IpAddr, +) -> Result> { // in this case we have a gateway directly attached to the exit, so our // peer address for them will be an fe80 linklocal ip address. When we // detect this we know that they are in the allowed countries list because @@ -249,7 +240,7 @@ pub fn verify_ip(request_ip: IpAddr) -> Result> { if settings::get_rita_exit().allowed_countries.is_empty() { Ok(true) } else { - let country = get_country(request_ip)?; + let country = get_country(geoip_cache, request_ip).await?; if !settings::get_rita_exit().allowed_countries.is_empty() && !settings::get_rita_exit() .allowed_countries @@ -262,8 +253,45 @@ pub fn verify_ip(request_ip: IpAddr) -> Result> { } } -#[test] -#[ignore] -fn test_get_country() { - get_country("8.8.8.8".parse().unwrap()).unwrap(); +#[cfg(test)] +mod tests { + use super::*; + use std::str::FromStr; + + #[actix_web::test] + #[ignore] + async fn test_get_country() { + let mut geoip_cache = HashMap::new(); + let ip = IpAddr::from_str("8.8.8.8").unwrap(); + let result = get_country(&mut geoip_cache, ip).await; + assert!(result.is_ok()); + } + + #[actix_web::test] + #[ignore] + async fn test_get_gateway_ip_single() { + let ip = IpAddr::from_str("2001:4860:4860::8888").unwrap(); + let result = get_gateway_ip_single(ip); + assert!(result.is_ok()); + } + + #[actix_web::test] + #[ignore] + async fn test_get_gateway_ip_bulk() { + let ips = vec![ + IpAddr::from_str("2001:4860:4860::8888").unwrap(), + IpAddr::from_str("2001:4860:4860::8844").unwrap(), + ]; + let result = get_gateway_ip_bulk(ips, Duration::from_secs(5)); + assert!(result.is_ok()); + } + + #[actix_web::test] + #[ignore] + async fn test_verify_ip() { + let mut geoip_cache = HashMap::new(); + let ip = IpAddr::from_str("8.8.8.8").unwrap(); + let result = verify_ip(&mut geoip_cache, ip).await; + assert!(result.is_ok()); + } } diff --git a/rita_exit/src/database/in_memory_database.rs b/rita_exit/src/database/in_memory_database.rs deleted file mode 100644 index bc204e16c..000000000 --- a/rita_exit/src/database/in_memory_database.rs +++ /dev/null @@ -1,596 +0,0 @@ -use althea_kernel_interface::ExitClient; -use althea_types::{Identity, WgKey}; -use ipnetwork::{IpNetwork, Ipv4Network, Ipv6Network}; -use std::collections::hash_map::DefaultHasher; -use std::collections::{HashMap, HashSet}; -use std::convert::TryInto; -use std::fmt::Write; -use std::hash::{Hash, Hasher}; -use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; - -use crate::RitaExitError; - -use super::RITA_EXIT_STATE; - -/// Wg exit port on client side -pub const CLIENT_WG_PORT: u16 = 59999; - -/// Max number of time we try to generate a valid ip addr before returning an eror -pub const MAX_IP_RETRIES: u8 = 10; - -// Default Subnet size assigned to each client -pub const DEFAULT_CLIENT_SUBNET_SIZE: u8 = 56; - -#[derive(Clone, Debug, Default)] -pub struct IpAssignmentMap { - pub ipv6_assignments: HashMap, - pub internal_ip_assignments: HashMap, -} - -// Lazy static setters/getters -pub fn get_ipv6_assignments() -> HashMap { - RITA_EXIT_STATE - .read() - .unwrap() - .ip_assignment_map - .ipv6_assignments - .clone() -} - -pub fn get_internal_ip_assignments() -> HashMap { - RITA_EXIT_STATE - .read() - .unwrap() - .ip_assignment_map - .internal_ip_assignments - .clone() -} - -pub fn add_new_ipv6_assignment(addr: IpAddr, key: WgKey) { - RITA_EXIT_STATE - .write() - .unwrap() - .ip_assignment_map - .ipv6_assignments - .insert(addr, key); -} - -pub fn add_new_internal_ip_assignement(addr: IpAddr, key: WgKey) { - RITA_EXIT_STATE - .write() - .unwrap() - .ip_assignment_map - .internal_ip_assignments - .insert(addr, key); -} - -/// Take an index i, a larger subnet and a smaller subnet length and generate the ith smaller subnet in the larger subnet -/// For instance, if our larger subnet is fd00::1330/120, smaller sub len is 124, and index is 1, our generated subnet would be fd00::1310/124 -pub fn generate_iterative_client_subnet( - exit_sub: IpNetwork, - ind: u64, - subprefix: u8, -) -> Result> { - let net; - - // Covert the subnet's ip address into a u128 integer to allow for easy iterative - // addition operations. To this u128, we add (interative_index * client_subnet_size) - // and convert this result into an ipv6 addr. This is the starting ip in the client subnet - // - // For example, if we have exit subnet: fbad::1000/120, client subnet size is 124, index is 1 - // we do (fbad::1000).to_int() + (16 * 1) = fbad::1010/124 is the client subnet - let net_as_int: u128 = if let IpAddr::V6(addr) = exit_sub.network() { - net = Ipv6Network::new(addr, subprefix).unwrap(); - addr.into() - } else { - return Err(Box::new(RitaExitError::MiscStringError( - "Exit subnet expected to be ipv6!!".to_string(), - ))); - }; - - if subprefix < exit_sub.prefix() { - return Err(Box::new(RitaExitError::MiscStringError( - "Client subnet larger than exit subnet".to_string(), - ))); - } - - // This bitshifting is the total number of client subnets available. We are checking that our iterative index - // is lower than this number. For example, exit subnet: fd00:1000/120, client subnet /124, number of subnets will be - // 2^(124 - 120) => 2^4 => 16 - if ind < (1 << (subprefix - exit_sub.prefix())) { - let ret = net_as_int + (ind as u128 * net.size()); - let v6addr = Ipv6Addr::from(ret); - let ret = IpNetwork::from(match Ipv6Network::new(v6addr, subprefix) { - Ok(a) => a, - Err(e) => { - return Err(Box::new(RitaExitError::MiscStringError(format!( - "Unable to parse a valid client subnet: {e:?}" - )))) - } - }); - - Ok(ret) - } else { - error!( - "Our index is larger than available subnets, either error in logic or no more subnets" - ); - Err(Box::new(RitaExitError::MiscStringError( - "Index larger than available subnets".to_string(), - ))) - } -} - -/// Given a client identity, get the clients ipv6 addr using the wgkey as a generative seed -pub fn get_client_ipv6( - their_record: Identity, - exit_sub: Option, - client_subnet_size: u8, -) -> Result, Box> { - if let Some(exit_sub) = exit_sub { - let wg_hash = hash_wgkey(their_record.wg_public_key); - - // This bitshifting is the total number of client subnets available. We are checking that our iterative index - // is lower than this number. For example, exit subnet: fd00:1000/120, client subnet /124, number of subnets will be - // 2^(124 - 120) => 2^4 => 16 - let total_subnets = 1 << (client_subnet_size - exit_sub.prefix()); - let mut generative_index = wg_hash % total_subnets; - - // Loop to try to generate a valid address - let mut retries = 0; - loop { - // Return an error if we retry too many times - if retries > MAX_IP_RETRIES { - return Err(Box::new(RitaExitError::MiscStringError(format!( - "Unable to get internet ipv6 using network {} and index {}", - exit_sub, generative_index - )))); - } - - let client_subnet = - generate_iterative_client_subnet(exit_sub, generative_index, client_subnet_size)?; - - if validate_internet_ipv6(client_subnet, their_record.wg_public_key) { - add_new_ipv6_assignment(client_subnet.ip(), their_record.wg_public_key); - return Ok(Some(client_subnet)); - } else { - retries += 1; - generative_index = (generative_index + 1) % total_subnets; - continue; - } - } - } else { - // This exit doesnt support ipv6 - Ok(None) - } -} - -/// Given a client identity, get the clients internal ip addr using the wgkey as a generative seed -pub fn get_client_internal_ip( - their_record: Identity, - netmask: u8, - gateway_ip: Ipv4Addr, -) -> Result> { - let wg_hash = hash_wgkey(their_record.wg_public_key); - // total number of available addresses - let total_addresses: u64 = 2_u64.pow((32 - netmask).into()); - let mut generative_index = wg_hash % total_addresses; - let network = match Ipv4Network::new(gateway_ip, netmask) { - Ok(a) => a, - Err(e) => { - return Err(Box::new(RitaExitError::MiscStringError(format!( - "Unable to setup and ipnetwork to generate internal ip {}", - e - )))) - } - }; - - // Keep trying to generate an address till we get a valid one - let mut retries = 0; - loop { - // Return an error if we retry too many times - if retries > MAX_IP_RETRIES { - return Err(Box::new(RitaExitError::MiscStringError(format!( - "Unable to get internal ip using network {} and index {}", - network, generative_index - )))); - } - - let internal_ip = network.nth(match generative_index.try_into() { - Ok(a) => a, - Err(e) => { - warn!("Internal Ip failure: {}", e); - retries += 1; - generative_index = (generative_index + 1) % total_addresses; - continue; - } - }); - - let internal_ip = match internal_ip { - Some(a) => a, - None => { - retries += 1; - generative_index = (generative_index + 1) % total_addresses; - continue; - } - }; - - // Validate that this ip is valid and return it - if validate_internal_ip(network, internal_ip, gateway_ip, their_record.wg_public_key) { - add_new_internal_ip_assignement(IpAddr::V4(internal_ip), their_record.wg_public_key); - return Ok(IpAddr::V4(internal_ip)); - } else { - retries += 1; - generative_index = (generative_index + 1) % total_addresses; - continue; - } - } -} - -/// Check that this ip can be assigned, make sure there isnt a collision with previously assigned ips -pub fn validate_internet_ipv6(client_subnet: IpNetwork, our_wgkey: WgKey) -> bool { - let assigned_ips = get_ipv6_assignments(); - let assignment = assigned_ips.get(&client_subnet.ip()); - match assignment { - Some(a) => { - // There is an entry, verify if its our entry else false - *a == our_wgkey - } - // There is no assigned ip here, ip is valid - None => true, - } -} - -/// Check that this ip can be assigned, make sure it isnt our ip, network ip, broadcast ip, etc -pub fn validate_internal_ip( - network: Ipv4Network, - assigned_ip: Ipv4Addr, - our_ip: Ipv4Addr, - our_wgkey: WgKey, -) -> bool { - let broadcast = network.broadcast(); - let network_ip = network.network(); - - // Collision with our ip - if assigned_ip == our_ip { - return false; - } - // collision with the network ip - if assigned_ip == network_ip { - return false; - } - // collision with broadcast address - if assigned_ip == broadcast { - return false; - } - - let assignments = get_internal_ip_assignments(); - let assignment = assignments.get(&IpAddr::V4(assigned_ip)); - match assignment { - Some(a) => { - // check if this existing ip is ours - *a == our_wgkey - } - // No assignment, we can use this address - None => true, - } -} - -pub fn to_exit_client(client: Identity) -> Result> { - let internet_ipv6 = get_client_ipv6( - client, - settings::get_rita_exit().exit_network.get_ipv6_subnet_alt(), - settings::get_rita_exit() - .get_client_subnet_size() - .unwrap_or(DEFAULT_CLIENT_SUBNET_SIZE), - )?; - let internal_ip = get_client_internal_ip( - client, - settings::get_rita_exit() - .exit_network - .internal_ipv4 - .prefix(), - settings::get_rita_exit() - .exit_network - .internal_ipv4 - .internal_ip(), - )?; - - Ok(ExitClient { - mesh_ip: client.mesh_ip, - internal_ip, - port: CLIENT_WG_PORT, - public_key: client.wg_public_key, - internet_ipv6, - }) -} - -pub fn hash_wgkey(key: WgKey) -> u64 { - let mut hasher = DefaultHasher::new(); - key.to_string().hash(&mut hasher); - hasher.finish() -} - -/// quick display function for a neat error -pub fn display_hashset(input: &HashSet) -> String { - let mut out = String::new(); - for item in input.iter() { - write!(out, "{}, ", item.to_string()).unwrap(); - } - out -} - -#[cfg(test)] -mod tests { - use althea_types::Identity; - use ipnetwork::IpNetwork; - - use crate::database::in_memory_database::{ - generate_iterative_client_subnet, get_client_internal_ip, get_internal_ip_assignments, - get_ipv6_assignments, - }; - - use super::{get_client_ipv6, hash_wgkey}; - - #[test] - fn test_internet_ipv6_assignment() { - let exit_sub = Some("2602:FBAD:10::/126".parse().unwrap()); - let dummy_client = Identity { - mesh_ip: "fd00::1337".parse().unwrap(), - eth_address: "0x4Af6D4125f3CBF07EBAD056E2eCa7b17c58AFEa4" - .parse() - .unwrap(), - wg_public_key: "TgR85AcLBY/7cLHXZIICcwVDU+1Pj/cjFeduCUNvLVU=" - .parse() - .unwrap(), - nickname: None, - }; - - // Generate a client subnet - let ip = get_client_ipv6(dummy_client, exit_sub, 128) - .unwrap() - .unwrap(); - - // Verify assignement db is correctly populated - assert!(get_ipv6_assignments().len() == 1); - assert_eq!( - *get_ipv6_assignments().get(&ip.ip()).unwrap(), - dummy_client.wg_public_key - ); - - // Try retrieving the same client - let ip_2 = get_client_ipv6(dummy_client, exit_sub, 128) - .unwrap() - .unwrap(); - assert_eq!(ip, ip_2); - - // Make sure no new entries in assignemnt db - assert!(get_ipv6_assignments().len() == 1); - assert_eq!( - *get_ipv6_assignments().get(&ip.ip()).unwrap(), - dummy_client.wg_public_key - ); - - println!("Assigned Ip client 1: {:?}", ip); - - // Add a second client - let dummy_client_2 = Identity { - mesh_ip: "fd00::1447".parse().unwrap(), - eth_address: "0x4Af6D4125f3CBF07EBAD056E2eCa7b17c58AFEa4" - .parse() - .unwrap(), - wg_public_key: "CEnTMKvpWr+xTFl7niTYyqH56w5iPdMjiC938X542GA=" - .parse() - .unwrap(), - nickname: None, - }; - - // Generate a client subnet - let ip = get_client_ipv6(dummy_client_2, exit_sub, 128) - .unwrap() - .unwrap(); - - // Verify assignement db is correctly populated - assert!(get_ipv6_assignments().len() == 2); - assert_eq!( - *get_ipv6_assignments().get(&ip.ip()).unwrap(), - dummy_client_2.wg_public_key - ); - - let ip_2 = get_client_ipv6(dummy_client_2, exit_sub, 128) - .unwrap() - .unwrap(); - assert_eq!(ip, ip_2); - - // Make sure no new entries in assignemnt db - assert!(get_ipv6_assignments().len() == 2); - assert_eq!( - *get_ipv6_assignments().get(&ip.ip()).unwrap(), - dummy_client_2.wg_public_key - ); - - println!("Assigned Ip client 2: {:?}", ip); - - // Generate a collision - let dummy_client_3 = Identity { - mesh_ip: "fd00::1557".parse().unwrap(), - eth_address: "0x4Af6D4125f3CBF07EBAD056E2eCa7b17c58AFEa4" - .parse() - .unwrap(), - wg_public_key: "+Iai9Qj5aIuTAq6h1srDL8yKElN65/PhNtkccSOJwls=" - .parse() - .unwrap(), - nickname: None, - }; - - // Generate a client subnet - let ip = get_client_ipv6(dummy_client_3, exit_sub, 128) - .unwrap() - .unwrap(); - - // Verify assignement db is correctly populated - assert!(get_ipv6_assignments().len() == 3); - assert_eq!( - *get_ipv6_assignments().get(&ip.ip()).unwrap(), - dummy_client_3.wg_public_key - ); - - let _ = get_client_ipv6(dummy_client_2, exit_sub, 128) - .unwrap() - .unwrap(); - let ip_2 = get_client_ipv6(dummy_client_3, exit_sub, 128) - .unwrap() - .unwrap(); - assert_eq!(ip, ip_2); - - // Make sure no new entries in assignemnt db - assert!(get_ipv6_assignments().len() == 3); - assert_eq!( - *get_ipv6_assignments().get(&ip.ip()).unwrap(), - dummy_client_3.wg_public_key - ); - - println!("Assigned Ip client 3: {:?}", ip); - } - - #[test] - fn hash_playground() { - let key_1_hash = hash_wgkey( - "TgR85AcLBY/7cLHXZIICcwVDU+1Pj/cjFeduCUNvLVU=" - .parse() - .unwrap(), - ) % 4; - println!("hash: {}", key_1_hash); - let key_1_hash = hash_wgkey( - "+Iai9Qj5aIuTAq6h1srDL8yKElN65/PhNtkccSOJwls=" - .parse() - .unwrap(), - ) % 4; - println!("hash: {}", key_1_hash); - let key_1_hash = hash_wgkey( - "CEnTMKvpWr+xTFl7niTYyqH56w5iPdMjiC938X542GA=" - .parse() - .unwrap(), - ) % 4; - println!("hash: {}", key_1_hash); - } - - #[test] - fn test_internal_ip_assignment() { - let dummy_client = Identity { - mesh_ip: "fd00::1337".parse().unwrap(), - eth_address: "0x4Af6D4125f3CBF07EBAD056E2eCa7b17c58AFEa4" - .parse() - .unwrap(), - wg_public_key: "TgR85AcLBY/7cLHXZIICcwVDU+1Pj/cjFeduCUNvLVU=" - .parse() - .unwrap(), - nickname: None, - }; - let ip = - get_client_internal_ip(dummy_client, 30, "172.168.0.100".parse().unwrap()).unwrap(); - - // Verify assignement db is correctly populated - assert!(get_internal_ip_assignments().len() == 1); - assert_eq!( - *get_internal_ip_assignments().get(&ip).unwrap(), - dummy_client.wg_public_key - ); - - // requesting the same client shouldnt change any state - let ip2 = - get_client_internal_ip(dummy_client, 30, "172.168.0.100".parse().unwrap()).unwrap(); - - assert_eq!(ip, ip2); - - assert!(get_internal_ip_assignments().len() == 1); - assert_eq!( - *get_internal_ip_assignments().get(&ip2).unwrap(), - dummy_client.wg_public_key - ); - - println!("Internal ip client 1: {}", ip); - - // Second client who collides - let dummy_client_2 = Identity { - mesh_ip: "fd00::1557".parse().unwrap(), - eth_address: "0x4Af6D4125f3CBF07EBAD056E2eCa7b17c58AFEa4" - .parse() - .unwrap(), - wg_public_key: "+Iai9Qj5aIuTAq6h1srDL8yKElN65/PhNtkccSOJwls=" - .parse() - .unwrap(), - nickname: None, - }; - - let ip = - get_client_internal_ip(dummy_client_2, 30, "172.168.0.100".parse().unwrap()).unwrap(); - - // Verify assignement db is correctly populated - assert!(get_internal_ip_assignments().len() == 2); - assert_eq!( - *get_internal_ip_assignments().get(&ip).unwrap(), - dummy_client_2.wg_public_key - ); - - // requesting the same client shouldnt change any state - let ip2 = - get_client_internal_ip(dummy_client_2, 30, "172.168.0.100".parse().unwrap()).unwrap(); - - assert_eq!(ip, ip2); - - assert!(get_internal_ip_assignments().len() == 2); - assert_eq!( - *get_internal_ip_assignments().get(&ip2).unwrap(), - dummy_client_2.wg_public_key - ); - - println!("Internal ip client 2: {}", ip); - } - - /// Test iterative subnet generation - #[test] - fn test_generate_iterative_subnet() { - // Complex subnet example - let net: IpNetwork = "2602:FBAD::/40".parse().unwrap(); - let ret = generate_iterative_client_subnet(net, 0, 64); - assert_eq!("2602:FBAD::/64".parse::().unwrap(), ret.unwrap()); - - let net: IpNetwork = "2602:FBAD::/40".parse().unwrap(); - let ret = generate_iterative_client_subnet(net, 1, 64); - assert_eq!( - "2602:FBAD:0:1::/64".parse::().unwrap(), - ret.unwrap() - ); - - let net: IpNetwork = "2602:FBAD::/40".parse().unwrap(); - let ret = generate_iterative_client_subnet(net, 50, 64); - assert_eq!( - "2602:FBAD:0:32::/64".parse::().unwrap(), - ret.unwrap() - ); - - let net: IpNetwork = "2602:FBAD::/40".parse().unwrap(); - let ret = generate_iterative_client_subnet(net, 2_u64.pow(24), 64); - assert!(ret.is_err()); - - let net: IpNetwork = "2602:FBAD::/40".parse().unwrap(); - let ret = generate_iterative_client_subnet(net, 0, 30); - assert!(ret.is_err()); - - // Simple subnet example - let net: IpNetwork = "fd00::1337/120".parse().unwrap(); - let ret = generate_iterative_client_subnet(net, 0, 124); - assert_eq!("fd00::1300/124".parse::().unwrap(), ret.unwrap()); - - let net: IpNetwork = "fd00::1337/120".parse().unwrap(); - let ret = generate_iterative_client_subnet(net, 2, 124); - assert_eq!("fd00::1320/124".parse::().unwrap(), ret.unwrap()); - - let net: IpNetwork = "fd00::1337/120".parse().unwrap(); - let ret = generate_iterative_client_subnet(net, 15, 124); - assert_eq!("fd00::13f0/124".parse::().unwrap(), ret.unwrap()); - let net: IpNetwork = "fd00::1337/120".parse().unwrap(); - let ret = generate_iterative_client_subnet(net, 16, 124); - assert!(ret.is_err()); - } -} diff --git a/rita_exit/src/database/ipddr_assignment.rs b/rita_exit/src/database/ipddr_assignment.rs new file mode 100644 index 000000000..5996ba615 --- /dev/null +++ b/rita_exit/src/database/ipddr_assignment.rs @@ -0,0 +1,665 @@ +use althea_kernel_interface::ExitClient; +use althea_types::{ExitClientDetails, ExitClientIdentity, ExitState, Identity, WgKey}; +use ipnetwork::{IpNetwork, Ipv4Network, Ipv6Network}; +use settings::get_rita_exit; +use std::collections::hash_map::DefaultHasher; +use std::collections::{HashMap, HashSet}; +use std::fmt::Write; +use std::hash::{Hash, Hasher}; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; + +use crate::database::get_exit_info; +use crate::RitaExitError; + +/// Wg exit port on client side +pub const CLIENT_WG_PORT: u16 = 59999; + +/// Max number of time we try to generate a valid ip addr before returning an eror +pub const MAX_IP_RETRIES: u8 = 10; + +// Default Subnet size assigned to each client +pub const DEFAULT_CLIENT_SUBNET_SIZE: u8 = 56; + +#[derive(Clone, Debug, Default)] +pub struct ClientListAnIpAssignmentMap { + ipv6_assignments: HashMap, + internal_ip_assignments: HashMap, + registered_clients: HashSet, +} + +impl ClientListAnIpAssignmentMap { + pub fn new(clients: HashSet) -> Self { + ClientListAnIpAssignmentMap { + ipv6_assignments: HashMap::new(), + internal_ip_assignments: HashMap::new(), + registered_clients: clients, + } + } + + pub fn get_ipv6_assignments(&self) -> HashMap { + self.ipv6_assignments.clone() + } + + pub fn get_internal_ip_assignments(&self) -> HashMap { + self.internal_ip_assignments.clone() + } + + pub fn is_client_registered(&self, client: Identity) -> bool { + self.registered_clients.contains(&client) + } + + pub fn get_registered_clients(&self) -> HashSet { + self.registered_clients.clone() + } + + pub fn set_registered_clients(&mut self, clients: HashSet) { + self.registered_clients = clients; + } + + /// Returns true if the provided ipv4 address is valid for use between the client and the exit + /// as the internal ip + pub fn ip_is_valid_to_assign( + &self, + network: Ipv4Network, + assigned_ip: Ipv4Addr, + our_ip: Ipv4Addr, + ) -> bool { + let broadcast = network.broadcast(); + let network_ip = network.network(); + + // Collision with our ip + if assigned_ip == our_ip { + return false; + } + // collision with the network ip + if assigned_ip == network_ip { + return false; + } + // collision with broadcast address + if assigned_ip == broadcast { + return false; + } + + !self.internal_ip_assignments.contains_key(&assigned_ip) + } + + /// Validates if an IPv6 subnet is valid to assign to a client + pub fn is_ipv6_subnet_valid_to_assign(&self, client_subnet: Ipv6Network) -> bool { + !self.ipv6_assignments.contains_key(&client_subnet) + } + + /// Gets the status of a client, this may include assigning them an IP if there's no existing assignment + pub fn get_client_status( + &mut self, + client: ExitClientIdentity, + ) -> Result> { + trace!("Checking if record exists for {:?}", client.global.mesh_ip); + let exit = get_rita_exit(); + let exit_network = exit.exit_network.clone(); + let own_internal_ip = exit_network.internal_ipv4.internal_ip(); + let internal_netmask = exit_network.internal_ipv4.prefix(); + if self.is_client_registered(client.global) { + trace!("record exists, updating"); + + let current_ip: Ipv4Addr = self.get_or_add_client_internal_ip( + client.global, + internal_netmask, + own_internal_ip, + )?; + let current_internet_ipv6 = self.get_or_add_client_ipv6( + client.global, + exit_network.get_ipv6_subnet_alt(), + exit.get_client_subnet_size() + .unwrap_or(DEFAULT_CLIENT_SUBNET_SIZE), + )?; + + let current_internet_ipv6: Option = current_internet_ipv6.map(|a| a.into()); + + Ok(ExitState::Registered { + our_details: ExitClientDetails { + client_internal_ip: IpAddr::V4(current_ip), + internet_ipv6_subnet: current_internet_ipv6, + }, + general_details: get_exit_info(), + message: "Registration OK".to_string(), + identity: Box::new(exit.get_exit_identity()), + }) + } else { + Err(Box::new(RitaExitError::NoClientError)) + } + } + + /// Given a client identity, get the clients internal ipv4 addr using the wgkey as a generative seed + /// this is the ip used for the wg_exit tunnel for the client. Not the clients public ip visible to the internet + /// which is determined by the NAT settings on the exit + pub fn get_or_add_client_internal_ip( + &mut self, + their_record: Identity, + netmask: u8, + gateway_ip: Ipv4Addr, + ) -> Result> { + let wg_hash = hash_wgkey(their_record.wg_public_key); + // total number of available addresses + let total_addresses: u64 = 2_u64.pow((32 - netmask).into()); + let mut generative_index = wg_hash % total_addresses; + let network = match Ipv4Network::new(gateway_ip, netmask) { + Ok(a) => a, + Err(e) => { + return Err(Box::new(RitaExitError::MiscStringError(format!( + "Unable to setup and ipnetwork to generate internal ip {}", + e + )))) + } + }; + + // check if we already have an ip for this client, TODO optimize this datastructure, it's optimized for generating + // new ip, not for lookup, the generation process can be streamlined to avoid that. + for (ip, id) in self.internal_ip_assignments.iter() { + if *id == their_record { + return Ok(*ip); + } + } + + // Keep trying to generate an address till we get a valid one + let mut retries = 0; + loop { + // Return an error if we retry too many times + if retries > MAX_IP_RETRIES { + return Err(Box::new(RitaExitError::MiscStringError(format!( + "Unable to get internal ip using network {} and index {}", + network, generative_index + )))); + } + + let internal_ip = network.nth(match generative_index.try_into() { + Ok(a) => a, + Err(e) => { + warn!("Internal Ip failure: {}", e); + retries += 1; + generative_index = (generative_index + 1) % total_addresses; + continue; + } + }); + + let internal_ip = match internal_ip { + Some(a) => a, + None => { + retries += 1; + generative_index = (generative_index + 1) % total_addresses; + continue; + } + }; + + // Validate that this ip is valid and return it + if self.ip_is_valid_to_assign(network, internal_ip, gateway_ip) { + self.internal_ip_assignments + .insert(internal_ip, their_record); + return Ok(internal_ip); + } else { + retries += 1; + generative_index = (generative_index + 1) % total_addresses; + continue; + } + } + } + + pub fn get_or_add_client_ipv6( + &mut self, + their_record: Identity, + exit_sub: Option, + client_subnet_size: u8, + ) -> Result, Box> { + // check if we already have an ip for this client, TODO optimize this datastructure, it's optimized for generating + // new ip, not for lookup, the generation process can be streamlined to avoid that. + for (ip, id) in self.ipv6_assignments.iter() { + if *id == their_record { + return Ok(Some(*ip)); + } + } + + if let Some(exit_sub) = exit_sub { + let wg_hash = hash_wgkey(their_record.wg_public_key); + + // This bitshifting is the total number of client subnets available. We are checking that our iterative index + // is lower than this number. For example, exit subnet: fd00:1000/120, client subnet /124, number of subnets will be + // 2^(124 - 120) => 2^4 => 16 + let total_subnets = 1 << (client_subnet_size - exit_sub.prefix()); + let mut generative_index = wg_hash % total_subnets; + + // Loop to try to generate a valid address + let mut retries = 0; + loop { + // Return an error if we retry too many times + if retries > MAX_IP_RETRIES { + return Err(Box::new(RitaExitError::MiscStringError(format!( + "Unable to get internet ipv6 using network {} and index {}", + exit_sub, generative_index + )))); + } + + let client_subnet = generate_iterative_client_subnet( + exit_sub, + generative_index, + client_subnet_size, + )?; + + if self.is_ipv6_subnet_valid_to_assign(client_subnet) { + self.ipv6_assignments.insert(client_subnet, their_record); + return Ok(Some(client_subnet)); + } else { + retries += 1; + generative_index = (generative_index + 1) % total_subnets; + continue; + } + } + } else { + // This exit doesnt support ipv6 + Ok(None) + } + } + + /// Convert an identity into a rita exit client, this is used to setup the exit tunnel for a client + /// if the client has not already been assigned an ip, it will be assigned one as the exit client + /// is created + pub fn id_to_exit_client( + &mut self, + client: Identity, + ) -> Result> { + let internet_ipv6 = self.get_or_add_client_ipv6( + client, + settings::get_rita_exit().exit_network.get_ipv6_subnet_alt(), + settings::get_rita_exit() + .get_client_subnet_size() + .unwrap_or(DEFAULT_CLIENT_SUBNET_SIZE), + )?; + let internal_ip = self.get_or_add_client_internal_ip( + client, + settings::get_rita_exit() + .exit_network + .internal_ipv4 + .prefix(), + settings::get_rita_exit() + .exit_network + .internal_ipv4 + .internal_ip(), + )?; + let internet_ipv6 = internet_ipv6.map(|a| a.into()); + + Ok(ExitClient { + mesh_ip: client.mesh_ip, + internal_ip: IpAddr::V4(internal_ip), + port: CLIENT_WG_PORT, + public_key: client.wg_public_key, + internet_ipv6, + }) + } +} + +/// Take an index i, a larger subnet and a smaller subnet length and generate the ith smaller subnet in the larger subnet +/// For instance, if our larger subnet is fd00::1330/120, smaller sub len is 124, and index is 1, our generated subnet would be fd00::1310/124 +pub fn generate_iterative_client_subnet( + exit_sub: IpNetwork, + ind: u64, + subprefix: u8, +) -> Result> { + let net; + + // Covert the subnet's ip address into a u128 integer to allow for easy iterative + // addition operations. To this u128, we add (interative_index * client_subnet_size) + // and convert this result into an ipv6 addr. This is the starting ip in the client subnet + // + // For example, if we have exit subnet: fbad::1000/120, client subnet size is 124, index is 1 + // we do (fbad::1000).to_int() + (16 * 1) = fbad::1010/124 is the client subnet + let net_as_int: u128 = if let IpAddr::V6(addr) = exit_sub.network() { + net = Ipv6Network::new(addr, subprefix).unwrap(); + addr.into() + } else { + return Err(Box::new(RitaExitError::MiscStringError( + "Exit subnet expected to be ipv6!!".to_string(), + ))); + }; + + if subprefix < exit_sub.prefix() { + return Err(Box::new(RitaExitError::MiscStringError( + "Client subnet larger than exit subnet".to_string(), + ))); + } + + // This bitshifting is the total number of client subnets available. We are checking that our iterative index + // is lower than this number. For example, exit subnet: fd00:1000/120, client subnet /124, number of subnets will be + // 2^(124 - 120) => 2^4 => 16 + if ind < (1 << (subprefix - exit_sub.prefix())) { + let ret = net_as_int + (ind as u128 * net.size()); + let v6addr = Ipv6Addr::from(ret); + let ret = match Ipv6Network::new(v6addr, subprefix) { + Ok(a) => a, + Err(e) => { + return Err(Box::new(RitaExitError::MiscStringError(format!( + "Unable to parse a valid client subnet: {e:?}" + )))) + } + }; + + Ok(ret) + } else { + error!( + "Our index is larger than available subnets, either error in logic or no more subnets" + ); + Err(Box::new(RitaExitError::MiscStringError( + "Index larger than available subnets".to_string(), + ))) + } +} + +pub fn hash_wgkey(key: WgKey) -> u64 { + let mut hasher = DefaultHasher::new(); + key.to_string().hash(&mut hasher); + hasher.finish() +} + +/// quick display function for a neat error +pub fn display_hashset(input: &HashSet) -> String { + let mut out = String::new(); + for item in input.iter() { + write!(out, "{}, ", item.to_string()).unwrap(); + } + out +} + +#[cfg(test)] +mod tests { + use super::hash_wgkey; + use crate::{ + database::ipddr_assignment::generate_iterative_client_subnet, ClientListAnIpAssignmentMap, + }; + use althea_types::Identity; + use ipnetwork::{IpNetwork, Ipv6Network}; + use std::collections::HashSet; + + pub fn get_test_data() -> ClientListAnIpAssignmentMap { + let clients = HashSet::new(); + ClientListAnIpAssignmentMap::new(clients) + } + + #[test] + fn test_internet_ipv6_assignment() { + let mut data = get_test_data(); + let exit_sub = Some("2602:FBAD:10::/126".parse().unwrap()); + let dummy_client = Identity { + mesh_ip: "fd00::1337".parse().unwrap(), + eth_address: "0x4Af6D4125f3CBF07EBAD056E2eCa7b17c58AFEa4" + .parse() + .unwrap(), + wg_public_key: "TgR85AcLBY/7cLHXZIICcwVDU+1Pj/cjFeduCUNvLVU=" + .parse() + .unwrap(), + nickname: None, + }; + + // Generate a client subnet + let ip = data + .get_or_add_client_ipv6(dummy_client, exit_sub, 128) + .unwrap() + .unwrap(); + + // Verify assignement db is correctly populated + assert!(data.get_ipv6_assignments().len() == 1); + assert_eq!(*data.get_ipv6_assignments().get(&ip).unwrap(), dummy_client); + + // Try retrieving the same client + let ip_2 = data + .get_or_add_client_ipv6(dummy_client, exit_sub, 128) + .unwrap() + .unwrap(); + assert_eq!(ip, ip_2); + + // Make sure no new entries in assignemnt db + assert!(data.get_ipv6_assignments().len() == 1); + assert_eq!(*data.get_ipv6_assignments().get(&ip).unwrap(), dummy_client); + + println!("Assigned Ip client 1: {:?}", ip); + + // Add a second client + let dummy_client_2 = Identity { + mesh_ip: "fd00::1447".parse().unwrap(), + eth_address: "0x4Af6D4125f3CBF07EBAD056E2eCa7b17c58AFEa4" + .parse() + .unwrap(), + wg_public_key: "CEnTMKvpWr+xTFl7niTYyqH56w5iPdMjiC938X542GA=" + .parse() + .unwrap(), + nickname: None, + }; + + // Generate a client subnet + let ip = data + .get_or_add_client_ipv6(dummy_client_2, exit_sub, 128) + .unwrap() + .unwrap(); + + // Verify assignement db is correctly populated + assert!(data.get_ipv6_assignments().len() == 2); + assert_eq!( + *data.get_ipv6_assignments().get(&ip).unwrap(), + dummy_client_2 + ); + + let ip_2 = data + .get_or_add_client_ipv6(dummy_client_2, exit_sub, 128) + .unwrap() + .unwrap(); + assert_eq!(ip, ip_2); + + // Make sure no new entries in assignemnt db + assert!(data.get_ipv6_assignments().len() == 2); + assert_eq!( + *data.get_ipv6_assignments().get(&ip).unwrap(), + dummy_client_2 + ); + + println!("Assigned Ip client 2: {:?}", ip); + + // Generate a collision + let dummy_client_3 = Identity { + mesh_ip: "fd00::1557".parse().unwrap(), + eth_address: "0x4Af6D4125f3CBF07EBAD056E2eCa7b17c58AFEa4" + .parse() + .unwrap(), + wg_public_key: "+Iai9Qj5aIuTAq6h1srDL8yKElN65/PhNtkccSOJwls=" + .parse() + .unwrap(), + nickname: None, + }; + + // Generate a client subnet + let ip = data + .get_or_add_client_ipv6(dummy_client_3, exit_sub, 128) + .unwrap() + .unwrap(); + + // Verify assignement db is correctly populated + assert!(data.get_ipv6_assignments().len() == 3); + assert_eq!( + *data.get_ipv6_assignments().get(&ip).unwrap(), + dummy_client_3 + ); + + let _ = data + .get_or_add_client_ipv6(dummy_client_2, exit_sub, 128) + .unwrap() + .unwrap(); + let ip_2 = data + .get_or_add_client_ipv6(dummy_client_3, exit_sub, 128) + .unwrap() + .unwrap(); + assert_eq!(ip, ip_2); + + // Make sure no new entries in assignemnt db + assert!(data.get_ipv6_assignments().len() == 3); + assert_eq!( + *data.get_ipv6_assignments().get(&ip).unwrap(), + dummy_client_3 + ); + + println!("Assigned Ip client 3: {:?}", ip); + } + + #[test] + fn hash_playground() { + let key_1_hash = hash_wgkey( + "TgR85AcLBY/7cLHXZIICcwVDU+1Pj/cjFeduCUNvLVU=" + .parse() + .unwrap(), + ) % 4; + println!("hash: {}", key_1_hash); + let key_1_hash = hash_wgkey( + "+Iai9Qj5aIuTAq6h1srDL8yKElN65/PhNtkccSOJwls=" + .parse() + .unwrap(), + ) % 4; + println!("hash: {}", key_1_hash); + let key_1_hash = hash_wgkey( + "CEnTMKvpWr+xTFl7niTYyqH56w5iPdMjiC938X542GA=" + .parse() + .unwrap(), + ) % 4; + println!("hash: {}", key_1_hash); + } + + #[test] + fn test_internal_ip_assignment() { + let mut data = get_test_data(); + let dummy_client = Identity { + mesh_ip: "fd00::1337".parse().unwrap(), + eth_address: "0x4Af6D4125f3CBF07EBAD056E2eCa7b17c58AFEa4" + .parse() + .unwrap(), + wg_public_key: "TgR85AcLBY/7cLHXZIICcwVDU+1Pj/cjFeduCUNvLVU=" + .parse() + .unwrap(), + nickname: None, + }; + let ip = data + .get_or_add_client_internal_ip(dummy_client, 30, "172.168.0.100".parse().unwrap()) + .unwrap(); + + // Verify assignement db is correctly populated + assert!(data.get_internal_ip_assignments().len() == 1); + assert_eq!( + *data.get_internal_ip_assignments().get(&ip).unwrap(), + dummy_client + ); + + // requesting the same client shouldnt change any state + let ip2 = data + .get_or_add_client_internal_ip(dummy_client, 30, "172.168.0.100".parse().unwrap()) + .unwrap(); + + assert_eq!(ip, ip2); + + assert!(data.get_internal_ip_assignments().len() == 1); + assert_eq!( + *data.get_internal_ip_assignments().get(&ip2).unwrap(), + dummy_client + ); + + println!("Internal ip client 1: {}", ip); + + // Second client who collides + let dummy_client_2 = Identity { + mesh_ip: "fd00::1557".parse().unwrap(), + eth_address: "0x4Af6D4125f3CBF07EBAD056E2eCa7b17c58AFEa4" + .parse() + .unwrap(), + wg_public_key: "+Iai9Qj5aIuTAq6h1srDL8yKElN65/PhNtkccSOJwls=" + .parse() + .unwrap(), + nickname: None, + }; + + let ip = data + .get_or_add_client_internal_ip(dummy_client_2, 30, "172.168.0.100".parse().unwrap()) + .unwrap(); + + // Verify assignement db is correctly populated + assert!(data.get_internal_ip_assignments().len() == 2); + assert_eq!( + *data.get_internal_ip_assignments().get(&ip).unwrap(), + dummy_client_2 + ); + + // requesting the same client shouldnt change any state + let ip2 = data + .get_or_add_client_internal_ip(dummy_client_2, 30, "172.168.0.100".parse().unwrap()) + .unwrap(); + + assert_eq!(ip, ip2); + + assert!(data.get_internal_ip_assignments().len() == 2); + assert_eq!( + *data.get_internal_ip_assignments().get(&ip2).unwrap(), + dummy_client_2 + ); + + println!("Internal ip client 2: {}", ip); + } + + /// Test iterative subnet generation + #[test] + fn test_generate_iterative_subnet() { + // Complex subnet example + let net: IpNetwork = "2602:FBAD::/40".parse().unwrap(); + let ret = generate_iterative_client_subnet(net, 0, 64); + assert_eq!( + "2602:FBAD::/64".parse::().unwrap(), + ret.unwrap() + ); + + let net: IpNetwork = "2602:FBAD::/40".parse().unwrap(); + let ret = generate_iterative_client_subnet(net, 1, 64); + assert_eq!( + "2602:FBAD:0:1::/64".parse::().unwrap(), + ret.unwrap() + ); + + let net: IpNetwork = "2602:FBAD::/40".parse().unwrap(); + let ret = generate_iterative_client_subnet(net, 50, 64); + assert_eq!( + "2602:FBAD:0:32::/64".parse::().unwrap(), + ret.unwrap() + ); + + let net: IpNetwork = "2602:FBAD::/40".parse().unwrap(); + let ret = generate_iterative_client_subnet(net, 2_u64.pow(24), 64); + assert!(ret.is_err()); + + let net: IpNetwork = "2602:FBAD::/40".parse().unwrap(); + let ret = generate_iterative_client_subnet(net, 0, 30); + assert!(ret.is_err()); + + // Simple subnet example + let net: IpNetwork = "fd00::1337/120".parse().unwrap(); + let ret = generate_iterative_client_subnet(net, 0, 124); + assert_eq!( + "fd00::1300/124".parse::().unwrap(), + ret.unwrap() + ); + + let net: IpNetwork = "fd00::1337/120".parse().unwrap(); + let ret = generate_iterative_client_subnet(net, 2, 124); + assert_eq!( + "fd00::1320/124".parse::().unwrap(), + ret.unwrap() + ); + + let net: IpNetwork = "fd00::1337/120".parse().unwrap(); + let ret = generate_iterative_client_subnet(net, 15, 124); + assert_eq!( + "fd00::13f0/124".parse::().unwrap(), + ret.unwrap() + ); + let net: IpNetwork = "fd00::1337/120".parse().unwrap(); + let ret = generate_iterative_client_subnet(net, 16, 124); + assert!(ret.is_err()); + } +} diff --git a/rita_exit/src/database/mod.rs b/rita_exit/src/database/mod.rs index 308564e43..6aa4c59a3 100644 --- a/rita_exit/src/database/mod.rs +++ b/rita_exit/src/database/mod.rs @@ -4,15 +4,13 @@ use crate::database::geoip::get_gateway_ip_bulk; use crate::database::geoip::get_gateway_ip_single; use crate::database::geoip::verify_ip; -use crate::database::in_memory_database::display_hashset; -use crate::database::in_memory_database::get_client_internal_ip; -use crate::database::in_memory_database::get_client_ipv6; -use crate::database::in_memory_database::to_exit_client; -use crate::database::in_memory_database::DEFAULT_CLIENT_SUBNET_SIZE; +use crate::database::ipddr_assignment::display_hashset; +use crate::database::ipddr_assignment::DEFAULT_CLIENT_SUBNET_SIZE; +use crate::rita_loop::RitaExitData; use crate::rita_loop::EXIT_INTERFACE; use crate::rita_loop::EXIT_LOOP_TIMEOUT; use crate::rita_loop::LEGACY_INTERFACE; -use crate::IpAssignmentMap; +use crate::ClientListAnIpAssignmentMap; use crate::RitaExitError; use althea_kernel_interface::exit_server_tunnel::set_exit_wg_config; use althea_kernel_interface::exit_server_tunnel::setup_individual_client_routes; @@ -29,10 +27,9 @@ use althea_types::regions::Regions; use althea_types::Identity; use althea_types::WgKey; use althea_types::{ExitClientDetails, ExitClientIdentity, ExitDetails, ExitState, ExitVerifMode}; -use clarity::Address; -use exit_trust_root::client_db::get_registered_client_using_wgkey; use exit_trust_root::endpoints::RegisterRequest; use exit_trust_root::endpoints::SubmitCodeRequest; +use ipnetwork::IpNetwork; use phonenumber::PhoneNumber; use rita_common::blockchain_oracle::calculate_close_thresh; use rita_common::debt_keeper::get_debts_list; @@ -46,23 +43,9 @@ use std::sync::RwLock; use std::time::Duration; use std::time::Instant; use std::time::SystemTime; -use web30::client::Web3; pub mod geoip; -pub mod in_memory_database; - -#[derive(Clone, Debug, Default)] -pub struct RitaExitState { - ip_assignment_map: IpAssignmentMap, - geoip_cache: HashMap, -} - -lazy_static! { - /// Keep track of geoip information as well as ip addrs assigned to clients and ensure collisions dont happen. In worst case - /// the exit restarts and loses all this data in which case those client they had collision may get new - /// ip addrs and would need to setup wg exit tunnel again - static ref RITA_EXIT_STATE: Arc> = Arc::new(RwLock::new(RitaExitState::default())); -} +pub mod ipddr_assignment; /// one day in seconds pub const ONE_DAY: i64 = 86400; @@ -90,13 +73,20 @@ pub fn get_exit_info() -> ExitDetails { /// Handles a new client registration api call. Performs a geoip lookup /// on their registration ip to make sure that they are coming from a valid gateway /// ip and then sends out an email of phone message -pub async fn signup_client(client: ExitClientIdentity) -> Result> { +pub async fn signup_client( + client: ExitClientIdentity, + client_and_ip_info: Arc>>, +) -> Result> { let exit_settings = get_rita_exit(); info!("got setup request {:?}", client); let gateway_ip = get_gateway_ip_single(client.global.mesh_ip)?; info!("got gateway ip {:?}", client); - let verify_status = verify_ip(gateway_ip)?; + // dummy empty cache because signups don't happen often enough to bother using a locked unified cache + // between the actix worker threads and the main thread. The main thread bulk checks all clients every + // 5 seconds so caching goes a lot further there + let mut cache = HashMap::new(); + let verify_status = verify_ip(&mut cache, gateway_ip).await?; info!("verified the ip country {:?}", client); // Is client requesting from a valid country? If so send registration request to ops @@ -128,7 +118,10 @@ pub async fn signup_client(client: ExitClientIdentity) -> Result Result> { - trace!("Checking if record exists for {:?}", client.global.mesh_ip); - let exit = get_rita_exit(); - let exit_network = exit.exit_network.clone(); - let own_internal_ip = exit_network.internal_ipv4.internal_ip(); - let internal_netmask = exit_network.internal_ipv4.prefix(); - - match get_registered_client_using_wgkey( - client.global.wg_public_key, - our_address, - contract_addr, - contact, - ) - .await - { - Ok(their_record) => { - trace!("record exists, updating"); - - let current_ip: IpAddr = - get_client_internal_ip(their_record, internal_netmask, own_internal_ip)?; - let current_internet_ipv6 = get_client_ipv6( - their_record, - exit_network.get_ipv6_subnet_alt(), - exit.get_client_subnet_size() - .unwrap_or(DEFAULT_CLIENT_SUBNET_SIZE), - )?; - - Ok(ExitState::Registered { - our_details: ExitClientDetails { - client_internal_ip: current_ip, - internet_ipv6_subnet: current_internet_ipv6, - }, - general_details: get_exit_info(), - message: "Registration OK".to_string(), - identity: Box::new(exit.get_exit_identity()), - }) - } - Err(e) => { - trace!("Failed to retrieve a client: {}", e); - Err(Box::new(RitaExitError::NoClientError)) - } - } -} - /// Every 5 seconds we validate all online clients to make sure that they are in the right region /// we also do this in the client status requests but we want to handle the edge case of a modified /// client that doesn't make status requests -pub fn validate_clients_region( +pub async fn validate_clients_region( + geoip_cache: &mut HashMap, clients_list: Vec, ) -> Result, Box> { info!("Starting exit region validation"); @@ -300,7 +244,7 @@ pub fn validate_clients_region( } let list = get_gateway_ip_bulk(ip_vec, EXIT_LOOP_TIMEOUT)?; for item in list.iter() { - let res = verify_ip(item.gateway_ip); + let res = verify_ip(geoip_cache, item.gateway_ip).await; match res { Ok(true) => trace!("{:?} is from an allowed ip", item), Ok(false) => { @@ -349,13 +293,16 @@ pub struct CurrentExitClientState { /// into a single very long wg tunnel setup command which is then applied to the /// wg_exit tunnel (or created if it's the first run). This is the offically supported /// way to update live WireGuard tunnels and should not disrupt traffic -pub fn setup_clients( - clients_list: Vec, - geoip_blacklist: Vec, - client_states: ExitClientSetupStates, -) -> Result> { - let mut client_states = client_states; +pub fn setup_clients(client_data: &mut RitaExitData) -> Result<(), Box> { let start = Instant::now(); + // Note, the data flow in this fuction is strage, we have getters and setters for all + // data, but, some functions like id_to_exit_client will assign ip addresses to clients + // thus modifying the internal state of the client_data object despite not obviously being + // a setter. This is a holdover from the original design of the code and should be cleaned up with + // more explicit ip allocation functions + let clients_list = client_data.get_all_registered_clients(); + let mut client_states = client_data.get_setup_states(); + let geoip_blacklist = client_data.get_geoip_blacklist(); // use hashset to ensure uniqueness and check for duplicate db entries let mut wg_clients = HashSet::new(); @@ -369,7 +316,7 @@ pub fn setup_clients( ); for c in clients_list.iter() { - match to_exit_client(*c) { + match client_data.id_to_exit_client(*c) { Ok(a) => { if !wg_clients.insert(a) { error!("Duplicate database entry! {}", c.wg_public_key); @@ -385,7 +332,7 @@ pub fn setup_clients( } for c in geoip_blacklist.iter() { - match to_exit_client(*c) { + match client_data.id_to_exit_client(*c) { Ok(a) => { if !geoip_blacklist_map.insert(a) { error!("Duplicate database entry! {}", c.wg_public_key); @@ -518,8 +465,12 @@ pub fn setup_clients( for c_key in changed_clients_return.new_v1 { if let Some(c) = key_to_client_map.get(&c_key) { setup_individual_client_routes( - match get_client_internal_ip(*c, internal_netmask, internal_ip_v4) { - Ok(a) => a, + match client_data.get_or_add_client_internal_ip( + *c, + internal_netmask, + internal_ip_v4, + ) { + Ok(a) => std::net::IpAddr::V4(a), Err(e) => { error!( "Received error while trying to retrieve client internal ip {}", @@ -536,8 +487,12 @@ pub fn setup_clients( for c_key in changed_clients_return.new_v2 { if let Some(c) = key_to_client_map.get(&c_key) { teardown_individual_client_routes( - match get_client_internal_ip(*c, internal_netmask, internal_ip_v4) { - Ok(a) => a, + match client_data.get_or_add_client_internal_ip( + *c, + internal_netmask, + internal_ip_v4, + ) { + Ok(a) => std::net::IpAddr::V4(a), Err(e) => { error!( "Received error while trying to retrieve client internal ip {}", @@ -550,7 +505,8 @@ pub fn setup_clients( } } - Ok(client_states) + client_data.set_setup_states(client_states); + Ok(()) } /// Find all clients that underwent transition from b19 -> 20 or vice versa and need updated rules and routes @@ -639,16 +595,13 @@ pub fn get_client_interface( /// setting the htb class they are assigned to to a maximum speed of the free tier value. /// Unlike intermediary enforcement we do not need to subdivide the free tier to prevent /// ourselves from exceeding the upstream free tier. As an exit we are the upstream. -pub fn enforce_exit_clients( - clients_list: Vec, - old_debt_actions: &HashSet<(Identity, DebtAction)>, -) -> Result, Box> { +pub fn enforce_exit_clients(client_data: &mut RitaExitData) -> Result<(), Box> { let start = Instant::now(); let mut clients_by_id = HashMap::new(); let free_tier_limit = settings::get_rita_exit().payment.free_tier_throughput; let close_threshold = calculate_close_thresh(); - for client_id in clients_list.iter() { - if let Ok(exit_client) = to_exit_client(*client_id) { + for client_id in client_data.get_all_registered_clients() { + if let Ok(exit_client) = client_data.id_to_exit_client(client_id) { clients_by_id.insert(client_id, exit_client); } } @@ -660,6 +613,7 @@ pub fn enforce_exit_clients( ); // build the new debt actions list and see if we need to do anything + let old_debt_actions = client_data.get_debt_actions(); let mut new_debt_actions = HashSet::new(); for debt_entry in list.iter() { new_debt_actions.insert(( @@ -668,12 +622,12 @@ pub fn enforce_exit_clients( )); } if new_debt_actions - .symmetric_difference(old_debt_actions) + .symmetric_difference(&old_debt_actions) .count() == 0 { info!("No change in enforcement list found, skipping tc calls"); - return Ok(new_debt_actions); + return Ok(()); } for debt_entry in list.iter() { @@ -713,7 +667,7 @@ pub fn enforce_exit_clients( error!("Failed to setup flow for wg_exit_v2 {:?}", e); } // gets the client ipv6 flow for this exit specifically - let client_ipv6 = get_client_ipv6( + let client_ipv6 = client_data.get_or_add_client_ipv6( debt_entry.identity, settings::get_rita_exit().exit_network.get_ipv6_subnet_alt(), settings::get_rita_exit() @@ -721,9 +675,11 @@ pub fn enforce_exit_clients( .unwrap_or(DEFAULT_CLIENT_SUBNET_SIZE), ); if let Ok(Some(client_ipv6)) = client_ipv6 { - if let Err(e) = - create_flow_by_ipv6(EXIT_INTERFACE, client_ipv6, ip) - { + if let Err(e) = create_flow_by_ipv6( + EXIT_INTERFACE, + IpNetwork::V6(client_ipv6), + ip, + ) { error!("Failed to setup ipv6 flow for wg_exit_v2 {:?}", e); } } @@ -797,5 +753,6 @@ pub fn enforce_exit_clients( start.elapsed().as_secs(), start.elapsed().subsec_millis(), ); - Ok(new_debt_actions) + client_data.set_debt_actions(new_debt_actions); + Ok(()) } diff --git a/rita_exit/src/lib.rs b/rita_exit/src/lib.rs index fc27a3dff..097d789f4 100644 --- a/rita_exit/src/lib.rs +++ b/rita_exit/src/lib.rs @@ -1,9 +1,6 @@ #[macro_use] extern crate log; -#[macro_use] -extern crate lazy_static; - #[macro_use] extern crate serde_derive; @@ -16,7 +13,7 @@ pub mod rita_loop; pub mod traffic_watcher; pub use crate::database::geoip::*; -pub use crate::database::in_memory_database::*; +pub use crate::database::ipddr_assignment::*; pub use error::RitaExitError; use rita_common::dashboard::own_info::READABLE_VERSION; use std::path::PathBuf; diff --git a/rita_exit/src/network_endpoints/mod.rs b/rita_exit/src/network_endpoints/mod.rs index 43e20ede2..601c315fd 100644 --- a/rita_exit/src/network_endpoints/mod.rs +++ b/rita_exit/src/network_endpoints/mod.rs @@ -1,9 +1,8 @@ //! Network endpoints for rita-exit that are not dashboard or local infromational endpoints //! these are called by rita instances to operate the mesh -use crate::database::{client_status, signup_client}; - -use crate::RitaExitError; +use crate::database::signup_client; +use crate::{ClientListAnIpAssignmentMap, RitaExitError}; use actix_web::web; use actix_web::{http::StatusCode, web::Json, HttpRequest, HttpResponse, Result}; use althea_types::Identity; @@ -18,14 +17,12 @@ use crypto_box::SecretKey; use num256::Int256; use rita_common::blockchain_oracle::potential_payment_issues_detected; use rita_common::debt_keeper::get_debts_list; -use rita_common::rita_loop::get_web3_server; use settings::get_rita_exit; use std::collections::HashMap; use std::net::SocketAddr; use std::sync::{Arc, RwLock}; use std::time::Duration; use std::time::SystemTime; -use web30::client::Web3; // Timeout to contact Althea contract and query info about a user pub const CLIENT_STATUS_TIMEOUT: Duration = Duration::from_secs(20); @@ -64,8 +61,10 @@ fn decrypt_exit_client_id_helper( pub async fn secure_setup_request( request: (Json, HttpRequest), + client_and_ip_info: web::Data>>, ) -> HttpResponse { let exit_settings = get_rita_exit(); + let client_and_ip_info = client_and_ip_info.into_inner(); let our_secretkey: WgKey = exit_settings.network.wg_private_key.unwrap(); @@ -103,7 +102,7 @@ pub async fn secure_setup_request( let remote_mesh_ip = remote_mesh_socket.ip(); if remote_mesh_ip == client_mesh_ip { - let result = signup_client(*client).await; + let result = signup_client(*client, client_and_ip_info).await; match result { Ok(exit_state) => HttpResponse::Ok().json(encrypt_setup_return( exit_state, @@ -128,18 +127,13 @@ pub async fn secure_setup_request( } } -pub async fn secure_status_request(request: Json) -> HttpResponse { +pub async fn secure_status_request( + request: Json, + client_and_ip_info: web::Data>>, +) -> HttpResponse { let exit_settings = get_rita_exit(); let our_secretkey: WgKey = exit_settings.network.wg_private_key.unwrap(); - let our_address = exit_settings - .payment - .eth_private_key - .expect("Why dont we have a private key?") - .to_address(); - let contract_addr = exit_settings.exit_network.registered_users_contract_addr; - let contact = Web3::new(&get_web3_server(), CLIENT_STATUS_TIMEOUT); - let their_wg_pubkey = request.pubkey; let their_nacl_pubkey = request.pubkey.into(); let exit_client_id = request.into_inner(); @@ -156,7 +150,11 @@ pub async fn secure_status_request(request: Json) - trace!("got status request from {}", their_wg_pubkey); // We use our eth address as the requesting address - let state = match client_status(*decrypted_id, our_address, contract_addr, &contact).await { + let state = match client_and_ip_info + .write() + .unwrap() + .get_client_status(*decrypted_id) + { Ok(state) => state, Err(e) => match *e { RitaExitError::NoClientError => { diff --git a/rita_exit/src/rita_loop/mod.rs b/rita_exit/src/rita_loop/mod.rs index 4984736d8..5d827793a 100644 --- a/rita_exit/src/rita_loop/mod.rs +++ b/rita_exit/src/rita_loop/mod.rs @@ -13,22 +13,25 @@ use crate::database::{ enforce_exit_clients, setup_clients, validate_clients_region, ExitClientSetupStates, }; -use crate::network_endpoints::*; use crate::traffic_watcher::watch_exit_traffic; +use crate::{network_endpoints::*, ClientListAnIpAssignmentMap, RitaExitError}; use actix::System as AsyncSystem; use actix_web::{web, App, HttpServer}; use althea_kernel_interface::exit_server_tunnel::{one_time_exit_setup, setup_nat}; use althea_kernel_interface::setup_wg_if::create_blank_wg_interface; use althea_kernel_interface::wg_iface_counter::WgUsage; use althea_kernel_interface::ExitClient; +use althea_types::regions::Regions; use althea_types::{Identity, SignedExitServerList, WgKey}; use babel_monitor::{open_babel_stream, parse_routes}; use clarity::Address; use exit_trust_root::client_db::get_all_registered_clients; +use ipnetwork::{IpNetwork, Ipv6Network}; use rita_common::debt_keeper::DebtAction; use rita_common::rita_loop::get_web3_server; use settings::exit::EXIT_LIST_PORT; use std::collections::{HashMap, HashSet}; +use std::net::{IpAddr, Ipv4Addr}; use std::sync::{Arc, RwLock}; use std::thread; use std::time::Duration; @@ -45,70 +48,227 @@ pub const LEGACY_INTERFACE: &str = "wg_exit"; pub const EXIT_INTERFACE: &str = "wg_exit_v2"; /// Cache of rita exit state to track across ticks -#[derive(Default, Clone, Debug, Serialize, Deserialize)] -pub struct RitaExitCache { - // a cache of what tunnels we had setup last round, used to prevent extra setup ops +#[derive(Clone, Debug)] +pub struct RitaExitData { + /// a cache of what tunnels we had setup last round, used to prevent extra setup ops wg_clients: HashSet, - // a list of client debts from the last round, to prevent extra enforcement ops + /// a list of client debts from the last round, to prevent extra enforcement ops debt_actions: HashSet<(Identity, DebtAction)>, - // if we have successfully setup the wg exit tunnel in the past, if false we have never - // setup exit clients and should crash if we fail to do so, otherwise we are preventing - // proper failover + /// if we have successfully setup the wg exit tunnel in the past, if false we have never + /// setup exit clients and should crash if we fail to do so, otherwise we are preventing + /// proper failover successful_setup: bool, - // cache of b19 routers we have successful rules and routes for + /// cache of b19 routers we have successful rules and routes for wg_exit_clients: HashSet, - // cache of b20 routers we have successful rules and routes for + /// cache of b20 routers we have successful rules and routes for wg_exit_v2_clients: HashSet, - // A blacklist of clients that we fail geoip verification for. We tear down these routes + /// A blacklist of clients that we fail geoip verification for. We tear down these routes geoip_blacklist: Vec, + /// A list of geoip info that we have already requested since startup, to reduce api usage + geoip_cache: HashMap, + // ip assignments for clients, represented as a locked map so that we can tell clients what ip's they where + // assigned in the actix worker threads which also has a copy of this lock + client_list_and_ip_assignments: Arc>, + /// A cache of the last usage of the wg tunnels, this must be maintained from when the tunnel for a specific + /// client is created to when it is destroyed/recreated otherwise overbilling will occur + usage_history: HashMap, } -pub type ExitLock = Arc>>; +impl RitaExitData { + pub fn new(client_list_and_ip_assignments: Arc>) -> Self { + RitaExitData { + wg_clients: HashSet::new(), + debt_actions: HashSet::new(), + successful_setup: false, + wg_exit_clients: HashSet::new(), + wg_exit_v2_clients: HashSet::new(), + geoip_blacklist: Vec::new(), + geoip_cache: HashMap::new(), + client_list_and_ip_assignments, + usage_history: HashMap::new(), + } + } + + pub fn is_client_registered(&self, client: Identity) -> bool { + self.client_list_and_ip_assignments + .read() + .unwrap() + .is_client_registered(client) + } + + pub fn get_ipv6_assignments(&self) -> HashMap { + self.client_list_and_ip_assignments + .read() + .unwrap() + .get_ipv6_assignments() + } + + pub fn get_internal_ip_assignments(&self) -> HashMap { + self.client_list_and_ip_assignments + .read() + .unwrap() + .get_internal_ip_assignments() + } + + pub fn id_to_exit_client(&self, id: Identity) -> Result> { + self.client_list_and_ip_assignments + .write() + .unwrap() + .id_to_exit_client(id) + } + + pub fn get_or_add_client_internal_ip( + &self, + their_record: Identity, + netmask: u8, + gateway_ip: Ipv4Addr, + ) -> Result> { + self.client_list_and_ip_assignments + .write() + .unwrap() + .get_or_add_client_internal_ip(their_record, netmask, gateway_ip) + } + + pub fn get_or_add_client_ipv6( + &self, + their_record: Identity, + exit_sub: Option, + client_subnet_size: u8, + ) -> Result, Box> { + self.client_list_and_ip_assignments + .write() + .unwrap() + .get_or_add_client_ipv6(their_record, exit_sub, client_subnet_size) + } + + pub fn get_setup_states(&self) -> ExitClientSetupStates { + ExitClientSetupStates { + old_clients: self.wg_clients.clone(), + wg_exit_clients: self.wg_exit_clients.clone(), + wg_exit_v2_clients: self.wg_exit_v2_clients.clone(), + } + } + + pub fn set_setup_states(&mut self, states: ExitClientSetupStates) { + self.wg_clients = states.old_clients; + self.wg_exit_clients = states.wg_exit_clients; + self.wg_exit_v2_clients = states.wg_exit_v2_clients; + } + + pub fn get_all_registered_clients(&self) -> HashSet { + self.client_list_and_ip_assignments + .read() + .unwrap() + .get_registered_clients() + } + + pub fn set_registered_clients(&mut self, clients: HashSet) { + self.client_list_and_ip_assignments + .write() + .unwrap() + .set_registered_clients(clients); + } + + pub fn get_geoip_blacklist(&self) -> Vec { + self.geoip_blacklist.clone() + } + + pub fn get_debt_actions(&self) -> HashSet<(Identity, DebtAction)> { + self.debt_actions.clone() + } + + pub fn set_debt_actions(&mut self, debt_actions: HashSet<(Identity, DebtAction)>) { + self.debt_actions = debt_actions; + } +} /// Starts the rita exit billing thread, this thread deals with blocking db -/// calls and performs various tasks required for billing. The tasks interacting -/// with actix are the most troublesome because the actix system may restart -/// and crash this thread. To prevent that and other crashes we have a watchdog -/// thread which simply restarts the internal thread. -pub fn start_rita_exit_loop(reg_clients_list: Vec) { +/// calls and performs various tasks required for billing. If this thread crashes +/// due to consistenty requirements the whole application should be restarted +/// this will cause the wg tunnels to get torn down and rebuilt, putting things back into +/// a consistent state +pub async fn start_rita_exit_loop(client_and_ip_info: Arc>) { setup_exit_wg_tunnel(); - // the last usage of the wg tunnels, if an innner thread restarts this must be preserved to prevent - // overbilling users - let usage_history = Arc::new(RwLock::new(HashMap::new())); - - // this will always be an error, so it's really just a loop statement - // with some fancy destructuring, blocking the caller thread as a watchdog - while let Err(e) = { - let reg_clients_list = reg_clients_list.clone(); - // ARC will simply clone the same reference - let usage_history = usage_history.clone(); - thread::spawn(move || { - // Internal exit cache that store state across multiple ticks - let mut rita_exit_cache = RitaExitCache::default(); - let mut reg_clients_list = reg_clients_list.clone(); - let runner = AsyncSystem::new(); - runner.block_on(async move { - loop { - reg_clients_list = update_client_list(reg_clients_list).await; - - rita_exit_cache = rita_exit_loop( - reg_clients_list.clone(), - rita_exit_cache, - usage_history.clone(), - ) - .await; - } - }) - }) - .join() - } { - error!("Exit loop thread panicked! Respawning {:?}", e); + let mut rita_exit_cache = RitaExitData::new(client_and_ip_info); + loop { + let start = Instant::now(); + + // Internal exit cache that store state across multiple ticks + rita_exit_cache.set_registered_clients( + update_client_list(rita_exit_cache.get_all_registered_clients()).await, + ); + + let rita_exit = settings::get_rita_exit(); + let babel_port = rita_exit.network.babel_port; + + let start_bill_benchmark = Instant::now(); + // watch and bill for traffic + bill( + babel_port, + start, + rita_exit_cache.get_all_registered_clients(), + rita_exit_cache.usage_history.clone(), + ); + info!( + "Finished Rita billing in {}ms", + start_bill_benchmark.elapsed().as_millis() + ); + + info!("About to setup clients"); + let start_setup_benchmark = Instant::now(); + // Create and update client tunnels + match setup_clients(&mut rita_exit_cache) { + Ok(_) => { + rita_exit_cache.successful_setup = true; + } + Err(e) => error!("Setup clients failed with {:?}", e), + } + info!( + "Finished Rita setting up clients in {}ms", + start_setup_benchmark.elapsed().as_millis() + ); + + // Make sure no one we are setting up is geoip unauthorized + let start_region_benchmark = Instant::now(); + info!("about to check regions"); + let clients_list = rita_exit_cache.get_all_registered_clients(); + if let Some(list) = check_regions( + &mut rita_exit_cache.geoip_cache, + start, + clients_list.iter().cloned().collect(), + ) + .await + { + rita_exit_cache.geoip_blacklist = list; + } + info!( + "Finished Rita checking region in {}ms", + start_region_benchmark.elapsed().as_millis() + ); + info!("About to enforce exit clients"); + // handle enforcement on client tunnels by querying debt keeper + // this consumes client list + let start_enforce_benchmark = Instant::now(); + match enforce_exit_clients(&mut rita_exit_cache) { + Ok(_) => {} + Err(e) => warn!("Failed to enforce exit clients with {:?}", e,), + } + info!( + "Finished Rita enforcement in {}ms ", + start_enforce_benchmark.elapsed().as_millis() + ); + info!( + "Finished Rita exit loop in {}ms, all vars should be dropped", + start.elapsed().as_millis(), + ); + + thread::sleep(EXIT_LOOP_SPEED_DURATION); } } /// Updates the client list, if this is not successful the old client list is used -async fn update_client_list(reg_clients_list: Vec) -> Vec { +async fn update_client_list(reg_clients_list: HashSet) -> HashSet { let payment_settings = settings::get_rita_common().payment; let contract_address = settings::get_rita_exit() .exit_network @@ -138,90 +298,21 @@ async fn update_client_list(reg_clients_list: Vec) -> Vec { } } -async fn rita_exit_loop( - reg_clients_list: Vec, - rita_exit_cache: RitaExitCache, - usage_history: ExitLock, -) -> RitaExitCache { - let mut rita_exit_cache = rita_exit_cache; - let start = Instant::now(); - - let rita_exit = settings::get_rita_exit(); - let babel_port = rita_exit.network.babel_port; - - let ids = reg_clients_list.clone(); - let start_bill_benchmark = Instant::now(); - // watch and bill for traffic - bill(babel_port, start, ids, usage_history); - info!( - "Finished Rita billing in {}ms", - start_bill_benchmark.elapsed().as_millis() - ); - - info!("About to setup clients"); - let start_setup_benchmark = Instant::now(); - // Create and update client tunnels - match setup_clients( - reg_clients_list.clone(), - rita_exit_cache.geoip_blacklist.clone(), - ExitClientSetupStates { - old_clients: rita_exit_cache.wg_clients.clone(), - wg_exit_clients: rita_exit_cache.wg_exit_clients.clone(), - wg_exit_v2_clients: rita_exit_cache.wg_exit_v2_clients.clone(), - }, - ) { - Ok(client_states) => { - rita_exit_cache.successful_setup = true; - rita_exit_cache.wg_clients = client_states.old_clients; - rita_exit_cache.wg_exit_clients = client_states.wg_exit_clients; - rita_exit_cache.wg_exit_v2_clients = client_states.wg_exit_v2_clients; - } - Err(e) => error!("Setup clients failed with {:?}", e), - } - info!( - "Finished Rita setting up clients in {}ms", - start_setup_benchmark.elapsed().as_millis() - ); - - // Make sure no one we are setting up is geoip unauthorized - let start_region_benchmark = Instant::now(); - info!("about to check regions"); - if let Some(list) = check_regions(start, reg_clients_list.clone()) { - rita_exit_cache.geoip_blacklist = list; - } - info!( - "Finished Rita checking region in {}ms", - start_region_benchmark.elapsed().as_millis() - ); - info!("About to enforce exit clients"); - // handle enforcement on client tunnels by querying debt keeper - // this consumes client list - let start_enforce_benchmark = Instant::now(); - match enforce_exit_clients(reg_clients_list, &rita_exit_cache.debt_actions.clone()) { - Ok(new_debt_actions) => rita_exit_cache.debt_actions = new_debt_actions, - Err(e) => warn!("Failed to enforce exit clients with {:?}", e,), - } - info!( - "Finished Rita enforcement in {}ms ", - start_enforce_benchmark.elapsed().as_millis() - ); - info!( - "Finished Rita exit loop in {}ms, all vars should be dropped", - start.elapsed().as_millis(), - ); - - thread::sleep(EXIT_LOOP_SPEED_DURATION); - rita_exit_cache -} - -fn bill(babel_port: u16, start: Instant, ids: Vec, usage_history: ExitLock) { +fn bill( + babel_port: u16, + start: Instant, + ids: HashSet, + usage_history: HashMap, +) { trace!("about to try opening babel stream"); match open_babel_stream(babel_port, EXIT_LOOP_TIMEOUT) { Ok(mut stream) => match parse_routes(&mut stream) { Ok(routes) => { trace!("Sending traffic watcher message?"); - if let Err(e) = watch_exit_traffic(usage_history, &routes, &ids) { + if let Err(e) = + watch_exit_traffic(usage_history, &routes, ids.iter().cloned().collect()) + { error!( "Watch exit traffic failed with {}, in {} millis", e, @@ -254,10 +345,14 @@ fn bill(babel_port: u16, start: Instant, ids: Vec, usage_history: Exit /// Run a region validation and return a list of blacklisted clients. This list is later used /// in setup clients to teardown blacklisted client tunnels -fn check_regions(start: Instant, clients_list: Vec) -> Option> { +async fn check_regions( + geoip_cache: &mut HashMap, + start: Instant, + clients_list: Vec, +) -> Option> { let val = settings::get_rita_exit().allowed_countries.is_empty(); if !val { - let res = validate_clients_region(clients_list); + let res = validate_clients_region(geoip_cache, clients_list).await; match res { Err(e) => { warn!( @@ -331,19 +426,21 @@ fn setup_exit_wg_tunnel() { .unwrap(); } -pub fn start_rita_exit_endpoints(workers: usize) { +/// Starts the rita exit endpoints, passing the ip assignments and registered clients lists, these are shared via cross-thread lock +/// with the main rita exit loop. +pub fn start_rita_exit_endpoints(ip_assignments: Arc>) { + let web_data = web::Data::new(ip_assignments); thread::spawn(move || { let runner = AsyncSystem::new(); runner.block_on(async move { - // Exit stuff, huge threadpool to offset Pgsql blocking - let _res = HttpServer::new(|| { + let _res = HttpServer::new(move || { App::new() .route("/secure_setup", web::post().to(secure_setup_request)) .route("/secure_status", web::post().to(secure_status_request)) .route("/client_debt", web::post().to(get_client_debt)) .route("/time", web::get().to(get_exit_timestamp_http)) + .app_data(web_data.clone()) }) - .workers(workers) .bind(format!( "[::0]:{}", settings::get_rita_exit().exit_network.exit_hello_port @@ -360,7 +457,7 @@ pub fn start_rita_exit_endpoints(workers: usize) { /// instance of this IP due to the way babel handles multihoming. Due to race conditions we don't explicitly /// bind to the IP for this listener, we instead bind to all available IPs. As we make tunnels kernel interface /// will add the ip to each wg tunnel and then babel will handle the rest. -pub fn start_rita_exit_list_endpoint(workers: usize) { +pub fn start_rita_exit_list_endpoint() { let exit_contract_data_cache: Arc>> = Arc::new(RwLock::new(HashMap::new())); let web_data = web::Data::new(exit_contract_data_cache.clone()); @@ -372,7 +469,6 @@ pub fn start_rita_exit_list_endpoint(workers: usize) { .route("/exit_list", web::post().to(get_exit_list)) .app_data(web_data.clone()) }) - .workers(workers) .bind(format!("[::0]:{}", EXIT_LIST_PORT,)) .unwrap() .shutdown_timeout(0) diff --git a/rita_exit/src/traffic_watcher/mod.rs b/rita_exit/src/traffic_watcher/mod.rs index 8260c1902..ddb3c8f61 100644 --- a/rita_exit/src/traffic_watcher/mod.rs +++ b/rita_exit/src/traffic_watcher/mod.rs @@ -8,7 +8,6 @@ //! //! Also handles enforcement of nonpayment, since there's no need for a complicated TunnelManager for exits -use crate::rita_loop::ExitLock; use crate::rita_loop::EXIT_INTERFACE; use crate::rita_loop::LEGACY_INTERFACE; use crate::RitaExitError; @@ -151,12 +150,10 @@ fn debts_logging(debts: &HashMap) { /// This traffic watcher watches how much traffic each we send and receive from each client. pub fn watch_exit_traffic( - usage_history: ExitLock, + mut usage_history: HashMap, routes: &[Route], - clients: &[Identity], + clients: Vec, ) -> Result<(), Box> { - let mut usage_history = usage_history.write().unwrap(); - // Since Althea is a pay per forward network we must add a surcharge for transaction fees // to our own price. In the case Exit -> A -> B -> C the exit pays A a lump sum for it's own // fees as well as B's fees. This means the exit pays the transaction fee (a percentage) for @@ -176,7 +173,7 @@ pub fn watch_exit_traffic( } }; - let ret = generate_helper_maps(&our_id, clients); + let ret = generate_helper_maps(&our_id, &clients); let identities = ret.wg_to_id; let id_from_ip = ret.ip_to_id; let destinations = get_babel_info(routes, our_id, id_from_ip);