diff --git a/Cargo.lock b/Cargo.lock index 1e37c1768..4d3648d02 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -855,6 +855,46 @@ dependencies = [ "zeroize", ] +[[package]] +name = "clap" +version = "4.5.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b97f376d85a664d5837dbae44bf546e6477a679ff6610010f17276f686d867e8" +dependencies = [ + "clap_builder", + "clap_derive", +] + +[[package]] +name = "clap_builder" +version = "4.5.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19bc80abd44e4bed93ca373a0704ccbd1b710dc5749406201bb018272808dc54" +dependencies = [ + "anstream", + "anstyle", + "clap_lex", + "strsim 0.11.1", +] + +[[package]] +name = "clap_derive" +version = "4.5.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ac6a0c7b1a9e9a5186361f67dfa1b88213572f427fb9ab038efb2bd8c582dab" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.79", +] + +[[package]] +name = "clap_lex" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97" + [[package]] name = "clarity" version = "1.4.0" @@ -1204,7 +1244,7 @@ dependencies = [ "lazy_static", "regex", "serde", - "strsim", + "strsim 0.10.0", ] [[package]] @@ -1297,13 +1337,15 @@ name = "exit_trust_root" version = "0.21.5" dependencies = [ "actix", + "actix-rt", "actix-web", "althea_types", "awc", + "clap", "clarity", - "crypto_box", + "crossbeam", "env_logger", - "lazy_static", + "futures 0.1.31", "log", "openssl", "openssl-probe", @@ -1846,6 +1888,7 @@ dependencies = [ "babel_monitor", "clarity", "clu", + "crossbeam", "ctrlc", "deep_space", "diesel", @@ -1867,6 +1910,7 @@ dependencies = [ "rita_db_migration", "rita_exit", "settings", + "tokio", "web30", ] @@ -3020,6 +3064,7 @@ version = "0.1.0" dependencies = [ "althea_types", "clarity", + "crossbeam", "diesel", "dotenv", "exit_trust_root", @@ -3574,6 +3619,12 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + [[package]] name = "strum" version = "0.26.3" @@ -3755,9 +3806,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.40.0" +version = "1.41.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2b070231665d27ad9ec9b8df639893f46727666c6767db40317fbe920a5d998" +checksum = "145f3413504347a2be84393cc8a7d2fb4d863b375909ea59f2158261aa258bbb" dependencies = [ "backtrace", "bytes", diff --git a/althea_types/src/contact_info.rs b/althea_types/src/contact_info.rs index 8950db71e..76f25d93c 100644 --- a/althea_types/src/contact_info.rs +++ b/althea_types/src/contact_info.rs @@ -53,14 +53,22 @@ impl From for ContactDetails { impl ContactType { pub fn convert(val: ContactDetails, seq: Option) -> Option { - let same = ExitRegistrationDetails { - phone: val.phone, - email: val.email, - phone_code: None, - email_code: None, - sequence_number: seq, - }; - ContactStorage::convert(same).map(|val| val.into()) + match (val.email, val.phone) { + (Some(email), Some(phone)) => Some(ContactType::Both { + email: email.parse().ok()?, + number: phone.parse().ok()?, + sequence_number: seq, + }), + (Some(email), None) => Some(ContactType::Email { + email: email.parse().ok()?, + sequence_number: seq, + }), + (None, Some(phone)) => Some(ContactType::Phone { + number: phone.parse().ok()?, + sequence_number: seq, + }), + (None, None) => None, + } } } @@ -299,35 +307,35 @@ impl ContactStorage { email: Some(email), phone_code: _, email_code: _, - sequence_number, + exit_database_contract: _, } => match (phone.parse(), email.parse()) { (Ok(validated_phone), Ok(validated_email)) => Some(ContactStorage { number: Some(validated_phone), email: Some(validated_email), invalid_email: None, invalid_number: None, - sequence_number: sequence_number.unwrap_or(0), + sequence_number: 0, }), (Err(_e), Ok(validated_email)) => Some(ContactStorage { email: Some(validated_email), number: None, invalid_email: None, invalid_number: None, - sequence_number: sequence_number.unwrap_or(0), + sequence_number: 0, }), (Ok(validated_phone), Err(_e)) => Some(ContactStorage { number: Some(validated_phone), email: None, invalid_email: None, invalid_number: None, - sequence_number: sequence_number.unwrap_or(0), + sequence_number: 0, }), (Err(_ea), Err(_eb)) => Some(ContactStorage { number: None, email: None, invalid_email: Some(email), invalid_number: Some(phone), - sequence_number: sequence_number.unwrap_or(0), + sequence_number: 0, }), }, ExitRegistrationDetails { @@ -335,21 +343,21 @@ impl ContactStorage { email: None, phone_code: _, email_code: _, - sequence_number, + exit_database_contract: _, } => match phone.parse() { Ok(validated_phone) => Some(ContactStorage { number: Some(validated_phone), email: None, invalid_email: None, invalid_number: None, - sequence_number: sequence_number.unwrap_or(0), + sequence_number: 0, }), Err(_e) => Some(ContactStorage { number: None, email: None, invalid_number: Some(phone), invalid_email: None, - sequence_number: sequence_number.unwrap_or(0), + sequence_number: 0, }), }, ExitRegistrationDetails { @@ -357,21 +365,21 @@ impl ContactStorage { email: Some(email), phone_code: _, email_code: _, - sequence_number, + exit_database_contract: _, } => match email.parse() { Ok(validated_email) => Some(ContactStorage { email: Some(validated_email), number: None, invalid_email: None, invalid_number: None, - sequence_number: sequence_number.unwrap_or(0), + sequence_number: 0, }), Err(_e) => Some(ContactStorage { email: None, number: None, invalid_email: Some(email), invalid_number: None, - sequence_number: sequence_number.unwrap_or(0), + sequence_number: 0, }), }, ExitRegistrationDetails { @@ -379,70 +387,14 @@ impl ContactStorage { email: None, phone_code: _, email_code: _, - sequence_number, + exit_database_contract: _, } => Some(ContactStorage { email: None, number: None, invalid_email: None, invalid_number: None, - sequence_number: sequence_number.unwrap_or(0), + sequence_number: 0, }), } } } - -impl From for ExitRegistrationDetails { - fn from(ct: ContactType) -> Self { - match ct { - ContactType::Both { - number, - email, - sequence_number, - } => ExitRegistrationDetails { - phone: Some(number.to_string()), - email: Some(email.to_string()), - email_code: None, - phone_code: None, - sequence_number, - }, - ContactType::Email { - email, - sequence_number, - } => ExitRegistrationDetails { - phone: None, - email: Some(email.to_string()), - email_code: None, - phone_code: None, - sequence_number, - }, - ContactType::Phone { - number, - sequence_number, - } => ExitRegistrationDetails { - phone: Some(number.to_string()), - email: None, - email_code: None, - phone_code: None, - sequence_number, - }, - ContactType::Bad { - invalid_email, - invalid_number, - sequence_number, - } => ExitRegistrationDetails { - phone: invalid_number, - email: invalid_email, - email_code: None, - phone_code: None, - sequence_number, - }, - } - } -} - -impl From for ExitRegistrationDetails { - fn from(cs: ContactStorage) -> Self { - let ct: ContactType = cs.into(); - ct.into() - } -} diff --git a/althea_types/src/exits/encryption.rs b/althea_types/src/exits/encryption.rs index 7d4c35d03..a777cc5bd 100644 --- a/althea_types/src/exits/encryption.rs +++ b/althea_types/src/exits/encryption.rs @@ -158,6 +158,7 @@ mod tests { use crate::exits::identity::random_exit_identity; use crate::exits::ExitRegistrationDetails; use crate::ExitClientIdentity; + use clarity::Address; use crypto_box::PublicKey; use crypto_box::SecretKey; use sodiumoxide::crypto::box_; @@ -171,6 +172,10 @@ mod tests { (public_key, secret_key) } + pub fn random_address() -> Address { + Address::from_slice(&[1u8; 20]).unwrap() + } + /// Used to test cross compatibility with libsodium pub fn encrypt_exit_client_id_libsodium( our_publickey: WgKey, @@ -247,7 +252,7 @@ mod tests { email_code: None, phone: None, phone_code: None, - sequence_number: None, + exit_database_contract: random_address(), }, }; @@ -292,7 +297,7 @@ mod tests { email_code: None, phone: None, phone_code: None, - sequence_number: None, + exit_database_contract: random_address(), }, }; diff --git a/althea_types/src/exits/mod.rs b/althea_types/src/exits/mod.rs index fe41b5ec0..84f124e44 100644 --- a/althea_types/src/exits/mod.rs +++ b/althea_types/src/exits/mod.rs @@ -1,6 +1,7 @@ use crate::default_system_chain; use crate::wg_key::WgKey; use crate::{exits::identity::ExitIdentity, Identity, SystemChain}; +use clarity::Address; use ipnetwork::IpNetwork; use serde::Deserialize; use serde::Serialize; @@ -11,6 +12,7 @@ pub mod encryption; pub mod identity; pub mod server_list_signatures; +/// Struct for registration communication between the client and the exit #[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq, Hash, Default)] pub struct ExitRegistrationDetails { #[serde(skip_serializing_if = "Option::is_none", default)] @@ -21,8 +23,8 @@ pub struct ExitRegistrationDetails { pub phone: Option, #[serde(skip_serializing_if = "Option::is_none", default)] pub phone_code: Option, - #[serde(skip_serializing_if = "Option::is_none", default)] - pub sequence_number: Option, + /// This is the exit database contract that the client wishes to register with + pub exit_database_contract: Address, } /// This is the state an exit can be in diff --git a/exit_trust_root/Cargo.toml b/exit_trust_root/Cargo.toml index fe0e6ca01..97f20e3ad 100644 --- a/exit_trust_root/Cargo.toml +++ b/exit_trust_root/Cargo.toml @@ -14,6 +14,7 @@ name = "exit_trust_root_server" path = "src/bin.rs" [dependencies] +clap = {version="4", features=["derive"]} althea_types = { path = "../althea_types" } awc = {workspace = true} actix-web = {workspace = true} @@ -23,8 +24,6 @@ env_logger = "0.11" log = "0.4" clarity = "1.4" web30 = "1.4" -crypto_box = "0.9" -lazy_static = "1.5" phonenumber = "0.3.6" actix = "0.13" tokio = { version = "1.40", features = ["macros", "time"] } @@ -32,6 +31,11 @@ serde = "1.0" serde_derive = "1.0" serde_json = "1.0" toml = "0.5" +crossbeam = "0.8" +futures = "0.1" + +[dev-dependencies] +actix-rt = {workspace=true} [features] development = [] \ No newline at end of file diff --git a/exit_trust_root/src/bin.rs b/exit_trust_root/src/bin.rs index 10fa06e36..d786e1b3a 100644 --- a/exit_trust_root/src/bin.rs +++ b/exit_trust_root/src/bin.rs @@ -1,5 +1,6 @@ +use clap::Parser; use env_logger::Env; -use exit_trust_root::{config::load_config, start_exit_trust_root_server}; +use exit_trust_root::{config::Config, start_exit_trust_root_server}; #[actix_web::main] async fn main() { @@ -8,9 +9,7 @@ async fn main() { openssl_probe::init_ssl_cert_env_vars(); env_logger::Builder::from_env(Env::default().default_filter_or("info")).init(); - // ensure that the config file is valid, we discard the result and use - // lazy static variable after this - load_config(); + let args = Config::parse(); - start_exit_trust_root_server(); + start_exit_trust_root_server(args).await; } diff --git a/exit_trust_root/src/client_db.rs b/exit_trust_root/src/client_db.rs index 00b54e484..438aaecf0 100644 --- a/exit_trust_root/src/client_db.rs +++ b/exit_trust_root/src/client_db.rs @@ -3,6 +3,7 @@ //! exit and client routers can read it to coordinate user setup and two way key exchange with the blockchain //! as the trusted party +use crate::sms_auth::convert_althea_types_to_web3_error; use althea_types::{ExitIdentity, Identity, WgKey}; use clarity::{ abi::{encode_call, AbiToken}, @@ -16,8 +17,6 @@ use web30::{ types::{SendTxOption, TransactionRequest}, }; -use crate::rita_client_registration::convert_althea_types_to_web3_error; - /// The EVM integer size pub const WORD_SIZE: usize = 32; diff --git a/exit_trust_root/src/config.rs b/exit_trust_root/src/config.rs index 736c37f1a..cdf3d80e9 100644 --- a/exit_trust_root/src/config.rs +++ b/exit_trust_root/src/config.rs @@ -1,62 +1,130 @@ -use althea_types::WgKey; +use althea_types::SignedExitServerList; +use clarity::Address; use clarity::PrivateKey; -use lazy_static::lazy_static; -use log::error; -use serde::{Deserialize, Serialize}; -use std::{fs::File, io::Read}; - -use crate::DEVELOPMENT; - -///Struct containing settings for Exit root server -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct ConfigStruct { - pub clarity_private_key: PrivateKey, - pub wg_private_key: WgKey, +use crossbeam::queue::SegQueue; +use phonenumber::PhoneNumber; +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; +use std::time::Duration; +use std::time::Instant; + +use crate::register_client_batch_loop::RegistrationRequest; +use crate::RPC_SERVER; + +/// Command line arguments +#[derive(clap::Parser, Clone, Debug)] +#[clap(version = env!("CARGO_PKG_VERSION"), author = "Justin Kilpatrick ")] +#[command(version, about, long_about = None)] +pub struct Config { + /// How long to wait for a response from a full node before timing out + /// in seconds. Set this conservatively to avoid crashing an operation + /// that has already been running for a long time. + #[arg(short, long, default_value = "30")] + pub timeout: u64, + /// rpc url to use, this should be an ETH node on the same network as the + /// database smart contract + #[arg(short, long, default_value = RPC_SERVER)] + pub rpc: String, + /// An ethereum private key, used to both sign transactions (root of trust) + /// and to make requests to the database smart contract, such as registration + #[arg(short, long)] + pub private_key: PrivateKey, + /// SMS API key + #[arg(short, long)] + pub telnyx_api_key: String, + /// SMS verify profile id + #[arg(short, long)] + pub verify_profile_id: String, + /// The Magic number if provided bypasses authentication + /// and registers the user with the given identity + #[arg(short, long)] + pub magic_number: Option, + /// The Magic number if provided bypasses authentication + /// and registers the user with the given identity + /// If set to true use https mode + #[arg(short, long)] + pub https: bool, + /// URL to listen on + #[arg(short, long)] + pub url: String, } -impl ConfigStruct { - pub fn load(path: String) -> Option { - let mut config_toml = String::new(); +#[derive(Clone, Debug)] +/// Utility struct to hold the config and cache in a single argument +pub struct ConfigAndCache { + /// The configuration for the server + pub config: Arc, + /// A cache of exit server lists, we try to serve these first before querying the blockchain + pub cache: Arc>>, + /// A lock free shared queue of registration requests, these are processed by the registration loop + pub registration_queue: Arc>, + /// Data about the number of texts sent to each phone number, each instant represents the last time + /// a text was sent to that number + pub texts_sent: Arc>>>, +} - let mut file = match File::open(path) { - Ok(file) => file, - Err(_) => { - error!("Could not find config file. Using default!"); - return None; - } - }; +impl ConfigAndCache { + pub fn insert(&self, key: Address, value: SignedExitServerList) { + self.cache.write().unwrap().insert(key, value); + } - file.read_to_string(&mut config_toml) - .unwrap_or_else(|err| panic!("Error while reading config: [{}]", err)); + pub fn get_all(&self) -> HashMap { + self.cache.read().unwrap().clone() + } - let res = toml::from_str(&config_toml).unwrap(); - Some(res) + pub fn get(&self, key: &Address) -> Option { + self.cache.read().unwrap().get(key).cloned() } -} -/// loads the exit root server config, broken out here so that -/// we can easily verify that the config is valid before starting -pub fn load_config() -> ConfigStruct { - // change the config name based on our development status - let file_name = if DEVELOPMENT || cfg!(test) { - return ConfigStruct { - clarity_private_key: PrivateKey::from_bytes([1u8; 32]).unwrap(), - wg_private_key: WgKey::from([2; 32]), - }; - } else { - "/etc/exit_root_server.toml" - }; - let config_structs = ConfigStruct::load(file_name.to_string()); - if let Some(conf) = config_structs { - conf - } else { - panic!( - "Can not find configuration file! for filename {:?}", - file_name - ); + pub fn get_config(&self) -> Arc { + self.config.clone() } -} -lazy_static! { - pub static ref CONFIG: ConfigStruct = load_config(); + pub fn insert_client_to_reg_queue(&self, request: RegistrationRequest) { + self.registration_queue.push(request); + } + + /// Limits for how many texts can be sent to a given phone number in a given time period + /// expnentially decreasing over time to prevent spam + const TEXT_LIMITS: [(Duration, u8); 3] = [ + (Duration::from_secs(60), 1), + (Duration::from_secs(3600), 10), + (Duration::from_secs(86400), 20), + ]; + /// How long to keep a text in the history before dropping it + const DROP_TEXT_HISTORY: Duration = Duration::from_secs(864000); + pub fn has_hit_text_limit(&self, key: &PhoneNumber) -> bool { + let now = Instant::now(); + for (time, limit) in Self::TEXT_LIMITS.iter() { + let texts = self.get_texts_sent(key); + let mut count = 0; + for text in texts.iter() { + if now.duration_since(*text) < *time { + count += 1; + } + } + if count >= *limit { + return true; + } + } + false + } + + pub fn get_texts_sent(&self, key: &PhoneNumber) -> Vec { + self.texts_sent + .read() + .unwrap() + .get(key) + .cloned() + .unwrap_or_default() + } + + pub fn insert_text_sent(&self, key: PhoneNumber) { + // clean up old texts while inserting a new one + let mut texts = self.texts_sent.write().unwrap(); + for (_, texts) in texts.iter_mut() { + texts.retain(|text| Instant::now().duration_since(*text) < Self::DROP_TEXT_HISTORY); + } + texts.entry(key).or_default().push(Instant::now()); + } } diff --git a/exit_trust_root/src/endpoints.rs b/exit_trust_root/src/endpoints.rs new file mode 100644 index 000000000..314c02750 --- /dev/null +++ b/exit_trust_root/src/endpoints.rs @@ -0,0 +1,210 @@ +use std::time::Duration; + +use crate::client_db::check_user_admin; +use crate::register_client_batch_loop::RegistrationRequest; +use crate::retrieve_exit_server_list; +use crate::sms_auth::check_sms_auth_result; +use crate::sms_auth::start_sms_auth_flow; +use crate::sms_auth::TextApiError; +use crate::ConfigAndCache; +use actix_web::post; +use actix_web::{get, web, HttpResponse, Responder}; +use althea_types::Identity; +use clarity::Address; +use log::error; +use log::info; +use phonenumber::PhoneNumber; +use serde::Deserialize; +use serde::Serialize; +use web30::client::Web3; + +/// This endpoint retrieves and signs the data from any specified exit contract, +/// allowing this server to serve as a root of trust for several different exit contracts. +#[get("/{exit_contract}")] +pub async fn return_signed_exit_contract_data( + exit_contract: web::Path
, + cache: web::Data, +) -> impl Responder { + let contract: Address = exit_contract.into_inner(); + let cached_list = cache.get(&contract); + + match cached_list { + Some(list) => { + // return a signed exit server list based on the given key + HttpResponse::Ok().json(list) + } + None => match retrieve_exit_server_list(contract, cache.get_ref().clone()).await { + Ok(list) => HttpResponse::Ok().json(list), + Err(e) => { + info!("Failed to get exit list from contract {:?}", e); + HttpResponse::InternalServerError().json("Failed to get exit list from contract") + } + }, + } +} + +#[derive(Serialize, Deserialize, Clone, Debug, Hash, PartialEq, Eq)] +/// Registration request stuct posted by the client to start the registration process +pub struct RegisterRequest { + pub phone_number: PhoneNumber, +} + +/// This function starts client registration +/// It will send a text to the client with a code that they must submit +/// to the submit_code endpoint to complete registration +#[post("/register")] +pub async fn start_client_registration( + client: web::Json, + cache: web::Data, +) -> impl Responder { + let config = cache.get_config(); + info!("Starting phone registration for {}", client.phone_number); + + if cache.has_hit_text_limit(&client.phone_number) { + error!("Registration text limit hit for {}", client.phone_number); + return HttpResponse::TooManyRequests().finish(); + } + + // the magic number in this case doesn't really do anything, they can + // just call the verification endpoint directly and 'complete' the verification + if let Some(number) = config.magic_number.clone() { + if number == client.phone_number { + info!("Magic number detected",); + return HttpResponse::Ok().finish(); + } + } + + // below this point we send a text, so we should increment the counter + // first, just in case there's some error in the text sending process + // where we might send a text but not increment the counter + cache.insert_text_sent(client.phone_number.clone()); + let res = start_sms_auth_flow( + client.phone_number.clone(), + config.telnyx_api_key.clone(), + config.verify_profile_id.clone(), + ) + .await; + match res { + Ok(_) => HttpResponse::Ok().finish(), + Err(e) => match e { + TextApiError::InternalServerError { error } => { + error!("Internal server error {:?}", error); + HttpResponse::InternalServerError().finish() + } + TextApiError::SendRequestError { error } => { + error!("Failed to send text {:?}", error); + HttpResponse::InternalServerError().finish() + } + }, + } +} + +#[derive(Serialize, Deserialize, Clone, Debug, Hash, PartialEq, Eq)] +/// Registration request stuct posted by the client to start the registration process +pub struct SubmitCodeRequest { + /// The phone number, must have first been used to start the registration process + /// with the register endpoint + pub phone_number: PhoneNumber, + /// The identity of the client router to be registered + pub identity: Identity, + /// The code sent to the phone number + pub code: String, + /// The contract address to register with + pub contract: Address, +} + +/// This endpoint is used by clients to submit their registration code +/// once successfully verified they will be added to the registration queue +/// and will be registered with the registration contract of their choice +#[post("/submit_code")] +pub async fn submit_registration_code( + request: web::Json, + cache: web::Data, +) -> impl Responder { + let config = cache.get_config(); + info!( + "Submitting code for {} with identity {:?}", + request.phone_number, request.identity + ); + + let contract_addr = request.contract; + let web3 = Web3::new(&cache.get_config().rpc, Duration::from_secs(2)); + let our_private_key = cache.get_config().private_key; + // ensure that it is possible to register with the contract, having this here + // is less efficient, but really helps with debugging by not giving a confusing + // 'ok' response to the client when we can't actually register them + match check_user_admin( + &web3, + contract_addr, + our_private_key.to_address(), + our_private_key, + ) + .await + { + Ok(b) => { + if !b { + error!( + "We are not a user admin for contract {:?}, responding with 403", + contract_addr + ); + return HttpResponse::Forbidden().finish(); + } + } + Err(e) => { + error!("Failed to check if we are a user admin {:?}", e); + return HttpResponse::InternalServerError().finish(); + } + } + + // the magic number in this case doesn't really do anything, they can + // just call the verification endpoint directly and 'complete' the verification + if let Some(number) = config.magic_number.clone() { + if number == request.phone_number { + info!( + "Magic number detected, submitting user {} for registration", + request.identity.wg_public_key + ); + cache.insert_client_to_reg_queue(RegistrationRequest { + identity: request.identity, + contract: request.contract, + }); + return HttpResponse::Ok().finish(); + } + } + + let res = check_sms_auth_result( + request.phone_number.clone(), + request.code.clone(), + config.telnyx_api_key.clone(), + config.verify_profile_id.clone(), + ) + .await; + + match res { + Ok(true) => { + info!( + "Code verified for {} submitting {} for registration", + request.phone_number, request.identity.wg_public_key + ); + cache.insert_client_to_reg_queue(RegistrationRequest { + identity: request.identity, + contract: request.contract, + }); + HttpResponse::Ok().finish() + } + Ok(false) => { + info!("Code not verified for {}", request.phone_number); + HttpResponse::BadRequest().finish() + } + Err(e) => match e { + TextApiError::InternalServerError { error } => { + error!("Internal server error {:?}", error); + HttpResponse::InternalServerError().finish() + } + TextApiError::SendRequestError { error } => { + error!("Failed to send text {:?}", error); + HttpResponse::InternalServerError().finish() + } + }, + } +} diff --git a/exit_trust_root/src/lib.rs b/exit_trust_root/src/lib.rs index 5aa052844..75847d65a 100644 --- a/exit_trust_root/src/lib.rs +++ b/exit_trust_root/src/lib.rs @@ -1,83 +1,57 @@ -use actix_web::rt::System; -use actix_web::{get, web, App, HttpResponse, HttpServer, Responder}; +use actix_web::{web, App, HttpServer}; use althea_types::{ExitServerList, SignedExitServerList}; use clarity::Address; use client_db::get_exits_list; -use config::CONFIG; +use config::Config; +use config::ConfigAndCache; +use crossbeam::queue::SegQueue; +use endpoints::return_signed_exit_contract_data; +use endpoints::start_client_registration; +use endpoints::submit_registration_code; use log::info; use openssl::ssl::{SslAcceptor, SslMethod}; +use register_client_batch_loop::register_client_batch_loop; use std::collections::HashMap; use std::net::{IpAddr, Ipv4Addr}; use std::sync::{Arc, RwLock}; -use std::thread; use std::time::Duration; +use tokio::join; use web30::client::Web3; use web30::jsonrpc::error::Web3Error; pub mod client_db; pub mod config; +pub mod endpoints; pub mod register_client_batch_loop; -pub mod rita_client_registration; +pub mod sms_auth; const RPC_SERVER: &str = "https://dai.althea.net"; const WEB3_TIMEOUT: Duration = Duration::from_secs(10); -// five minutes -const SIGNATURE_UPDATE_SLEEP: Duration = Duration::from_secs(300); - -pub const DEVELOPMENT: bool = cfg!(feature = "development"); -const SSL: bool = !DEVELOPMENT; -pub const EXIT_ROOT_DOMAIN: &str = if cfg!(test) || cfg!(feature = "development") { - "http://10.0.0.1:4050" +// Faster update time in development mode +const SIGNATURE_UPDATE_SLEEP: Duration = if DEVELOPMENT || cfg!(test) { + Duration::from_secs(10) } else { - "https://exitroot.althea.net" + Duration::from_secs(300) }; + +pub const DEVELOPMENT: bool = cfg!(feature = "development"); /// The backend RPC port for the info server fucntions implemented in this repo pub const SERVER_PORT: u16 = 4050; -/// This endpoint retrieves and signs the data from any specified exit contract, -/// allowing this server to serve as a root of trust for several different exit contracts. -#[get("/{exit_contract}")] -pub async fn return_exit_contract_data( - exit_contract: web::Path
, - cache: web::Data>>>, -) -> impl Responder { - let contract: Address = exit_contract.into_inner(); - let cached_list = { - let cache_read = cache.read().unwrap(); - cache_read.get(&contract).cloned() - }; - - match cached_list { - Some(list) => { - // return a signed exit server list based on the given key - HttpResponse::Ok().json(list) - } - None => match retrieve_exit_server_list(contract, cache.get_ref().clone()).await { - Ok(list) => HttpResponse::Ok().json(list), - Err(e) => { - info!("Failed to get exit list from contract {:?}", e); - HttpResponse::InternalServerError().json("Failed to get exit list from contract") - } - }, - } -} - async fn retrieve_exit_server_list( exit_contract: Address, - cache: Arc>>, + cache: ConfigAndCache, ) -> Result { + let config = cache.get_config(); let exits = match DEVELOPMENT || cfg!(test) { true => { let node_ip = IpAddr::V4(Ipv4Addr::new(7, 7, 7, 1)); let web3_url = format!("http://{}:8545", node_ip); - info!( - "Our address is {:?}", - CONFIG.clarity_private_key.to_address() - ); + info!("Our address is {:?}", config.private_key.to_address()); get_exits_list( &Web3::new(&web3_url, WEB3_TIMEOUT), - CONFIG.clarity_private_key.to_address(), + config.private_key.to_address(), exit_contract, ) .await @@ -85,7 +59,7 @@ async fn retrieve_exit_server_list( false => { get_exits_list( &Web3::new(RPC_SERVER, WEB3_TIMEOUT), - CONFIG.clarity_private_key.to_address(), + config.private_key.to_address(), exit_contract, ) .await @@ -101,15 +75,12 @@ async fn retrieve_exit_server_list( }; info!( "Signing exit list with PUBKEY: {:?}", - CONFIG.clarity_private_key.to_address() + config.private_key.to_address() ); - let cache_value = exit_list.sign(CONFIG.clarity_private_key); + let cache_value = exit_list.sign(config.private_key); // add this new exit list to the cache - cache - .write() - .unwrap() - .insert(exit_contract, cache_value.clone()); + cache.insert(exit_contract, cache_value.clone()); Ok(cache_value) } Err(e) => { @@ -119,80 +90,79 @@ async fn retrieve_exit_server_list( } } -pub fn start_exit_trust_root_server() { - let exit_contract_data_cache: Arc>> = - Arc::new(RwLock::new(HashMap::new())); - signature_update_loop(exit_contract_data_cache.clone()); +pub async fn start_exit_trust_root_server(config: Config) { + let domain = config.url.clone(); + let https = config.https; + let exit_contract_data_cache: ConfigAndCache = ConfigAndCache { + config: Arc::new(config.clone()), + cache: Arc::new(RwLock::new(HashMap::new())), + registration_queue: Arc::new(SegQueue::new()), + texts_sent: Arc::new(RwLock::new(HashMap::new())), + }; + let sig_loop = signature_update_loop(exit_contract_data_cache.clone()); + let reg_loop = register_client_batch_loop( + config.rpc, + config.private_key, + exit_contract_data_cache.registration_queue.clone(), + ); let web_data = web::Data::new(exit_contract_data_cache.clone()); - thread::spawn(move || { - let runner = System::new(); - runner.block_on(async move { - let server = HttpServer::new(move || { - App::new() - .service(return_exit_contract_data) - .app_data(web_data.clone()) - }); - info!("Starting exit trust root server on {:?}", EXIT_ROOT_DOMAIN); - let server = if SSL { - // build TLS config from files - let mut builder = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap(); - // set the certificate chain file location - builder - .set_certificate_chain_file(format!( - "/etc/letsencrypt/live/{}/fullchain.pem", - EXIT_ROOT_DOMAIN - )) - .unwrap(); - builder - .set_private_key_file( - format!("/etc/letsencrypt/live/{}/privkey.pem", EXIT_ROOT_DOMAIN), - openssl::ssl::SslFiletype::PEM, - ) - .unwrap(); + let server = HttpServer::new(move || { + App::new() + .service(return_signed_exit_contract_data) + .service(start_client_registration) + .service(submit_registration_code) + .app_data(web_data.clone()) + }); + info!("Starting exit trust root server on {:?}", domain); - info!("Binding to SSL"); - server - .bind_openssl(format!("{}:{}", EXIT_ROOT_DOMAIN, SERVER_PORT), builder) - .unwrap() - } else { - info!("Binding to {}:{}", EXIT_ROOT_DOMAIN, SERVER_PORT); - server - .bind(format!("{}:{}", EXIT_ROOT_DOMAIN, SERVER_PORT)) - .unwrap() - }; + let server = if https { + // build TLS config from files + let mut builder = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap(); + // set the certificate chain file location + builder + .set_certificate_chain_file(format!("/etc/letsencrypt/live/{}/fullchain.pem", domain)) + .unwrap(); + builder + .set_private_key_file( + format!("/etc/letsencrypt/live/{}/privkey.pem", domain), + openssl::ssl::SslFiletype::PEM, + ) + .unwrap(); - let _ = server.run().await; - }); - }); + info!("Binding to SSL"); + server + .bind_openssl(format!("{}:{}", domain, SERVER_PORT), builder) + .unwrap() + } else { + info!("Binding to {}:{}", domain, SERVER_PORT); + server.bind(format!("{}:{}", domain, SERVER_PORT)).unwrap() + }; + + // run all three of these loops in the async executor that called this function + // this function will not return and block indefinitely + let _ = join!(server.run(), sig_loop, reg_loop); } /// In order to improve scalability this loop grabs and signs an updated list of exits from each exit contract /// that has previously been requested from this server every 5 minutes. This allows the server to return instantly /// on the next request from the client without having to perform rpc query 1-1 with requests. -pub fn signature_update_loop(cache: Arc>>) { - thread::spawn(move || loop { - let runner = System::new(); - runner.block_on(async { - let cache_iter = cache.read().unwrap().clone(); - for (exit_contract, _value) in cache_iter.iter() { - // get the latest exit list from the contract - match retrieve_exit_server_list(*exit_contract, cache.clone()).await { - // grab the cache here so we don't lock it while awaiting for every single contract - Ok(cache_value) => { - // update the cache - cache.write().unwrap().insert(*exit_contract, cache_value); - } - Err(e) => { - info!("Failed to get exit list from contract {:?}", e); - } +pub async fn signature_update_loop(cache: ConfigAndCache) { + loop { + let cache_iter = cache.get_all(); + for exit_contract in cache_iter.keys() { + // get the latest exit list from the contract + match retrieve_exit_server_list(*exit_contract, cache.clone()).await { + // grab the cache here so we don't lock it while awaiting for every single contract + Ok(cache_value) => { + // update the cache + cache.insert(*exit_contract, cache_value); + } + Err(e) => { + info!("Failed to get exit list from contract {:?}", e); } } - }); - if DEVELOPMENT || cfg!(test) { - thread::sleep(Duration::from_secs(10)); - } else { - thread::sleep(SIGNATURE_UPDATE_SLEEP); } - }); + tokio::time::sleep(SIGNATURE_UPDATE_SLEEP).await; + } } diff --git a/exit_trust_root/src/register_client_batch_loop.rs b/exit_trust_root/src/register_client_batch_loop.rs index da8f56ecb..ba3639e64 100644 --- a/exit_trust_root/src/register_client_batch_loop.rs +++ b/exit_trust_root/src/register_client_batch_loop.rs @@ -1,21 +1,23 @@ -use actix::System; +use crate::{ + client_db::{add_users_to_registered_list, check_user_admin, get_all_registered_clients}, + sms_auth::{REGISTRATION_LOOP_SPEED, TX_TIMEOUT, WEB3_TIMEOUT}, +}; use althea_types::Identity; use clarity::{Address, PrivateKey}; +use crossbeam::queue::SegQueue; use log::{error, info}; use std::{ - collections::HashSet, - thread, - time::{Duration, Instant}, + collections::{HashMap, HashSet}, + sync::Arc, + time::Instant, }; use web30::{client::Web3, types::SendTxOption}; -use crate::{ - client_db::{add_users_to_registered_list, get_all_registered_clients}, - rita_client_registration::{ - get_reg_queue, remove_client_from_reg_queue, REGISTRATION_LOOP_SPEED, TX_TIMEOUT, - WEB3_TIMEOUT, - }, -}; +#[derive(Clone, Debug, Hash, Eq, PartialEq)] +pub struct RegistrationRequest { + pub identity: Identity, + pub contract: Address, +} pub const MAX_BATCH_SIZE: usize = 75; @@ -28,109 +30,139 @@ pub fn get_clients_hashset(input: Vec) -> HashSet { output } -/// This function starts a separate thread that monitors the registraiton batch lazy static variable and every REGISTRATION_LOOP_SPEED seconds -/// sends a batch register tx to the smart contract -pub fn register_client_batch_loop( - web3_url: String, - contract_addr: Address, +/// This function monitors the registration queue lock free queue. It will dequeue any new entries and attempt to register them +/// in a batch sent every REGISTRATION_LOOP_SPEED seconds. This function will also check if the user is already registered before attempting to register them +pub async fn register_client_batch_loop( + rpc_url: String, our_private_key: PrivateKey, + registration_queue: Arc>, ) { - let mut last_restart = Instant::now(); - thread::spawn(move || { - // this will always be an error, so it's really just a loop statement - // with some fancy destructuring - while let Err(e) = { - let web3_url = web3_url.clone(); - thread::spawn(move || { - let web3_url = web3_url.clone(); - // Our Exit state variabl - let runner = System::new(); - - runner.block_on(async move { - loop { - let start = Instant::now(); - // there is no one in the queue - let list = get_reg_queue(); - if list.is_empty() { - thread::sleep(WEB3_TIMEOUT); - continue - } + // local copy of the registration queue, entries are copied off of the + // registration queue and into this local queue before being processed + // each key in the hashmap represents a list of users waiting to be registered + // for a given database contract + let mut local_queue: HashMap> = HashMap::new(); + loop { + let start = Instant::now(); + // copy all entries from the registration queue into the local queue + // creating new entries for each contract address as needed + while let Some(reg_request) = registration_queue.pop() { + let contract_addr = reg_request.contract; + match local_queue.get_mut(&contract_addr) { + Some(queue) => { + queue.insert(reg_request); + } + None => { + let mut new_queue = HashSet::new(); + new_queue.insert(reg_request); + local_queue.insert(contract_addr, new_queue); + } + } + } + let web3 = Web3::new(&rpc_url, WEB3_TIMEOUT); - let web3 = Web3::new(&web3_url, WEB3_TIMEOUT); - // get a copy of all existing clients, we do this in order to handle a potential future edgecase where more than one registration server - // is operating at a time and the same user attempts to register to more than one before the transaction can be sent. Without this check - // once a already registered user is in the queue all future transactions would fail and the server would no longer operate correctly - let all_clients = match get_all_registered_clients(&web3, our_private_key.to_address(), contract_addr).await { - Ok(all_clients) => all_clients, - Err(e) => { - error!("Failed to get list of already registered clients {:?}, retrying", e); - continue; - }, - }; - let all_clients = get_clients_hashset(all_clients); + for (contract_addr, list) in local_queue.iter_mut() { + if list.is_empty() { + continue; + } - let mut clients_to_register = Vec::new(); - for client in list { - if !all_clients.contains(&client) { - clients_to_register.push(client); - if clients_to_register.len() > MAX_BATCH_SIZE { - break; - } - } - } - // there is no one once we filter already registered users - if clients_to_register.is_empty() { - thread::sleep(WEB3_TIMEOUT); - continue - } + match check_user_admin( + &web3, + *contract_addr, + our_private_key.to_address(), + our_private_key, + ) + .await + { + Ok(b) => { + if !b { + error!( + "We are not a user admin for contract {:?}, skipping registration", + contract_addr + ); + continue; + } + } + Err(e) => { + error!("Failed to check if we are a user admin {:?}, retrying", e); + continue; + } + } - info!("Prepped user batch sending register tx"); - match add_users_to_registered_list( - &web3, - clients_to_register.clone(), - contract_addr, - our_private_key, - Some(TX_TIMEOUT), - vec![SendTxOption::GasPriorityFee(1000000000u128.into()), SendTxOption::GasMaxFee(4000000000u128.into())], - ) - .await - { - Ok(_) => { - info!( - "Successfully registered {} clients!", - clients_to_register.len() - ); - // remove all the successfully registered clients from the queue - for client in clients_to_register { - remove_client_from_reg_queue(client); - } - } - Err(e) => { - error!("Failed to register clients with {:?}, will try again!", e) - } - } + // list of existing users to check against, prevent duplicate registrations + let all_clients = match get_all_registered_clients( + &web3, + our_private_key.to_address(), + *contract_addr, + ) + .await + { + Ok(all_clients) => all_clients, + Err(e) => { + error!( + "Failed to get list of already registered clients {:?}, retrying", + e + ); + continue; + } + }; + let all_clients = get_clients_hashset(all_clients); - info!("Registration loop elapsed in = {:?}", start.elapsed()); - if start.elapsed() < REGISTRATION_LOOP_SPEED { - info!( - "Registration Loop sleeping for {:?}", - REGISTRATION_LOOP_SPEED - start.elapsed() - ); - thread::sleep(REGISTRATION_LOOP_SPEED - start.elapsed()); - } - info!("Registration loop sleeping Done!"); + let mut clients_to_register = Vec::new(); + for client in list.iter() { + if !all_clients.contains(&client.identity) { + clients_to_register.push(client); + if clients_to_register.len() > MAX_BATCH_SIZE { + break; } - }); - }) - .join() - } { - error!("Registration loop thread panicked! Respawning {:?}", e); - if Instant::now() - last_restart < Duration::from_secs(60) { - error!("Restarting too quickly, leaving it to auto rescue!"); - let sys = System::current(); - sys.stop_with_code(121); + } + } + // there is no one to register once we filter already registered users + if clients_to_register.is_empty() { + continue; + } + + info!( + "Prepped user batch sending register tx against contract {:?} from our address {} with balance {} and clients {:#?}", + contract_addr, + our_private_key.to_address(), + web3.eth_get_balance(our_private_key.to_address()).await.unwrap(), + clients_to_register + ); + match add_users_to_registered_list( + &web3, + clients_to_register.iter().map(|x| x.identity).collect(), + *contract_addr, + our_private_key, + Some(TX_TIMEOUT), + vec![ + SendTxOption::GasPriorityFee(1000000000u128.into()), + SendTxOption::GasMaxFee(4000000000u128.into()), + ], + ) + .await + { + Ok(_) => { + info!( + "Successfully registered {} clients!", + clients_to_register.len() + ); + // remove all the successfully registered clients from the queue + list.clear(); + } + Err(e) => { + error!("Failed to register clients with {:?}, will try again!", e) + } } - last_restart = Instant::now(); } - }); + + info!("Registration loop elapsed in = {:?}", start.elapsed()); + if start.elapsed() < REGISTRATION_LOOP_SPEED { + info!( + "Registration Loop sleeping for {:?}", + REGISTRATION_LOOP_SPEED - start.elapsed() + ); + tokio::time::sleep(REGISTRATION_LOOP_SPEED - start.elapsed()).await; + } + } } diff --git a/exit_trust_root/src/rita_client_registration.rs b/exit_trust_root/src/rita_client_registration.rs deleted file mode 100644 index c182a102c..000000000 --- a/exit_trust_root/src/rita_client_registration.rs +++ /dev/null @@ -1,339 +0,0 @@ -use althea_types::{error::AltheaTypesError, ExitClientIdentity, Identity, WgKey}; -use awc::error::JsonPayloadError; -use awc::error::SendRequestError; -use lazy_static::lazy_static; -use log::error; -use log::info; -use log::trace; -use phonenumber::PhoneNumber; -use serde::{Deserialize, Serialize}; -use std::{ - collections::{HashMap, HashSet}, - error::Error, - fmt::Display, - sync::{Arc, RwLock}, - time::Duration, -}; -use web30::jsonrpc::error::Web3Error; - -lazy_static! { - /// A map that stores number of texts sent to a client during registration - static ref TEXTS_SENT: Arc>> = Arc::new(RwLock::new(HashMap::new())); - static ref REGISTER_QUEUE: Arc>> = Arc::new(RwLock::new(HashSet::new())); -} - -pub const REGISTRATION_LOOP_SPEED: Duration = Duration::from_secs(10); -pub const WEB3_TIMEOUT: Duration = Duration::from_secs(15); -pub const TX_TIMEOUT: Duration = Duration::from_secs(60); - -/// Return struct from check_text and Send Text. Verified indicates status from api http req, -/// bad phone number is an error parsing clients phone number -/// Internal server error is an error while querying api endpoint -#[derive(Debug)] -pub enum TextApiError { - BadPhoneNumber, - InternalServerError { error: String }, - SendRequestError { error: SendRequestError }, -} - -impl Display for TextApiError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - TextApiError::BadPhoneNumber => write!(f, "InvalidPhoneNumber"), - TextApiError::InternalServerError { error } => write!(f, "Internal error {}", error), - TextApiError::SendRequestError { error } => write!(f, "{}", error), - } - } -} - -impl Error for TextApiError {} - -impl From for TextApiError { - fn from(value: JsonPayloadError) -> Self { - TextApiError::InternalServerError { - error: value.to_string(), - } - } -} - -impl From for TextApiError { - fn from(value: SendRequestError) -> Self { - TextApiError::SendRequestError { error: value } - } -} - -/// Return struct from Registration server to exit -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum ExitSignupReturn { - RegistrationOk, - PendingRegistration, - BadPhoneNumber, - InternalServerError { e: String }, -} -// Lazy static setters and getters -fn increment_texts_sent(key: WgKey) { - let lock = &mut *TEXTS_SENT.write().unwrap(); - let txt_sent = lock.get_mut(&key); - if let Some(val) = txt_sent { - *val += 1; - } else { - lock.insert(key, 1); - } -} - -fn reset_texts_sent(key: WgKey) { - TEXTS_SENT.write().unwrap().remove(&key); -} - -fn get_texts_sent(key: WgKey) -> u8 { - *TEXTS_SENT.read().unwrap().get(&key).unwrap_or(&0u8) -} - -pub fn add_client_to_reg_queue(id: Identity) { - REGISTER_QUEUE.write().unwrap().insert(id); -} - -pub fn remove_client_from_reg_queue(id: Identity) { - REGISTER_QUEUE.write().unwrap().remove(&id); -} - -pub fn get_reg_queue() -> Vec { - REGISTER_QUEUE.read().unwrap().clone().into_iter().collect() -} - -#[derive(Serialize)] -pub struct SmsCheck { - api_key: String, - verification_code: String, - phone_number: String, - country_code: String, -} - -#[derive(Serialize)] -pub struct SmsRequest { - api_key: String, - via: String, - phone_number: String, - country_code: String, -} - -/// Handles the minutia of phone registration states -pub async fn handle_sms_registration( - client: ExitClientIdentity, - api_key: String, - verify_profile_id: String, - magic_number: Option, -) -> ExitSignupReturn { - info!( - "Handling phone registration for {}", - client.global.wg_public_key - ); - - // Get magic phone number - let magic_phone_number = magic_number; - - let text_num = get_texts_sent(client.global.wg_public_key); - let sent_more_than_allowed_texts = text_num > 10; - - match client.reg_details.phone { - Some(number) => match number.parse() { - Ok(number) => { - let number: PhoneNumber = number; - match ( - client.reg_details.phone_code.clone(), - sent_more_than_allowed_texts, - ) { - // all texts exhausted, but they can still submit the correct code - (Some(code), true) => { - let is_magic = magic_phone_number.is_some() - && magic_phone_number.unwrap() == number.clone(); - let result = is_magic || { - match check_sms_auth_result( - number.clone(), - code, - api_key, - verify_profile_id, - ) - .await - { - Ok(a) => a, - Err(e) => return return_api_error(e), - } - }; - if result { - info!( - "Phone registration complete for {}", - client.global.wg_public_key - ); - - add_client_to_reg_queue(client.global); - reset_texts_sent(client.global.wg_public_key); - ExitSignupReturn::RegistrationOk - } else { - ExitSignupReturn::PendingRegistration - } - } - // user has exhausted attempts but is still not submitting code - (None, true) => ExitSignupReturn::PendingRegistration, - // user has attempts remaining and is requesting the code be resent - (None, false) => { - if let Err(e) = - start_sms_auth_flow(number, api_key, verify_profile_id).await - { - return return_api_error(e); - } - increment_texts_sent(client.global.wg_public_key); - ExitSignupReturn::PendingRegistration - } - // user has attempts remaining and is submitting a code - (Some(code), false) => { - let is_magic = magic_phone_number.is_some() - && magic_phone_number.unwrap() == number.clone(); - - let result = is_magic || { - match check_sms_auth_result( - number.clone(), - code, - api_key, - verify_profile_id, - ) - .await - { - Ok(a) => a, - Err(e) => return return_api_error(e), - } - }; - trace!("Check text returned {}", result); - if result { - info!( - "Phone registration complete for {}", - client.global.wg_public_key - ); - add_client_to_reg_queue(client.global); - reset_texts_sent(client.global.wg_public_key); - ExitSignupReturn::RegistrationOk - } else { - ExitSignupReturn::PendingRegistration - } - } - } - } - Err(_) => ExitSignupReturn::BadPhoneNumber, - }, - None => ExitSignupReturn::BadPhoneNumber, - } -} - -fn return_api_error(e: TextApiError) -> ExitSignupReturn { - match e { - TextApiError::BadPhoneNumber => ExitSignupReturn::BadPhoneNumber, - TextApiError::InternalServerError { error } => { - ExitSignupReturn::InternalServerError { e: error } - } - TextApiError::SendRequestError { error } => ExitSignupReturn::InternalServerError { - e: error.to_string(), - }, - } -} - -#[derive(Serialize)] -pub struct TelnyxSmsAuthCheck { - verify_profile_id: String, - code: String, -} - -#[derive(Debug, Deserialize)] -pub struct TelnyxSmsAuthResponseBody { - pub data: TelnyxSmsAuthResponse, -} - -/// Response code is either accepted or rejected -#[derive(Debug, Deserialize)] -pub struct TelnyxSmsAuthResponse { - pub phone_number: String, - pub response_code: String, -} - -/// Posts to the validation endpoint with the code, will return success if the code -/// is the same as the one sent to the user -pub async fn check_sms_auth_result( - number: PhoneNumber, - code: String, - bearer_key: String, - verify_profile_id: String, -) -> Result { - info!("About to check text message status for {}", number); - - let check_url = format!( - "https://api.telnyx.com/v2/verifications/by_phone_number/{}/actions/verify", - number - ); - - let client = awc::Client::default(); - match client - .post(check_url) - .bearer_auth(bearer_key) - .send_json(&TelnyxSmsAuthCheck { - verify_profile_id, - code, - }) - .await - { - Ok(mut a) => { - let response = a.json::().await?; - if response.data.response_code == "accepted" { - Ok(true) - } else { - Ok(false) - } - } - Err(e) => { - error!("Failed to verify code with {:?}", e); - Err(e.into()) - } - } -} - -#[derive(Serialize)] -pub struct TelnyxAuthMessage { - /// user target number - pub phone_number: String, - pub verify_profile_id: String, -} - -/// Url for sending auth code -const URL_START: &str = "https://api.telnyx.com/v2/verifications/sms"; -pub async fn start_sms_auth_flow( - phone_number: PhoneNumber, - bearer_key: String, - verify_profile_id: String, -) -> Result<(), TextApiError> { - let client = awc::Client::default(); - match client - .post(URL_START) - .bearer_auth(bearer_key) - .timeout(Duration::from_secs(1)) - .send_json(&TelnyxAuthMessage { - phone_number: phone_number.to_string(), - verify_profile_id, - }) - .await - { - Ok(_) => Ok(()), - Err(e) => { - error!("auth text error {:?}", e); - Err(e.into()) - } - } -} - -/// Required because althea types doesn't import web30 and web30 doesn't import althea types making a from or -/// into conversion impossible -pub fn convert_althea_types_to_web3_error( - input: Result, -) -> Result { - match input { - Ok(a) => Ok(a), - Err(e) => Err(Web3Error::BadResponse(format!("{e}"))), - } -} diff --git a/exit_trust_root/src/sms_auth.rs b/exit_trust_root/src/sms_auth.rs new file mode 100644 index 000000000..3665ca5d4 --- /dev/null +++ b/exit_trust_root/src/sms_auth.rs @@ -0,0 +1,149 @@ +use althea_types::error::AltheaTypesError; +use awc::error::JsonPayloadError; +use awc::error::SendRequestError; +use log::error; +use log::info; +use phonenumber::PhoneNumber; +use serde::{Deserialize, Serialize}; +use std::{error::Error, fmt::Display, time::Duration}; +use web30::jsonrpc::error::Web3Error; + +pub const REGISTRATION_LOOP_SPEED: Duration = Duration::from_secs(10); +pub const WEB3_TIMEOUT: Duration = Duration::from_secs(15); +pub const TX_TIMEOUT: Duration = Duration::from_secs(60); + +/// Return struct from check_text and Send Text. Verified indicates status from api http req, +/// bad phone number is an error parsing clients phone number +/// Internal server error is an error while querying api endpoint +#[derive(Debug)] +pub enum TextApiError { + InternalServerError { error: String }, + SendRequestError { error: SendRequestError }, +} + +impl Display for TextApiError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + TextApiError::InternalServerError { error } => write!(f, "Internal error {}", error), + TextApiError::SendRequestError { error } => write!(f, "{}", error), + } + } +} + +impl Error for TextApiError {} + +impl From for TextApiError { + fn from(value: JsonPayloadError) -> Self { + TextApiError::InternalServerError { + error: value.to_string(), + } + } +} + +impl From for TextApiError { + fn from(value: SendRequestError) -> Self { + TextApiError::SendRequestError { error: value } + } +} + +#[derive(Serialize)] +pub struct TelnyxSmsAuthCheck { + verify_profile_id: String, + code: String, +} + +#[derive(Debug, Deserialize)] +pub struct TelnyxSmsAuthResponseBody { + pub data: TelnyxSmsAuthResponse, +} + +/// Response code is either accepted or rejected +#[derive(Debug, Deserialize)] +pub struct TelnyxSmsAuthResponse { + pub phone_number: String, + pub response_code: String, +} + +/// Posts to the validation endpoint with the code, will return success if the code +/// is the same as the one sent to the user +pub async fn check_sms_auth_result( + number: PhoneNumber, + code: String, + bearer_key: String, + verify_profile_id: String, +) -> Result { + info!("About to check text message status for {}", number); + + let check_url = format!( + "https://api.telnyx.com/v2/verifications/by_phone_number/{}/actions/verify", + number + ); + + let client = awc::Client::default(); + match client + .post(check_url) + .bearer_auth(bearer_key) + .send_json(&TelnyxSmsAuthCheck { + verify_profile_id, + code, + }) + .await + { + Ok(mut a) => { + let response = a.json::().await?; + if response.data.response_code == "accepted" { + Ok(true) + } else { + Ok(false) + } + } + Err(e) => { + error!("Failed to verify code with {:?}", e); + Err(e.into()) + } + } +} + +#[derive(Serialize)] +pub struct TelnyxAuthMessage { + /// user target number + pub phone_number: String, + pub verify_profile_id: String, +} + +/// Url for sending auth code +const URL_START: &str = "https://api.telnyx.com/v2/verifications/sms"; +pub async fn start_sms_auth_flow( + phone_number: PhoneNumber, + bearer_key: String, + verify_profile_id: String, +) -> Result<(), TextApiError> { + let client = awc::Client::default(); + match client + .post(URL_START) + .bearer_auth(bearer_key) + .timeout(Duration::from_secs(1)) + .send_json(&TelnyxAuthMessage { + phone_number: phone_number.to_string(), + verify_profile_id, + }) + .await + { + Ok(_) => Ok(()), + Err(e) => { + error!("auth text error {:?}", e); + Err(e.into()) + } + } +} + +/// Required because althea types doesn't import web30 and web30 doesn't import althea types making a from or +/// into conversion impossible +pub fn convert_althea_types_to_web3_error( + input: Result, +) -> Result { + match input { + Ok(a) => Ok(a), + Err(e) => Err(Web3Error::BadResponse(format!("{e}"))), + } +} diff --git a/integration_tests/Cargo.toml b/integration_tests/Cargo.toml index 3aec1ddc9..74ee40e91 100644 --- a/integration_tests/Cargo.toml +++ b/integration_tests/Cargo.toml @@ -38,3 +38,5 @@ web30 = {workspace = true} lazy_static = "1.5" actix-web = {workspace = true} phonenumber = "0.3.6" +crossbeam = "0.8" +tokio = "1.41" \ No newline at end of file diff --git a/integration_tests/src/db_migration_test.rs b/integration_tests/src/db_migration_test.rs index 463953d81..cd31973f9 100644 --- a/integration_tests/src/db_migration_test.rs +++ b/integration_tests/src/db_migration_test.rs @@ -1,25 +1,26 @@ -use std::{ - thread, - time::{Duration, Instant}, +use crate::{ + payments_eth::{TRASACTION_TIMEOUT, WEB3_TIMEOUT}, + setup_utils::database::start_postgres, + utils::{deploy_contracts, get_eth_node, REGISTRATION_SERVER_KEY}, }; - use althea_types::random_identity; use clarity::{Address, PrivateKey}; +use crossbeam::queue::SegQueue; use diesel::{PgConnection, RunQueryDsl}; use exit_trust_root::{ client_db::{check_and_add_user_admin, get_all_registered_clients}, register_client_batch_loop::register_client_batch_loop, }; +use futures::future::{select, Either}; use rita_db_migration::{ get_database_connection, models::Client, schema::clients::dsl::clients, start_db_migration, }; -use web30::client::Web3; - -use crate::{ - payments_eth::{TRASACTION_TIMEOUT, WEB3_TIMEOUT}, - setup_utils::database::start_postgres, - utils::{deploy_contracts, get_eth_node, REGISTRATION_SERVER_KEY}, +use std::{ + sync::Arc, + thread, + time::{Duration, Instant}, }; +use web30::client::Web3; pub const DB_URI: &str = "postgres://postgres@localhost/test"; @@ -62,26 +63,36 @@ pub async fn run_db_migration_test() { thread::sleep(Duration::from_secs(5)); + let queue = Arc::new(SegQueue::new()); + info!("Starting registration loop"); - register_client_batch_loop(get_eth_node(), althea_db_addr, reg_server_key); + let reg_loop = register_client_batch_loop(get_eth_node(), reg_server_key, queue.clone()); info!("Running user migration"); - match start_db_migration( + start_db_migration( DB_URI.to_string(), get_eth_node(), reg_server_key.to_address(), althea_db_addr, + queue.clone(), + ) + .await + .expect("Failed to start migration!"); + + // wait for the timeout while also running the registration loop + match select( + Box::pin(validate_db_migration( + num_clients, + althea_db_addr, + reg_server_key, + )), + Box::pin(reg_loop), ) .await { - Ok(_) => println!("Successfully migrated all clients!"), - Err(e) => println!("Failed to migrate clients with {}", e), - } - - info!("Waiting for register loop to migrate all clients"); - thread::sleep(Duration::from_secs(10)); - - validate_db_migration(num_clients, althea_db_addr, reg_server_key).await; + Either::Left((_, _)) => info!("Successfully migrated all clients!"), + Either::Right((_, _)) => panic!("Registration loop crashed!"), + }; } fn add_dummy_clients_to_db(num_of_entries: usize, conn: &PgConnection) { diff --git a/integration_tests/src/debts.rs b/integration_tests/src/debts.rs index 36ab89f5b..82ce533bb 100644 --- a/integration_tests/src/debts.rs +++ b/integration_tests/src/debts.rs @@ -3,9 +3,8 @@ use std::thread; use std::time::Duration; use crate::five_nodes::five_node_config; -use crate::registration_server::start_registration_server; use crate::setup_utils::namespaces::*; -use crate::setup_utils::rita::{spawn_exit_root, thread_spawner}; +use crate::setup_utils::rita::{spawn_exit_root_of_trust, thread_spawner}; use crate::utils::{ add_exits_contract_exit_list, deploy_contracts, generate_traffic, get_default_settings, get_ip_from_namespace, populate_routers_eth, query_debts, register_all_namespaces_to_exit, @@ -46,10 +45,8 @@ pub async fn run_debts_test() { info!("Waiting to deploy contracts"); let db_addr = deploy_contracts().await; - info!("Starting registration server"); - start_registration_server(db_addr).await; - - let (client_settings, exit_settings, exit_root_addr) = get_default_settings(namespaces.clone()); + let (client_settings, exit_settings, exit_root_addr) = + get_default_settings(namespaces.clone(), db_addr); // The exit price is set to ns.cost during thread_spawner let exit_price = namespaces.get_namespace(4).unwrap().cost; @@ -60,7 +57,7 @@ pub async fn run_debts_test() { info!("Namespaces setup: {res:?}"); info!("Starting root server!"); - spawn_exit_root(); + spawn_exit_root_of_trust(db_addr).await; let rita_identities = thread_spawner(namespaces.clone(), client_settings, exit_settings, db_addr) diff --git a/integration_tests/src/five_nodes.rs b/integration_tests/src/five_nodes.rs index 206803bb7..7a7fd169b 100644 --- a/integration_tests/src/five_nodes.rs +++ b/integration_tests/src/five_nodes.rs @@ -1,6 +1,5 @@ -use crate::registration_server::start_registration_server; use crate::setup_utils::namespaces::*; -use crate::setup_utils::rita::{spawn_exit_root, thread_spawner}; +use crate::setup_utils::rita::{spawn_exit_root_of_trust, thread_spawner}; use crate::utils::{ add_exits_contract_exit_list, deploy_contracts, get_default_settings, populate_routers_eth, register_all_namespaces_to_exit, test_all_internet_connectivity, test_reach_all, test_routes, @@ -18,10 +17,8 @@ pub async fn run_five_node_test_scenario() { info!("Waiting to deploy contracts"); let db_addr = deploy_contracts().await; - info!("Starting registration server"); - start_registration_server(db_addr).await; - - let (client_settings, exit_settings, exit_root_addr) = get_default_settings(namespaces.clone()); + let (client_settings, exit_settings, exit_root_addr) = + get_default_settings(namespaces.clone(), db_addr); namespaces.validate(); @@ -29,7 +26,7 @@ pub async fn run_five_node_test_scenario() { info!("Namespaces setup: {res:?}"); info!("Starting root server!"); - spawn_exit_root(); + spawn_exit_root_of_trust(db_addr).await; let rita_identities = thread_spawner(namespaces.clone(), client_settings, exit_settings, db_addr) diff --git a/integration_tests/src/lib.rs b/integration_tests/src/lib.rs index b5d5fa5ea..5c1e4cf57 100644 --- a/integration_tests/src/lib.rs +++ b/integration_tests/src/lib.rs @@ -13,7 +13,6 @@ pub mod five_nodes; pub mod mutli_exit; pub mod payments_althea; pub mod payments_eth; -pub mod registration_server; pub mod setup_utils; pub mod utils; diff --git a/integration_tests/src/mutli_exit.rs b/integration_tests/src/mutli_exit.rs index 4827f1a38..aa44987be 100644 --- a/integration_tests/src/mutli_exit.rs +++ b/integration_tests/src/mutli_exit.rs @@ -3,10 +3,9 @@ use std::{collections::HashMap, str::from_utf8, thread, time::Duration}; use althea_kernel_interface::run_command; use crate::{ - registration_server::start_registration_server, setup_utils::{ namespaces::{setup_ns, Namespace, NamespaceInfo, NodeType, PriceId, RouteHop}, - rita::{spawn_exit_root, thread_spawner}, + rita::{spawn_exit_root_of_trust, thread_spawner}, }, utils::{ add_exits_contract_exit_list, deploy_contracts, get_default_settings, get_node_id_from_ip, @@ -41,18 +40,15 @@ pub async fn run_multi_exit_test() { info!("Waiting to deploy contracts"); let db_addr = deploy_contracts().await; - info!("Starting registration server"); - start_registration_server(db_addr).await; - let (rita_client_settings, rita_exit_settings, exit_root_addr) = - get_default_settings(namespaces.clone()); + get_default_settings(namespaces.clone(), db_addr); namespaces.validate(); let res = setup_ns(namespaces.clone()); info!("Starting root server!"); - spawn_exit_root(); + spawn_exit_root_of_trust(db_addr).await; let rita_identities = thread_spawner( namespaces.clone(), diff --git a/integration_tests/src/payments_althea.rs b/integration_tests/src/payments_althea.rs index bceaa005f..e0fc57553 100644 --- a/integration_tests/src/payments_althea.rs +++ b/integration_tests/src/payments_althea.rs @@ -1,7 +1,6 @@ use crate::five_nodes::five_node_config; -use crate::registration_server::start_registration_server; use crate::setup_utils::namespaces::*; -use crate::setup_utils::rita::{spawn_exit_root, thread_spawner}; +use crate::setup_utils::rita::{spawn_exit_root_of_trust, thread_spawner}; use crate::utils::{ add_exits_contract_exit_list, deploy_contracts, generate_traffic, get_althea_grpc, get_default_settings, populate_routers_eth, print_althea_balances, @@ -48,11 +47,8 @@ pub async fn run_althea_payments_test_scenario() { info!("Waiting to deploy contracts"); let db_addr = deploy_contracts().await; - info!("Starting registration server"); - start_registration_server(db_addr).await; - let (mut client_settings, mut exit_settings, exit_root_addr) = - get_default_settings(namespaces.clone()); + get_default_settings(namespaces.clone(), db_addr); namespaces.validate(); @@ -60,7 +56,7 @@ pub async fn run_althea_payments_test_scenario() { info!("Namespaces setup: {res:?}"); info!("Starting root server!"); - spawn_exit_root(); + spawn_exit_root_of_trust(db_addr).await; // Modify configs to use Althea chain let (client_settings, exit_settings) = diff --git a/integration_tests/src/payments_eth.rs b/integration_tests/src/payments_eth.rs index 5c6790c65..7170075ee 100644 --- a/integration_tests/src/payments_eth.rs +++ b/integration_tests/src/payments_eth.rs @@ -1,8 +1,7 @@ use crate::five_nodes::five_node_config; -use crate::registration_server::start_registration_server; use crate::setup_utils::namespaces::setup_ns; use crate::setup_utils::namespaces::Namespace; -use crate::setup_utils::rita::spawn_exit_root; +use crate::setup_utils::rita::spawn_exit_root_of_trust; use crate::setup_utils::rita::thread_spawner; use crate::utils::add_exits_contract_exit_list; use crate::utils::deploy_contracts; @@ -48,11 +47,8 @@ pub async fn run_eth_payments_test_scenario() { info!("Waiting to deploy contracts"); let db_addr = deploy_contracts().await; - info!("Starting registration server"); - start_registration_server(db_addr).await; - let (mut client_settings, mut exit_settings, exit_root_addr) = - get_default_settings(namespaces.clone()); + get_default_settings(namespaces.clone(), db_addr); // Set payment thresholds low enough so that they get triggered after an iperf let (client_settings, exit_settings) = @@ -64,7 +60,7 @@ pub async fn run_eth_payments_test_scenario() { info!("Namespaces setup: {res:?}"); info!("Starting root server!"); - spawn_exit_root(); + spawn_exit_root_of_trust(db_addr).await; let rita_identities = thread_spawner(namespaces.clone(), client_settings, exit_settings, db_addr) diff --git a/integration_tests/src/registration_server.rs b/integration_tests/src/registration_server.rs deleted file mode 100644 index 1ca0b4586..000000000 --- a/integration_tests/src/registration_server.rs +++ /dev/null @@ -1,79 +0,0 @@ -use std::thread; - -use actix_rt::System; -use actix_web::{ - web::{self, Json}, - App, HttpResponse, HttpServer, -}; -use althea_types::ExitClientIdentity; -use clarity::{Address, PrivateKey}; -use exit_trust_root::{ - client_db::check_and_add_user_admin, register_client_batch_loop::register_client_batch_loop, - rita_client_registration::handle_sms_registration, -}; -use web30::client::Web3; - -use crate::utils::REGISTRATION_SERVER_KEY; -use crate::{ - payments_eth::WEB3_TIMEOUT, - utils::{get_eth_node, get_test_runner_magic_phone, TX_TIMEOUT}, -}; - -pub const REGISTRATION_PORT_SERVER: u16 = 40400; - -pub async fn start_registration_server(db_addr: Address) { - let miner_private_key: PrivateKey = REGISTRATION_SERVER_KEY.parse().unwrap(); - let miner_pub_key = miner_private_key.to_address(); - let contact = Web3::new(&get_eth_node(), WEB3_TIMEOUT); - - check_and_add_user_admin( - &contact, - db_addr, - miner_pub_key, - miner_private_key, - Some(TX_TIMEOUT), - vec![], - ) - .await - .unwrap(); - - // Start the register loop - register_client_batch_loop(get_eth_node(), db_addr, miner_private_key); - - // Start endpoint listener - thread::spawn(move || { - let runner = System::new(); - runner.block_on(async move { - // Exit stuff, huge threadpool to offset Pgsql blocking - let _res = HttpServer::new(|| { - App::new() - .route("/register_router", web::post().to(register_router)) - .route("/test", web::get().to(test_endpoint)) - }) - .bind(format!("7.7.7.1:{}", REGISTRATION_PORT_SERVER)) - .unwrap() - .shutdown_timeout(0) - .run() - .await; - }); - }); -} - -async fn register_router(client: Json) -> HttpResponse { - let client = client.into_inner(); - info!("Attempting to register client: {}", client.global.mesh_ip); - - HttpResponse::Ok().json( - handle_sms_registration( - client, - "dummy key".to_string(), - "dummy-id".to_string(), - Some(get_test_runner_magic_phone()), - ) - .await, - ) -} - -async fn test_endpoint() -> HttpResponse { - HttpResponse::Ok().finish() -} diff --git a/integration_tests/src/setup_utils/namespaces.rs b/integration_tests/src/setup_utils/namespaces.rs index ba575e86b..cbb422382 100644 --- a/integration_tests/src/setup_utils/namespaces.rs +++ b/integration_tests/src/setup_utils/namespaces.rs @@ -105,12 +105,12 @@ impl NamespaceInfo { /// For each key in destination, the u32 value is the price we expect to see in its route, /// and the namespace value is the next hop we take to reach the key. This struct is meant to /// be used within an outer hashmap which holds the "from" namespace. -#[derive(Clone, Eq, PartialEq)] +#[derive(Clone, Eq, PartialEq, Debug)] pub struct RouteHop { pub destination: HashMap, } -#[derive(Clone, Eq, PartialEq)] +#[derive(Clone, Eq, PartialEq, Debug)] pub struct PriceId { pub price: u32, pub id: u16, diff --git a/integration_tests/src/setup_utils/rita.rs b/integration_tests/src/setup_utils/rita.rs index c981f2ef2..993c70e71 100644 --- a/integration_tests/src/setup_utils/rita.rs +++ b/integration_tests/src/setup_utils/rita.rs @@ -1,20 +1,32 @@ -use crate::utils::TEST_EXIT_DETAILS; - use super::babel::spawn_babel; use super::namespaces::get_nsfd; use super::namespaces::NamespaceInfo; use super::namespaces::NodeType; +use crate::utils::get_eth_node; +use crate::utils::get_test_runner_magic_phone; +use crate::utils::EXIT_ROOT_SERVER_URL; +use crate::utils::REGISTRATION_SERVER_KEY; +use crate::utils::TEST_EXIT_DETAILS; +use crate::SETUP_WAIT; use actix_web::rt::System; use actix_web::web; use actix_web::App; use actix_web::HttpServer; use althea_kernel_interface::KernelInterfaceError; use althea_types::Identity; -use althea_types::SignedExitServerList; use clarity::Address; +use clarity::PrivateKey; +use crossbeam::queue::SegQueue; use exit_trust_root; -use exit_trust_root::return_exit_contract_data; +use exit_trust_root::client_db::check_and_add_user_admin; +use exit_trust_root::config::Config; +use exit_trust_root::config::ConfigAndCache; +use exit_trust_root::endpoints::return_signed_exit_contract_data; +use exit_trust_root::endpoints::start_client_registration; +use exit_trust_root::endpoints::submit_registration_code; +use exit_trust_root::register_client_batch_loop::register_client_batch_loop; use exit_trust_root::signature_update_loop; +use futures::join; use ipnetwork::IpNetwork; use ipnetwork::Ipv6Network; use log::info; @@ -33,11 +45,12 @@ use rita_exit::{ use settings::exit::EXIT_LIST_IP; use settings::set_flag_config; use settings::{client::RitaClientSettings, exit::RitaExitSettingsStruct}; +use std::collections::HashSet; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering::Relaxed; use std::time::Instant; use std::{ - collections::{HashMap, HashSet}, + collections::HashMap, convert::TryInto, fs::{self}, net::{IpAddr, Ipv6Addr}, @@ -45,6 +58,7 @@ use std::{ thread, time::Duration, }; +use web30::client::Web3; /// This struct contains metadata about instances that the thread spanwer has spawned /// if you need any data about an instance that can be had at startup use this to pass it @@ -290,27 +304,63 @@ pub fn spawn_rita_exit( } /// Spawns the exit root server and waits for it to finish starting, panics if it does not finish starting -pub fn spawn_exit_root() { +/// also adds the exit root to the list of allowed admins that can register users. This is possible becuase +/// the registration server key is the owner of the exit root contract. It's also the address we're using for +/// the root of trust server, but just becuase it's the owner doesn't mean it's automatically an admin that can +/// register users. We have to add it to the list of admins. +pub async fn spawn_exit_root_of_trust(db_addr: Address) { + let registration_server_key: PrivateKey = REGISTRATION_SERVER_KEY.parse().unwrap(); + check_and_add_user_admin( + &Web3::new(&get_eth_node(), SETUP_WAIT), + db_addr, + registration_server_key.to_address(), + registration_server_key, + Some(SETUP_WAIT), + vec![], + ) + .await + .unwrap(); + let successful_start: Arc = Arc::new(AtomicBool::new(false)); let start_move = successful_start.clone(); // the exit root server does not get its own namespace- instead it runs in the native namespace/host - let exit_contract_data_cache: Arc>> = - Arc::new(RwLock::new(HashMap::new())); - signature_update_loop(exit_contract_data_cache.clone()); - let web_data = web::Data::new(exit_contract_data_cache.clone()); + let exit_contract_data_cache = ConfigAndCache { + config: Arc::new(Config { + timeout: 60, + rpc: get_eth_node(), + private_key: registration_server_key, + telnyx_api_key: String::new(), + verify_profile_id: String::new(), + magic_number: Some(get_test_runner_magic_phone()), + https: true, + url: EXIT_ROOT_SERVER_URL.to_string(), + }), + cache: Arc::new(RwLock::new(HashMap::new())), + registration_queue: Arc::new(SegQueue::new()), + texts_sent: Arc::new(RwLock::new(HashMap::new())), + }; thread::spawn(move || { + let web_data = web::Data::new(exit_contract_data_cache.clone()); + let sig_loop = signature_update_loop(exit_contract_data_cache.clone()); + let reg_loop = register_client_batch_loop( + exit_contract_data_cache.config.rpc.clone(), + exit_contract_data_cache.config.private_key, + exit_contract_data_cache.registration_queue.clone(), + ); let successful_start = start_move.clone(); let runner = System::new(); runner.block_on(async move { let server = HttpServer::new(move || { App::new() - .service(return_exit_contract_data) + .service(return_signed_exit_contract_data) + .service(start_client_registration) + .service(submit_registration_code) .app_data(web_data.clone()) }); info!("Starting exit trust root server on 10.0.0.1:4050"); let server = server.bind("10.0.0.1:4050").unwrap(); successful_start.store(true, Relaxed); - let _ = server.run().await; + let _ = join!(server.run(), sig_loop, reg_loop); }); }); diff --git a/integration_tests/src/utils.rs b/integration_tests/src/utils.rs index 7f02dcd0d..7e18119e9 100644 --- a/integration_tests/src/utils.rs +++ b/integration_tests/src/utils.rs @@ -20,7 +20,8 @@ use althea_types::{ regions::Regions, ContactType, Denom, ExitIdentity, Identity, SystemChain, WgKey, }; use babel_monitor::{open_babel_stream, parse_routes, structs::Route}; -use clarity::{Address, PrivateKey as ClarityPrivkey, Transaction, Uint256}; +use clarity::PrivateKey as ClarityPrivkey; +use clarity::{Address, Transaction, Uint256}; use deep_space::{Address as AltheaAddress, Coin, Contact, CosmosPrivateKey, PrivateKey}; use exit_trust_root::client_db::{add_exit_admin, add_exits_to_registration_list}; use futures::future::join_all; @@ -220,8 +221,9 @@ pub fn test_reach_all(nsinfo: NamespaceInfo) { pub fn test_reach_all_async(nsinfo: NamespaceInfo) -> bool { for i in nsinfo.clone().names { for j in nsinfo.clone().names { - if test_reach(i.clone(), j) { + if test_reach(i.clone(), j.clone()) { // ping failed + error!("Ping for {:?} to {:?} failed, retrying...", i, j); return false; } } @@ -302,6 +304,11 @@ pub fn test_routes_async(nsinfo: NamespaceInfo, expected: HashMap ClarityPrivkey { + get_eth_miner_key() +} + /// Gets the default client and exit settings pub fn get_default_settings( namespaces: NamespaceInfo, + exit_db_contract: Address, ) -> (RitaClientSettings, RitaExitSettingsStruct, Address) { let mut exit_servers = HashMap::new(); - // generate keys for the exit root server - let exit_root_privkey = ClarityPrivkey::from_bytes([1u8; 32]).unwrap(); - let exit_root_addr = exit_root_privkey.to_address(); + let exit_root_addr = get_exit_root_private_key().to_address(); info!("Exit root address is {:?}", exit_root_addr); let exit = RitaExitSettingsStruct { - client_registration_url: "https://7.7.7.1:40400/register_router".to_string(), workers: 2, remote_log: false, description: "Test environment exit instance".to_string(), @@ -401,7 +410,6 @@ pub fn get_default_settings( allowed_countries: HashSet::new(), log: LoggingSettings::default(), operator: ExitOperatorSettings::default(), - allowed_exit_list_signatures: vec![exit_root_addr], exit_root_url: EXIT_ROOT_SERVER_URL.to_owned(), }; let client = RitaClientSettings::default(); @@ -442,7 +450,9 @@ pub fn get_default_settings( .into(), ); - client.exit_client.allowed_exit_list_signatures = vec![exit_root_addr]; + let key: ClarityPrivkey = REGISTRATION_SERVER_KEY.parse().unwrap(); + client.exit_client.allowed_exit_list_signers = vec![key.to_address()]; + client.exit_client.exit_db_smart_contract = exit_db_contract; // first node is passed through to the host machine for testing second node is used // for testnet queries diff --git a/rita_client/src/exit_manager/requests.rs b/rita_client/src/exit_manager/requests.rs index 96c97e972..ca003215f 100644 --- a/rita_client/src/exit_manager/requests.rs +++ b/rita_client/src/exit_manager/requests.rs @@ -11,6 +11,7 @@ use althea_types::WgKey; use althea_types::{ExitClientIdentity, ExitRegistrationDetails, ExitState}; use settings::exit::EXIT_LIST_IP; use settings::exit::EXIT_LIST_PORT; +use settings::get_registration_details; use settings::get_rita_client; use settings::set_rita_client; use std::net::SocketAddr; @@ -109,15 +110,14 @@ pub async fn exit_setup_request(code: Option) -> Result<(), RitaClientEr ExitState::New { .. } | ExitState::Pending { .. } => { let exit_pubkey = exit.wg_key; - let mut reg_details: ExitRegistrationDetails = - match client_settings.payment.contact_info { - Some(val) => val.into(), - None => { - return Err(RitaClientError::MiscStringError( - "No registration info set!".to_string(), - )) - } - }; + let mut reg_details: ExitRegistrationDetails = match get_registration_details() { + Some(val) => val, + None => { + return Err(RitaClientError::MiscStringError( + "No registration info set!".to_string(), + )) + } + }; // Send a verification code if we have one reg_details.phone_code = code; @@ -190,8 +190,8 @@ pub async fn exit_status_request(exit: ExitIdentity) -> Result<(), RitaClientErr return Err(RitaClientError::NoExitError(exit.mesh_ip.to_string())); } }; - let reg_details = match settings::get_rita_client().payment.contact_info { - Some(val) => val.into(), + let reg_details = match get_registration_details() { + Some(val) => val, None => { return Err(RitaClientError::MiscStringError( "No valid details".to_string(), @@ -262,9 +262,24 @@ pub async fn get_exit_list() -> Result { }; let config = get_rita_client(); - let allowed_signers = config.exit_client.allowed_exit_list_signatures; - // signature must both be valid and from a trusted signer - if list.verify() && allowed_signers.contains(&list.get_signer()) { + let allowed_signers = config.exit_client.allowed_exit_list_signers; + + trace!( + "About to verify exit list signer and contract we are expecting {} and signer {:?}", + config.exit_client.exit_db_smart_contract, + allowed_signers + ); + trace!( + "We have the contract {} with signer {}", + list.get_server_list().contract, + list.get_signer() + ); + + // signature must both be valid, from a trusted signer and for the contract we asked for + if list.verify() + && allowed_signers.contains(&list.get_signer()) + && list.get_server_list().contract == config.exit_client.exit_db_smart_contract + { // save list of verified exits let mut rita_client = settings::get_rita_client(); rita_client.exit_client.verified_exit_list = Some(list.get_server_list()); diff --git a/rita_db_migration/Cargo.toml b/rita_db_migration/Cargo.toml index f7af054aa..605817794 100644 --- a/rita_db_migration/Cargo.toml +++ b/rita_db_migration/Cargo.toml @@ -17,3 +17,4 @@ serde_json = "1.0" exit_trust_root = { path = "../exit_trust_root" } clarity = "1.4" web30 = "1.4" +crossbeam = "0.8" \ No newline at end of file diff --git a/rita_db_migration/src/lib.rs b/rita_db_migration/src/lib.rs index 25ce06e37..8e1e5a038 100644 --- a/rita_db_migration/src/lib.rs +++ b/rita_db_migration/src/lib.rs @@ -9,15 +9,16 @@ pub mod error; pub mod models; pub mod schema; -use std::{collections::HashSet, time::Duration}; +use std::{collections::HashSet, sync::Arc, time::Duration}; use crate::schema::clients::dsl::clients; use althea_types::Identity; use clarity::Address; +use crossbeam::queue::SegQueue; use diesel::{r2d2::ConnectionManager, PgConnection, RunQueryDsl}; use error::RitaDBMigrationError; use exit_trust_root::{ - client_db::get_all_registered_clients, rita_client_registration::add_client_to_reg_queue, + client_db::get_all_registered_clients, register_client_batch_loop::RegistrationRequest, }; use models::Client; use r2d2::PooledConnection; @@ -30,6 +31,7 @@ pub async fn start_db_migration( web3_url: String, requester_address: Address, db_addr: Address, + reg_queue: Arc>, ) -> Result<(), RitaDBMigrationError> { // Validate that db_url and contract_addr are valid if !(db_url.contains("postgres://") @@ -48,7 +50,14 @@ pub async fn start_db_migration( ); let contact = Web3::new(&web3_url, WEB3_TIMEOUT); - add_clients_to_reg_queue(clients_list, &contact, requester_address, db_addr).await + add_clients_to_reg_queue( + clients_list, + &contact, + requester_address, + db_addr, + reg_queue, + ) + .await } else { return Err(RitaDBMigrationError::MiscStringError( "Unable to get db clients".to_string(), @@ -63,6 +72,7 @@ async fn add_clients_to_reg_queue( contact: &Web3, requester_address: Address, contract: Address, + reg_queue: Arc>, ) { let existing_users: HashSet = match get_all_registered_clients(contact, requester_address, contract).await { @@ -104,7 +114,10 @@ async fn add_clients_to_reg_queue( if !existing_users.contains(&id) { info!("Adding user {}", id.mesh_ip); - add_client_to_reg_queue(id); + reg_queue.push(RegistrationRequest { + identity: id, + contract, + }); } else { warn!("User {} already exists!", id.mesh_ip); } diff --git a/rita_exit/src/database/mod.rs b/rita_exit/src/database/mod.rs index 386c7a2cf..e119a7fec 100644 --- a/rita_exit/src/database/mod.rs +++ b/rita_exit/src/database/mod.rs @@ -31,7 +31,9 @@ use althea_types::WgKey; use althea_types::{ExitClientDetails, ExitClientIdentity, ExitDetails, ExitState, ExitVerifMode}; use clarity::Address; use exit_trust_root::client_db::get_registered_client_using_wgkey; -use exit_trust_root::rita_client_registration::ExitSignupReturn; +use exit_trust_root::endpoints::RegisterRequest; +use exit_trust_root::endpoints::SubmitCodeRequest; +use phonenumber::PhoneNumber; use rita_common::blockchain_oracle::calculate_close_thresh; use rita_common::debt_keeper::get_debts_list; use rita_common::debt_keeper::DebtAction; @@ -103,53 +105,59 @@ pub async fn signup_client(client: ExitClientIdentity) -> Result Ok(ExitState::Registered { - our_details: ExitClientDetails { - client_internal_ip: exit_client.internal_ip, - internet_ipv6_subnet: exit_client.internet_ipv6, - }, - general_details: get_exit_info(), - message: "Registration OK".to_string(), - identity: Box::new(exit_settings.get_exit_identity()), - }), - - ExitSignupReturn::PendingRegistration => Ok(ExitState::Pending { - message: "awaiting verification".to_string(), - }), - ExitSignupReturn::BadPhoneNumber => Ok(ExitState::Pending { - message: format!( - "Error parsing client phone number {:?}", - exit_client.public_key, - ), - }), - ExitSignupReturn::InternalServerError { e } => Ok(ExitState::Pending { - message: format!("Internal Error from registration server {:?}", e,), - }), + let number = match client.clone().reg_details.phone { + Some(n) => n, + None => { + return Err(Box::new(RitaExitError::MiscStringError( + "Phone number is required for registration".to_string(), + ))); + } + }; + + let phone_number: PhoneNumber = match number.parse() { + Ok(p) => p, + Err(e) => { + return Err(Box::new(RitaExitError::MiscStringError(format!( + "Failed to parse phone number with error {}", + e + )))); } + }; + + let exit_client = to_exit_client(client.global)?; + // if there is a phone registration code, we should submit it for verification + if let Some(code) = client.reg_details.phone_code.clone() { + info!("Forwarding client verification request"); + forward_client_verify_request(client, phone_number, code).await?; + Ok(ExitState::Registered { + our_details: ExitClientDetails { + client_internal_ip: exit_client.internal_ip, + internet_ipv6_subnet: exit_client.internet_ipv6, + }, + general_details: get_exit_info(), + message: "Registration OK".to_string(), + identity: Box::new(exit_settings.get_exit_identity()), + }) } else { + info!("Forwarding client signup request"); + // if there is no phone registration code, we should submit the client for registration + forward_client_signup_request(client, phone_number).await?; Ok(ExitState::Pending { - message: format!("Error parsing client details with {:?}", exit_client,), + message: "awaiting verification".to_string(), }) } } -pub async fn forward_client_signup_request(exit_client: ExitClientIdentity) -> ExitSignupReturn { - let url: &str; - let reg_url = get_rita_exit().client_registration_url; - if cfg!(feature = "dev_env") { - url = "http://7.7.7.1:40400/register_router"; - } else if cfg!(feature = "operator_debug") { - url = "http://192.168.10.2:40400/register_router"; - } else { - url = ®_url; - } +pub async fn forward_client_verify_request( + exit_client: ExitClientIdentity, + phone_number: PhoneNumber, + code: String, +) -> Result<(), RitaExitError> { + let settings = get_rita_exit(); + let url = format!("{}/submit_code", settings.exit_root_url); info!( - "About to request client {} registration with {}", + "About to submit client code {} with {}", exit_client.global, url ); @@ -157,33 +165,65 @@ pub async fn forward_client_signup_request(exit_client: ExitClientIdentity) -> E let response = client .post(url) .timeout(CLIENT_REGISTER_TIMEOUT) - .send_json(&exit_client) + .send_json(&SubmitCodeRequest { + phone_number, + identity: exit_client.global, + code, + contract: exit_client.reg_details.exit_database_contract, + }) .await; - let response = match response { - Ok(mut response) => { - trace!("Response is {:?}", response.status()); - trace!("Response is {:?}", response.headers()); - response.json().await + match response { + Ok(v) => { + trace!("Response is {:?}", v.status()); + trace!("Response is {:?}", v.headers()); + if v.status().is_success() { + Ok(()) + } else { + Err(RitaExitError::MiscStringError(v.status().to_string())) + } } Err(e) => { error!("Failed to perform client registration with {:?}", e); - return ExitSignupReturn::InternalServerError { - e: format!("Unable to contact registration server: {}", e), - }; + Err(RitaExitError::MiscStringError(e.to_string())) } - }; + } +} + +pub async fn forward_client_signup_request( + exit_client: ExitClientIdentity, + phone_number: PhoneNumber, +) -> Result<(), RitaExitError> { + let settings = get_rita_exit(); + let url = format!("{}/register", settings.exit_root_url); + + info!( + "About to request registration for client {} registration with {}", + exit_client.global, url + ); + + let client = awc::Client::default(); + let response = client + .post(url) + .timeout(CLIENT_REGISTER_TIMEOUT) + .send_json(&RegisterRequest { phone_number }) + .await; - let response: ExitSignupReturn = match response { - Ok(a) => a, + match response { + Ok(v) => { + trace!("Response is {:?}", v.status()); + trace!("Response is {:?}", v.headers()); + if v.status().is_success() { + Ok(()) + } else { + Err(RitaExitError::MiscStringError(v.status().to_string())) + } + } Err(e) => { - error!("Failed to decode registration request {:?}", e); - return ExitSignupReturn::InternalServerError { - e: format!("Failed to decode registration request {:?}", e), - }; + error!("Failed to perform client registration with {:?}", e); + Err(RitaExitError::MiscStringError(e.to_string())) } - }; - response + } } /// Gets the status of a client and updates it in the database diff --git a/rita_exit/src/network_endpoints/mod.rs b/rita_exit/src/network_endpoints/mod.rs index b300ff206..43e20ede2 100644 --- a/rita_exit/src/network_endpoints/mod.rs +++ b/rita_exit/src/network_endpoints/mod.rs @@ -225,11 +225,10 @@ pub async fn get_exit_list( async fn get_exit_list_from_root(contract_addr: Address) -> Option { let rita_exit = get_rita_exit(); let request_url = rita_exit.exit_root_url; - let allowed_signers = rita_exit.allowed_exit_list_signatures; let timeout = Duration::new(15, 0); let client = awc::Client::new(); let request_url = format!("{}/{}", request_url, contract_addr); - info!("Requesting exit list from {}", request_url); + trace!("Requesting exit list from {}", request_url); let mut response = client .get(request_url) .timeout(timeout) @@ -237,12 +236,14 @@ async fn get_exit_list_from_root(contract_addr: Address) -> Option().await { Ok(a) => { // verify the signature of the exit list - if a.verify() && allowed_signers.contains(&a.get_signer()) { - info!("Verified exit list signature"); + // note the exit cares if it's passing along a valid signature + // but does not police allowed signers, that's entierly up to the client + if a.verify() { + trace!("Verified exit list signature"); return Some(a); } error!("Failed to verify exit list signature"); diff --git a/settings/src/client.rs b/settings/src/client.rs index 156ecfe00..1199e63ce 100644 --- a/settings/src/client.rs +++ b/settings/src/client.rs @@ -21,8 +21,10 @@ pub fn default_config_path() -> PathBuf { format!("/etc/{APP_NAME}.toml").into() } -fn exit_db_smart_contract_on_xdai() -> String { - "0x29a3800C28dc133f864C22533B649704c6CD7e15".to_string() +fn exit_db_smart_contract() -> Address { + "0x29a3800C28dc133f864C22533B649704c6CD7e15" + .parse() + .unwrap() } fn default_registration_state() -> ExitState { @@ -43,13 +45,13 @@ pub struct ExitClientSettings { /// chain we are on. Since different chains may reference different registration smart contracts #[serde(default = "default_registration_state", flatten)] pub registration_state: ExitState, - /// This is the address of the exit database contract on the xDai chain, this value is a config value in case + /// This is the address of the exit database contract, this value is a config value in case /// a new version of the contract is ever deployed. Otherwise it won't change much. What this contract contains /// is the registration data for all routers, facilitating key exchange between new exits in the cluster and clients /// So the client registers with the smart contract and the exit takes it's registration data (wireguard key) and sets /// up a tunnel, vice versa for the client after finding an exit to register to - #[serde(default = "exit_db_smart_contract_on_xdai")] - pub exit_db_smart_contract_on_xdai: String, + #[serde(default = "exit_db_smart_contract")] + pub exit_db_smart_contract: Address, /// This controls which interfaces will be proxied over the exit tunnel pub lan_nics: HashSet, /// This is the region we are in, this is used to determine if we are in a region that is allowed to connect to the exit @@ -57,17 +59,17 @@ pub struct ExitClientSettings { /// If we have some value we will connect to exits that have that region specified as well as exits with no region specified. pub our_region: Option, /// This is the Address/Pubkey of the exit root of trust server which clients use to verify signed exit lists - pub allowed_exit_list_signatures: Vec
, + pub allowed_exit_list_signers: Vec
, } impl Default for ExitClientSettings { fn default() -> Self { ExitClientSettings { registration_state: default_registration_state(), - exit_db_smart_contract_on_xdai: exit_db_smart_contract_on_xdai(), + exit_db_smart_contract: exit_db_smart_contract(), lan_nics: HashSet::new(), our_region: None, - allowed_exit_list_signatures: Vec::new(), + allowed_exit_list_signers: Vec::new(), verified_exit_list: None, } } diff --git a/settings/src/exit.rs b/settings/src/exit.rs index 2e5621fbf..cc313cf2a 100644 --- a/settings/src/exit.rs +++ b/settings/src/exit.rs @@ -92,16 +92,13 @@ impl ExitNetworkSettings { fn default_remote_log() -> bool { false } -pub fn default_reg_url() -> String { - "https://operator.althea.net:8080/register_router".to_string() +pub fn default_root_url() -> String { + "https://exitroot.althea.net:4050".to_string() } /// This is the main settings struct for rita_exit #[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)] pub struct RitaExitSettingsStruct { - /// url exit uses to request a clients registration - #[serde(default = "default_reg_url")] - pub client_registration_url: String, /// the size of the worker thread pool, the connection pool is this plus one pub workers: u32, /// if we should log remotely or if we should send our logs to the logging server @@ -122,9 +119,8 @@ pub struct RitaExitSettingsStruct { /// (ISO country code) #[serde(skip_serializing_if = "HashSet::is_empty", default)] pub allowed_countries: HashSet, - /// This is the Address/Pubkey of the exit root of trust server which clients use to verify signed exit lists - pub allowed_exit_list_signatures: Vec
, - /// url to the exit root of trust server to query exit lists + /// url to the exit root of trust server to query exit lists, and make registration requests + #[serde(default = "default_root_url")] pub exit_root_url: String, } @@ -138,7 +134,6 @@ impl RitaExitSettingsStruct { /// default trait to prevent some future code from picking up on the 'default' implementation pub fn test_default() -> Self { RitaExitSettingsStruct { - client_registration_url: "".to_string(), workers: 1, remote_log: false, description: "".to_string(), @@ -149,8 +144,7 @@ impl RitaExitSettingsStruct { exit_network: ExitNetworkSettings::test_default(), allowed_countries: HashSet::new(), log: LoggingSettings::default(), - allowed_exit_list_signatures: Vec::new(), - exit_root_url: "".to_string(), + exit_root_url: "http://10.0.0.1:4050".to_string(), } } diff --git a/settings/src/lib.rs b/settings/src/lib.rs index 34077df74..28fe7260d 100644 --- a/settings/src/lib.rs +++ b/settings/src/lib.rs @@ -18,7 +18,10 @@ extern crate log; extern crate arrayvec; use althea_kernel_interface::netns::check_integration_test_netns; -use althea_types::{BillingDetails, ContactType, Identity, InstallationDetails, SystemChain}; +use althea_types::{ + BillingDetails, ContactType, ExitRegistrationDetails, Identity, InstallationDetails, + SystemChain, +}; use clarity::Address; use logging::LoggingSettings; use network::NetworkSettings; @@ -330,6 +333,19 @@ pub fn get_contact_info() -> Option { let rita_client = get_rita_client(); option_convert(rita_client.payment.contact_info) } +pub fn get_registration_details() -> Option { + let rita_client = get_rita_client(); + match get_contact_info() { + Some(contact_info) => Some(ExitRegistrationDetails { + email: contact_info.get_email().map(|a| a.to_string()), + email_code: None, + phone: contact_info.get_phone().map(|a| a.to_string()), + phone_code: None, + exit_database_contract: rita_client.exit_client.exit_db_smart_contract, + }), + None => None, + } +} pub fn get_install_details() -> Option { let rita_client = get_rita_client(); let operator_settings = rita_client.operator; diff --git a/test_runner/src/main.rs b/test_runner/src/main.rs index 3ba0d49d4..9dadf0a9f 100644 --- a/test_runner/src/main.rs +++ b/test_runner/src/main.rs @@ -40,7 +40,7 @@ async fn main() { if let Ok(test_type) = test_type { if test_type == "FIVE_NODES" { run_five_node_test_scenario().await; - } else if test_type == "DEBTS_TEST" { + } else if test_type == "DEBTS_TEST" || test_type == "DEBTS" { run_debts_test().await; } else if test_type == "PAYMENTS_ETH" || test_type == "ETH_PAYMENTS" { run_eth_payments_test_scenario().await; @@ -50,7 +50,7 @@ async fn main() { run_multi_exit_test().await } else if test_type == "CONTRACT_TEST" { run_altheadb_contract_test().await - } else if test_type == "MIGRATION_TEST" { + } else if test_type == "MIGRATION_TEST" || test_type == "DB_MIGRATION" { run_db_migration_test().await } else { panic!("Error unknown test type {}!", test_type);