Skip to content

Commit

Permalink
WIP: Remove exit lazy static
Browse files Browse the repository at this point in the history
This patch simplifies the dataflow for th rita exit module by removing
the lazy static exit database and instead holding that data in the main
rita exit thread.
  • Loading branch information
jkilpatr committed Nov 11, 2024
1 parent 6ce7f66 commit 7f324ae
Show file tree
Hide file tree
Showing 15 changed files with 784 additions and 632 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions exit_trust_root/src/client_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use clarity::{
abi::{encode_call, AbiToken},
Address, PrivateKey, Uint256,
};
use std::{net::IpAddr, time::Duration, vec};
use std::{collections::HashSet, net::IpAddr, time::Duration, vec};
use tokio::time::timeout as future_timeout;
use web30::{
client::Web3,
Expand All @@ -24,7 +24,7 @@ pub async fn get_all_registered_clients(
web30: &Web3,
requester_address: Address,
contract: Address,
) -> Result<Vec<Identity>, Web3Error> {
) -> Result<HashSet<Identity>, Web3Error> {
let payload = encode_call("getAllRegisteredUsers()", &[])?;
let res = web30
.simulate_transaction(
Expand All @@ -33,7 +33,8 @@ pub async fn get_all_registered_clients(
)
.await?;

convert_althea_types_to_web3_error(Identity::decode_array_from_eth_abi(res))
let val = convert_althea_types_to_web3_error(Identity::decode_array_from_eth_abi(res))?;
Ok(val.into_iter().collect())
}

pub async fn get_registered_client_using_wgkey(
Expand Down
10 changes: 0 additions & 10 deletions exit_trust_root/src/register_client_batch_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,6 @@ pub struct RegistrationRequest {

pub const MAX_BATCH_SIZE: usize = 75;

/// Utility function used to easily perform O(1) lookups against the identities list
pub fn get_clients_hashset(input: Vec<Identity>) -> HashSet<Identity> {
let mut output = HashSet::new();
for i in input {
output.insert(i);
}
output
}

/// This function monitors the registration queue lock free queue. It will dequeue any new entries and attempt to register them
/// in a batch sent every REGISTRATION_LOOP_SPEED seconds. This function will also check if the user is already registered before attempting to register them
pub async fn register_client_batch_loop(
Expand Down Expand Up @@ -106,7 +97,6 @@ pub async fn register_client_batch_loop(
continue;
}
};
let all_clients = get_clients_hashset(all_clients);

let mut clients_to_register = Vec::new();
for client in list.iter() {
Expand Down
15 changes: 12 additions & 3 deletions integration_tests/src/contract_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ pub async fn validate_contract_user_functionality(db_addr: Address) {
// Try requests when there are no users present
let res = get_all_registered_clients(&contact, miner_pub_key, db_addr).await;

assert_eq!(res.unwrap(), vec![]);
assert_eq!(res.unwrap(), HashSet::new());

let res =
get_registered_client_using_wgkey(user_1.wg_public_key, miner_pub_key, db_addr, &contact)
Expand Down Expand Up @@ -251,7 +251,9 @@ pub async fn validate_contract_user_functionality(db_addr: Address) {
.await
.unwrap();

assert_eq!(vec![user_1], res);
let mut set = HashSet::new();
set.insert(user_1);
assert_eq!(set, res);

let nonce = contact
.eth_get_transaction_count(miner_pub_key)
Expand Down Expand Up @@ -342,7 +344,14 @@ pub async fn validate_contract_user_functionality(db_addr: Address) {

info!("All users are : {:?}", res);

assert_eq!(vec![user_1, user_2, user_3, user_4, user_5, user_6], res);
let mut set = HashSet::new();
set.insert(user_1);
set.insert(user_2);
set.insert(user_3);
set.insert(user_4);
set.insert(user_5);
set.insert(user_6);
assert_eq!(set, res);

info!("Trying to retrive user 1");
let res =
Expand Down
15 changes: 12 additions & 3 deletions integration_tests/src/setup_utils/rita.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use rita_common::rita_loop::{
write_to_disk::{save_to_disk_loop, SettingsOnDisk},
};
use rita_exit::rita_loop::start_rita_exit_list_endpoint;
use rita_exit::ClientListAnIpAssignmentMap;
use rita_exit::{
dashboard::start_rita_exit_dashboard,
operator_update::update_loop::start_operator_update_loop,
Expand All @@ -49,6 +50,7 @@ use std::collections::HashSet;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
use std::time::Instant;
use std::vec;
use std::{
collections::HashMap,
convert::TryInto,
Expand Down Expand Up @@ -286,14 +288,21 @@ pub fn spawn_rita_exit(
settings::get_rita_exit(),
)));

let client_and_ip_map = Arc::new(RwLock::new(ClientListAnIpAssignmentMap::new(
HashSet::new(),
)));

let workers = 4;
start_core_rita_endpoints(workers as usize);
start_rita_exit_endpoints(workers as usize);
start_rita_exit_list_endpoint(workers as usize);
start_rita_exit_endpoints(client_and_ip_map.clone());
start_rita_exit_list_endpoint();
start_rita_exit_dashboard(Arc::new(RwLock::new(None)));

// this one blocks
start_rita_exit_loop(vec![]);
let system = actix::System::new();
system.block_on(async {
start_rita_exit_loop(client_and_ip_map).await;
});
});

// wait for the child thread to finish initializing
Expand Down
2 changes: 0 additions & 2 deletions rita_bin/src/contract-util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use docopt::Docopt;
use exit_trust_root::client_db::add_exits_to_registration_list;
use exit_trust_root::client_db::add_users_to_registered_list;
use exit_trust_root::client_db::get_all_registered_clients;
use exit_trust_root::register_client_batch_loop::get_clients_hashset;
use exit_trust_root::register_client_batch_loop::MAX_BATCH_SIZE;
use log::{error, info};
use rita_db_migration::{
Expand Down Expand Up @@ -78,7 +77,6 @@ async fn main() {
panic!("Failed to get list of already registered clients {:?}", e);
}
};
let all_contract_clients = get_clients_hashset(all_contract_clients);

let db_conn = get_database_connection(db_url).unwrap();

Expand Down
15 changes: 10 additions & 5 deletions rita_bin/src/exit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use exit_trust_root::client_db::get_all_registered_clients;
#[cfg(feature = "jemalloc")]
use jemallocator::Jemalloc;
use rita_exit::rita_loop::start_rita_exit_list_endpoint;
use rita_exit::ClientListAnIpAssignmentMap;
use std::collections::HashSet;
use std::sync::Arc;
use std::sync::RwLock;
use std::time::Duration;
Expand Down Expand Up @@ -158,9 +160,12 @@ async fn main() {
check_startup_balance_and_contract(args.flag_fail_on_startup, startup_status).await;

let workers = settings.workers;

let client_and_ip_map = Arc::new(RwLock::new(ClientListAnIpAssignmentMap::new(clients)));

start_core_rita_endpoints(workers as usize);
start_rita_exit_endpoints(workers as usize);
start_rita_exit_list_endpoint(workers as usize);
start_rita_exit_endpoints(client_and_ip_map.clone());
start_rita_exit_list_endpoint();

start_rita_common_loops();
start_operator_update_loop();
Expand All @@ -169,7 +174,7 @@ async fn main() {
)));

// this call blocks, transforming this startup thread into the main exit watchdog thread
start_rita_exit_loop(clients);
start_rita_exit_loop(client_and_ip_map).await;
}

/// This function performs startup integrity checks on the config and system. It checks that we can reach the internet
Expand All @@ -182,7 +187,7 @@ async fn main() {
async fn check_startup_balance_and_contract(
fail_on_startup: bool,
startup_status: Arc<RwLock<Option<String>>>,
) -> Vec<Identity> {
) -> HashSet<Identity> {
let payment_settings = settings::get_rita_common().payment;
let our_address = payment_settings.eth_address.expect("No address!");

Expand Down Expand Up @@ -214,7 +219,7 @@ async fn check_startup_balance_and_contract(
users.unwrap()
}

async fn get_registered_users() -> Result<Vec<Identity>, Web3Error> {
async fn get_registered_users() -> Result<HashSet<Identity>, Web3Error> {
let payment_settings = settings::get_rita_common().payment;
let our_address = payment_settings.eth_address.expect("No address!");
let full_node = get_web3_server();
Expand Down
1 change: 0 additions & 1 deletion rita_exit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ babel_monitor = { path = "../babel_monitor" }
actix = {workspace = true}
awc = {workspace = true}
handlebars = "5.1"
lazy_static = "1.5"
ipnetwork = "0.20"
clarity = {workspace = true}
serde = "1.0"
Expand Down
Loading

0 comments on commit 7f324ae

Please sign in to comment.