From 18418294a04454ea26f4af85843579cbf7adca0e Mon Sep 17 00:00:00 2001 From: "S. Santos" Date: Wed, 10 Jul 2024 13:25:45 -0300 Subject: [PATCH] Refactor transfer-receive to make it non-blocking --- Cargo.lock | 1 + clients/apps/rust/src/main.rs | 17 ++- clients/libs/rust/src/transfer_receiver.rs | 101 ++++++++++--- clients/tests/rust/Cargo.toml | 1 + clients/tests/rust/src/main.rs | 2 + .../rust/src/ta01_sign_second_not_called.rs | 3 +- .../tests/rust/src/tb01_simple_transfer.rs | 5 +- .../rust/src/tb02_transfer_address_reuse.rs | 8 +- .../rust/src/tb03_simple_atomic_transfer.rs | 137 ++++++++++++++++++ .../rust/src/tm01_sender_double_spends.rs | 3 +- 10 files changed, 247 insertions(+), 31 deletions(-) create mode 100644 clients/tests/rust/src/tb03_simple_atomic_transfer.rs diff --git a/Cargo.lock b/Cargo.lock index 9eed7497..147c33f3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2426,6 +2426,7 @@ dependencies = [ "serde", "serde_json", "tokio", + "uuid 1.4.1", ] [[package]] diff --git a/clients/apps/rust/src/main.rs b/clients/apps/rust/src/main.rs index d84dc1fc..16d80858 100644 --- a/clients/apps/rust/src/main.rs +++ b/clients/apps/rust/src/main.rs @@ -1,3 +1,5 @@ +use std::{thread, time::Duration}; + use anyhow::Result; use clap::{Parser, Subcommand}; use serde_json::json; @@ -162,7 +164,20 @@ async fn main() -> Result<()> { }, Commands::TransferReceive { wallet_name } => { mercuryrustlib::coin_status::update_coins(&client_config, &wallet_name).await?; - let received_statechain_ids = mercuryrustlib::transfer_receiver::execute(&client_config, &wallet_name).await?; + + let mut received_statechain_ids = Vec::::new(); + + loop { + let transfer_receive_result = mercuryrustlib::transfer_receiver::execute(&client_config, &wallet_name).await?; + received_statechain_ids.extend(transfer_receive_result.received_statechain_ids); + + if transfer_receive_result.is_there_batch_locked { + println!("Statecoin batch still locked. Waiting until expiration or unlock."); + thread::sleep(Duration::from_secs(5)); + } else { + break; + } + } let obj = json!(received_statechain_ids); diff --git a/clients/libs/rust/src/transfer_receiver.rs b/clients/libs/rust/src/transfer_receiver.rs index 74a5e982..cef72fd8 100644 --- a/clients/libs/rust/src/transfer_receiver.rs +++ b/clients/libs/rust/src/transfer_receiver.rs @@ -1,7 +1,7 @@ -use std::{collections::{HashMap, HashSet}, str::FromStr, thread, time::Duration}; +use std::{collections::{HashMap, HashSet}, str::FromStr}; use crate::{sqlite_manager::{get_wallet, update_wallet, insert_or_update_backup_txs}, client_config::ClientConfig, utils}; -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, Ok, Result}; use bitcoin::{Txid, Address}; use chrono::Utc; use electrum_client::ElectrumApi; @@ -23,7 +23,12 @@ pub async fn new_transfer_address(client_config: &ClientConfig, wallet_name: &st Ok(coin.address) } -pub async fn execute(client_config: &ClientConfig, wallet_name: &str) -> Result>{ +pub struct TransferReceiveResult { + pub is_there_batch_locked: bool, + pub received_statechain_ids: Vec, +} + +pub async fn execute(client_config: &ClientConfig, wallet_name: &str) -> Result{ let mut wallet = get_wallet(&client_config.pool, &wallet_name).await?; @@ -47,6 +52,8 @@ pub async fn execute(client_config: &ClientConfig, wallet_name: &str) -> Result< enc_msgs_per_auth_pubkey.insert(auth_pubkey.clone(), enc_messages); } + let mut is_there_batch_locked = false; + let mut received_statechain_ids = Vec::::new(); let mut temp_coins = wallet.coins.clone(); @@ -64,14 +71,22 @@ pub async fn execute(client_config: &ClientConfig, wallet_name: &str) -> Result< let mut coin = coin.unwrap(); - let statechain_id_added = process_encrypted_message(client_config, &mut coin, enc_message, &wallet.network, &info_config, &mut temp_activities).await; + let message_result = process_encrypted_message(client_config, &mut coin, enc_message, &wallet.network, &info_config, &mut temp_activities).await; - if statechain_id_added.is_err() { - println!("Error: {}", statechain_id_added.err().unwrap().to_string()); + if message_result.is_err() { + println!("Error: {}", message_result.err().unwrap().to_string()); continue; } - received_statechain_ids.push(statechain_id_added.unwrap()); + let message_result = message_result.unwrap(); + + if message_result.is_batch_locked { + is_there_batch_locked = true; + } + + if message_result.statechain_id.is_some() { + received_statechain_ids.push(message_result.statechain_id.unwrap()); + } } else { @@ -84,15 +99,24 @@ pub async fn execute(client_config: &ClientConfig, wallet_name: &str) -> Result< let mut new_coin = new_coin.unwrap(); - let statechain_id_added = process_encrypted_message(client_config, &mut new_coin, enc_message, &wallet.network, &info_config, &mut temp_activities).await; + let message_result = process_encrypted_message(client_config, &mut new_coin, enc_message, &wallet.network, &info_config, &mut temp_activities).await; - if statechain_id_added.is_err() { - println!("Error: {}", statechain_id_added.err().unwrap().to_string()); + if message_result.is_err() { + println!("Error: {}", message_result.err().unwrap().to_string()); continue; } temp_coins.push(new_coin); - received_statechain_ids.push(statechain_id_added.unwrap()); + + let message_result = message_result.unwrap(); + + if message_result.is_batch_locked { + is_there_batch_locked = true; + } + + if message_result.statechain_id.is_some() { + received_statechain_ids.push(message_result.statechain_id.unwrap()); + } } } } @@ -102,7 +126,10 @@ pub async fn execute(client_config: &ClientConfig, wallet_name: &str) -> Result< update_wallet(&client_config.pool, &wallet).await?; - Ok(received_statechain_ids) + Ok(TransferReceiveResult{ + is_there_batch_locked, + received_statechain_ids + }) } async fn get_msg_addr(auth_pubkey: &str, client_config: &ClientConfig) -> Result> { @@ -119,7 +146,12 @@ async fn get_msg_addr(auth_pubkey: &str, client_config: &ClientConfig) -> Result Ok(response.list_enc_transfer_msg) } -async fn process_encrypted_message(client_config: &ClientConfig, coin: &mut Coin, enc_message: &str, network: &str, info_config: &InfoConfig, activities: &mut Vec) -> Result { +pub struct MessageResult { + pub is_batch_locked: bool, + pub statechain_id: Option, +} + +async fn process_encrypted_message(client_config: &ClientConfig, coin: &mut Coin, enc_message: &str, network: &str, info_config: &InfoConfig, activities: &mut Vec) -> Result { let client_auth_key = coin.auth_privkey.clone(); let new_user_pubkey = coin.user_pubkey.clone(); @@ -200,7 +232,17 @@ async fn process_encrypted_message(client_config: &ClientConfig, coin: &mut Coin let transfer_receiver_result = send_transfer_receiver_request_payload(&client_config, &transfer_receiver_request_payload).await; let server_public_key_hex = match transfer_receiver_result { - Ok(server_public_key_hex) => server_public_key_hex, + std::result::Result::Ok(server_public_key_hex) => { + + if server_public_key_hex.is_batch_locked { + return Ok(MessageResult { + is_batch_locked: true, + statechain_id: None, + }); + } + + server_public_key_hex.server_pubkey.unwrap() + }, Err(err) => { return Err(anyhow::anyhow!("Error: {}", err.to_string())); } @@ -233,7 +275,12 @@ async fn process_encrypted_message(client_config: &ClientConfig, coin: &mut Coin insert_or_update_backup_txs(&client_config.pool, &transfer_msg.statechain_id, &transfer_msg.backup_transactions).await?; - Ok(transfer_msg.statechain_id.clone()) + Ok(MessageResult { + is_batch_locked: false, + statechain_id: Some(transfer_msg.statechain_id.clone()), + }) + + // Ok(transfer_msg.statechain_id.clone()) } async fn get_tx0(electrum_client: &electrum_client::Client, tx0_txid: &str) -> Result { @@ -304,14 +351,18 @@ async fn unlock_statecoin(client_config: &ClientConfig, statechain_id: &str, sig Ok(()) } -async fn send_transfer_receiver_request_payload(client_config: &ClientConfig, transfer_receiver_request_payload: &mercurylib::transfer::receiver::TransferReceiverRequestPayload) -> Result{ +pub struct TransferReceiveRequestResult { + pub is_batch_locked: bool, + pub server_pubkey: Option, +} + +async fn send_transfer_receiver_request_payload(client_config: &ClientConfig, transfer_receiver_request_payload: &mercurylib::transfer::receiver::TransferReceiverRequestPayload) -> Result{ let path = "transfer/receiver"; let client = client_config.get_reqwest_client()?; - loop { - let request = client.post(&format!("{}/{}", client_config.statechain_entity, path)); + let request: reqwest::RequestBuilder = client.post(&format!("{}/{}", client_config.statechain_entity, path)); let response = request.json(&transfer_receiver_request_payload).send().await?; @@ -328,18 +379,22 @@ async fn send_transfer_receiver_request_payload(client_config: &ClientConfig, tr return Err(anyhow::anyhow!(error.message)); }, mercurylib::transfer::receiver::TransferReceiverError::StatecoinBatchLockedError => { - println!("Statecoin batch still locked. Waiting until expiration or unlock."); - thread::sleep(Duration::from_secs(5)); - continue; + return Ok(TransferReceiveRequestResult { + is_batch_locked: true, + server_pubkey: None, + }); }, } } if status == StatusCode::OK { let response: mercurylib::transfer::receiver::TransferReceiverPostResponsePayload = serde_json::from_str(value.as_str())?; - return Ok(response.server_pubkey); + return Ok(TransferReceiveRequestResult { + is_batch_locked: false, + server_pubkey: Some(response.server_pubkey) + }); } else { return Err(anyhow::anyhow!("{}: {}", "Failed to update transfer message".to_string(), value)); } - } + } \ No newline at end of file diff --git a/clients/tests/rust/Cargo.toml b/clients/tests/rust/Cargo.toml index 73e95d8a..99528b8f 100644 --- a/clients/tests/rust/Cargo.toml +++ b/clients/tests/rust/Cargo.toml @@ -12,3 +12,4 @@ tokio = { version = "1.27.0", features = ["full"] } serde = { version = "1.0.163", features = ["derive"] } serde_json = "1.0.96" electrum-client = "0.18.0" +uuid = { version = "1.3.1", features = ["v4", "serde"] } diff --git a/clients/tests/rust/src/main.rs b/clients/tests/rust/src/main.rs index 87d30ee4..487073d6 100644 --- a/clients/tests/rust/src/main.rs +++ b/clients/tests/rust/src/main.rs @@ -3,6 +3,7 @@ pub mod electrs; pub mod bitcoin_core; pub mod tb01_simple_transfer; pub mod tb02_transfer_address_reuse; +pub mod tb03_simple_atomic_transfer; pub mod tm01_sender_double_spends; pub mod ta01_sign_second_not_called; use anyhow::{Result, Ok}; @@ -12,6 +13,7 @@ async fn main() -> Result<()> { tb01_simple_transfer::execute().await?; tb02_transfer_address_reuse::execute().await?; + tb03_simple_atomic_transfer::execute().await?; tm01_sender_double_spends::execute().await?; ta01_sign_second_not_called::execute().await?; diff --git a/clients/tests/rust/src/ta01_sign_second_not_called.rs b/clients/tests/rust/src/ta01_sign_second_not_called.rs index 9567d3ea..69cff1c0 100644 --- a/clients/tests/rust/src/ta01_sign_second_not_called.rs +++ b/clients/tests/rust/src/ta01_sign_second_not_called.rs @@ -160,7 +160,8 @@ async fn ta01(client_config: &ClientConfig, wallet1: &Wallet, wallet2: &Wallet) assert!(result.is_ok()); - let received_statechain_ids = mercuryrustlib::transfer_receiver::execute(&client_config, &wallet2.name).await?; + let transfer_receive_result = mercuryrustlib::transfer_receiver::execute(&client_config, &wallet2.name).await?; + let received_statechain_ids = transfer_receive_result.received_statechain_ids; assert!(received_statechain_ids.contains(&statechain_id.to_string())); assert!(received_statechain_ids.len() == 1); diff --git a/clients/tests/rust/src/tb01_simple_transfer.rs b/clients/tests/rust/src/tb01_simple_transfer.rs index fec4c851..2b2dd852 100644 --- a/clients/tests/rust/src/tb01_simple_transfer.rs +++ b/clients/tests/rust/src/tb01_simple_transfer.rs @@ -128,7 +128,8 @@ async fn sucessfully_transfer(client_config: &ClientConfig, wallet1: &Wallet, wa assert!(new_coin.status == CoinStatus::IN_TRANSFER); - let received_statechain_ids = mercuryrustlib::transfer_receiver::execute(&client_config, &wallet2.name).await?; + let transfer_receive_result = mercuryrustlib::transfer_receiver::execute(&client_config, &wallet2.name).await?; + let received_statechain_ids = transfer_receive_result.received_statechain_ids; assert!(received_statechain_ids.contains(&statechain_id.to_string())); assert!(received_statechain_ids.len() == 1); @@ -190,7 +191,7 @@ pub async fn execute() -> Result<()> { sucessfully_transfer(&client_config, &wallet1, &wallet2).await?; - println!("T01 - Transfer completed successfully"); + println!("TB01 - Transfer completed successfully"); Ok(()) } diff --git a/clients/tests/rust/src/tb02_transfer_address_reuse.rs b/clients/tests/rust/src/tb02_transfer_address_reuse.rs index 7ce817a3..0a87a04a 100644 --- a/clients/tests/rust/src/tb02_transfer_address_reuse.rs +++ b/clients/tests/rust/src/tb02_transfer_address_reuse.rs @@ -59,7 +59,8 @@ async fn tb02(client_config: &ClientConfig, wallet1: &Wallet, wallet2: &Wallet) assert!(result.is_ok()); } - let received_statechain_ids = mercuryrustlib::transfer_receiver::execute(&client_config, &wallet2.name).await?; + let transfer_receive_result = mercuryrustlib::transfer_receiver::execute(&client_config, &wallet2.name).await?; + let received_statechain_ids = transfer_receive_result.received_statechain_ids; let wallet2: mercuryrustlib::Wallet = mercuryrustlib::sqlite_manager::get_wallet(&client_config.pool, &wallet2.name).await?; @@ -111,7 +112,8 @@ async fn tb02(client_config: &ClientConfig, wallet1: &Wallet, wallet2: &Wallet) assert!(result.is_ok()); - let received_statechain_ids = mercuryrustlib::transfer_receiver::execute(&client_config, &wallet1.name).await?; + let transfer_receive_result = mercuryrustlib::transfer_receiver::execute(&client_config, &wallet1.name).await?; + let received_statechain_ids = transfer_receive_result.received_statechain_ids; assert!(received_statechain_ids.contains(&statechain_id.to_string())); assert!(received_statechain_ids.len() == 1); @@ -162,7 +164,7 @@ pub async fn execute() -> Result<()> { tb02(&client_config, &wallet1, &wallet2).await?; - println!("T02 - Transfer Address Reuse completed successfully"); + println!("TB02 - Transfer Address Reuse completed successfully"); Ok(()) } diff --git a/clients/tests/rust/src/tb03_simple_atomic_transfer.rs b/clients/tests/rust/src/tb03_simple_atomic_transfer.rs new file mode 100644 index 00000000..81a18a56 --- /dev/null +++ b/clients/tests/rust/src/tb03_simple_atomic_transfer.rs @@ -0,0 +1,137 @@ +use std::{env, process::Command, thread, time::Duration}; +use anyhow::{Result, Ok}; +use mercuryrustlib::{client_config::ClientConfig, CoinStatus, Wallet}; + +use crate::{bitcoin_core, electrs}; + +pub async fn tb03(client_config: &ClientConfig, wallet1: &Wallet, wallet2: &Wallet, wallet3: &Wallet, wallet4: &Wallet) -> Result<()> { + + let amount = 1000; + + // Create first deposit address + + let token_id = mercuryrustlib::deposit::get_token(client_config).await?; + + let wallet1_address = mercuryrustlib::deposit::get_deposit_bitcoin_address(&client_config, &wallet1.name, &token_id, amount).await?; + + let _ = bitcoin_core::sendtoaddress(amount, &wallet1_address)?; + + let token_id = mercuryrustlib::deposit::get_token(client_config).await?; + + let wallet2_address = mercuryrustlib::deposit::get_deposit_bitcoin_address(&client_config, &wallet2.name, &token_id, amount).await?; + + let _ = bitcoin_core::sendtoaddress(amount, &wallet2_address)?; + + let core_wallet_address = bitcoin_core::getnewaddress()?; + let remaining_blocks = client_config.confirmation_target; + let _ = bitcoin_core::generatetoaddress(remaining_blocks, &core_wallet_address)?; + + // It appears that Electrs takes a few seconds to index the transaction + let mut is_tx_indexed = false; + + while !is_tx_indexed { + let addr1_ok = electrs::check_address(client_config, &wallet1_address, amount).await?; + let addr2_ok = electrs::check_address(client_config, &wallet2_address, amount).await?; + is_tx_indexed = addr1_ok && addr2_ok; + thread::sleep(Duration::from_secs(1)); + } + + let batch_id = Some(uuid::Uuid::new_v4().to_string()); + + let wallet3_transfer_adress = mercuryrustlib::transfer_receiver::new_transfer_address(&client_config, &wallet3.name).await?; + let wallet4_transfer_adress = mercuryrustlib::transfer_receiver::new_transfer_address(&client_config, &wallet4.name).await?; + + mercuryrustlib::coin_status::update_coins(&client_config, &wallet1.name).await?; + let wallet1: mercuryrustlib::Wallet = mercuryrustlib::sqlite_manager::get_wallet(&client_config.pool, &wallet1.name).await?; + let new_coin = wallet1.coins.iter().find(|&coin| coin.aggregated_address == Some(wallet1_address.clone()) && coin.status == CoinStatus::CONFIRMED).unwrap(); + let statechain_id_1 = new_coin.statechain_id.as_ref().unwrap(); + + let result = mercuryrustlib::transfer_sender::execute(&client_config, &wallet3_transfer_adress, &wallet1.name, &statechain_id_1, batch_id.clone()).await; + + assert!(result.is_ok()); + + mercuryrustlib::coin_status::update_coins(&client_config, &wallet2.name).await?; + let wallet2: mercuryrustlib::Wallet = mercuryrustlib::sqlite_manager::get_wallet(&client_config.pool, &wallet2.name).await?; + let new_coin = wallet2.coins.iter().find(|&coin| coin.aggregated_address == Some(wallet2_address.clone()) && coin.status == CoinStatus::CONFIRMED).unwrap(); + let statechain_id_2 = new_coin.statechain_id.as_ref().unwrap(); + + let result = mercuryrustlib::transfer_sender::execute(&client_config, &wallet4_transfer_adress, &wallet2.name, &statechain_id_2, batch_id).await; + + assert!(result.is_ok()); + + let transfer_receive_result = mercuryrustlib::transfer_receiver::execute(&client_config, &wallet3.name).await?; + + assert!(transfer_receive_result.is_there_batch_locked); + assert!(transfer_receive_result.received_statechain_ids.len() == 0); + + let transfer_receive_result = mercuryrustlib::transfer_receiver::execute(&client_config, &wallet4.name).await?; + + assert!(!transfer_receive_result.is_there_batch_locked); + assert!(transfer_receive_result.received_statechain_ids.len() == 1); + assert!(transfer_receive_result.received_statechain_ids[0] == statechain_id_2.to_string()); + + let wallet4: mercuryrustlib::Wallet = mercuryrustlib::sqlite_manager::get_wallet(&client_config.pool, &wallet4.name).await?; + let new_coin = wallet4.coins.iter().find(|&coin| coin.statechain_id == Some(statechain_id_2.clone())).unwrap(); + assert!(new_coin.status == CoinStatus::CONFIRMED); + + let transfer_receive_result = mercuryrustlib::transfer_receiver::execute(&client_config, &wallet3.name).await?; + + assert!(!transfer_receive_result.is_there_batch_locked); + assert!(transfer_receive_result.received_statechain_ids.len() == 1); + assert!(transfer_receive_result.received_statechain_ids[0] == statechain_id_1.to_string()); + + let wallet3: mercuryrustlib::Wallet = mercuryrustlib::sqlite_manager::get_wallet(&client_config.pool, &wallet3.name).await?; + let new_coin = wallet3.coins.iter().find(|&coin| coin.statechain_id == Some(statechain_id_1.clone())).unwrap(); + assert!(new_coin.status == CoinStatus::CONFIRMED); + + mercuryrustlib::coin_status::update_coins(&client_config, &wallet1.name).await?; + let wallet1: mercuryrustlib::Wallet = mercuryrustlib::sqlite_manager::get_wallet(&client_config.pool, &wallet1.name).await?; + let new_coin = wallet1.coins.iter().find(|&coin| coin.aggregated_address == Some(wallet1_address.clone())).unwrap(); + assert!(new_coin.status == CoinStatus::TRANSFERRED); + + mercuryrustlib::coin_status::update_coins(&client_config, &wallet2.name).await?; + let wallet2: mercuryrustlib::Wallet = mercuryrustlib::sqlite_manager::get_wallet(&client_config.pool, &wallet2.name).await?; + let new_coin = wallet2.coins.iter().find(|&coin| coin.aggregated_address == Some(wallet2_address.clone())).unwrap(); + assert!(new_coin.status == CoinStatus::TRANSFERRED); + + Ok(()) +} + +pub async fn execute() -> Result<()> { + + let _ = Command::new("rm").arg("wallet.db").arg("wallet.db-shm").arg("wallet.db-wal").output().expect("failed to execute process"); + + env::set_var("ML_NETWORK", "regtest"); + + let client_config = mercuryrustlib::client_config::load().await; + + let wallet1 = mercuryrustlib::wallet::create_wallet( + "wallet1", + &client_config).await?; + + mercuryrustlib::sqlite_manager::insert_wallet(&client_config.pool, &wallet1).await?; + + let wallet2 = mercuryrustlib::wallet::create_wallet( + "wallet2", + &client_config).await?; + + mercuryrustlib::sqlite_manager::insert_wallet(&client_config.pool, &wallet2).await?; + + let wallet3 = mercuryrustlib::wallet::create_wallet( + "wallet3", + &client_config).await?; + + mercuryrustlib::sqlite_manager::insert_wallet(&client_config.pool, &wallet3).await?; + + let wallet4 = mercuryrustlib::wallet::create_wallet( + "wallet4", + &client_config).await?; + + mercuryrustlib::sqlite_manager::insert_wallet(&client_config.pool, &wallet4).await?; + + tb03(&client_config, &wallet1, &wallet2, &wallet3, &wallet4).await?; + + println!("TB03- Simple Atomic Transfer Test completed successfully"); + + Ok(()) +} \ No newline at end of file diff --git a/clients/tests/rust/src/tm01_sender_double_spends.rs b/clients/tests/rust/src/tm01_sender_double_spends.rs index 54e95524..28c3d4b2 100644 --- a/clients/tests/rust/src/tm01_sender_double_spends.rs +++ b/clients/tests/rust/src/tm01_sender_double_spends.rs @@ -51,7 +51,8 @@ async fn tm01(client_config: &ClientConfig, wallet1: &Wallet, wallet2: &Wallet, assert!(result.is_ok()); - let received_statechain_ids = mercuryrustlib::transfer_receiver::execute(&client_config, &wallet3.name).await?; + let transfer_receive_result = mercuryrustlib::transfer_receiver::execute(&client_config, &wallet3.name).await?; + let received_statechain_ids = transfer_receive_result.received_statechain_ids; assert!(received_statechain_ids.len() == 1);