Skip to content

Commit

Permalink
Refactor transfer-receive to make it non-blocking
Browse files Browse the repository at this point in the history
  • Loading branch information
ssantos21 committed Jul 10, 2024
1 parent 339086f commit 1841829
Show file tree
Hide file tree
Showing 10 changed files with 247 additions and 31 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 16 additions & 1 deletion clients/apps/rust/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::{thread, time::Duration};

use anyhow::Result;
use clap::{Parser, Subcommand};
use serde_json::json;
Expand Down Expand Up @@ -162,7 +164,20 @@ async fn main() -> Result<()> {
},
Commands::TransferReceive { wallet_name } => {
mercuryrustlib::coin_status::update_coins(&client_config, &wallet_name).await?;
let received_statechain_ids = mercuryrustlib::transfer_receiver::execute(&client_config, &wallet_name).await?;

let mut received_statechain_ids = Vec::<String>::new();

loop {
let transfer_receive_result = mercuryrustlib::transfer_receiver::execute(&client_config, &wallet_name).await?;
received_statechain_ids.extend(transfer_receive_result.received_statechain_ids);

if transfer_receive_result.is_there_batch_locked {
println!("Statecoin batch still locked. Waiting until expiration or unlock.");
thread::sleep(Duration::from_secs(5));
} else {
break;
}
}

let obj = json!(received_statechain_ids);

Expand Down
101 changes: 78 additions & 23 deletions clients/libs/rust/src/transfer_receiver.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{collections::{HashMap, HashSet}, str::FromStr, thread, time::Duration};
use std::{collections::{HashMap, HashSet}, str::FromStr};

use crate::{sqlite_manager::{get_wallet, update_wallet, insert_or_update_backup_txs}, client_config::ClientConfig, utils};
use anyhow::{anyhow, Result};
use anyhow::{anyhow, Ok, Result};
use bitcoin::{Txid, Address};
use chrono::Utc;
use electrum_client::ElectrumApi;
Expand All @@ -23,7 +23,12 @@ pub async fn new_transfer_address(client_config: &ClientConfig, wallet_name: &st
Ok(coin.address)
}

pub async fn execute(client_config: &ClientConfig, wallet_name: &str) -> Result<Vec<String>>{
pub struct TransferReceiveResult {
pub is_there_batch_locked: bool,
pub received_statechain_ids: Vec<String>,
}

pub async fn execute(client_config: &ClientConfig, wallet_name: &str) -> Result<TransferReceiveResult>{

let mut wallet = get_wallet(&client_config.pool, &wallet_name).await?;

Expand All @@ -47,6 +52,8 @@ pub async fn execute(client_config: &ClientConfig, wallet_name: &str) -> Result<
enc_msgs_per_auth_pubkey.insert(auth_pubkey.clone(), enc_messages);
}

let mut is_there_batch_locked = false;

let mut received_statechain_ids = Vec::<String>::new();

let mut temp_coins = wallet.coins.clone();
Expand All @@ -64,14 +71,22 @@ pub async fn execute(client_config: &ClientConfig, wallet_name: &str) -> Result<

let mut coin = coin.unwrap();

let statechain_id_added = process_encrypted_message(client_config, &mut coin, enc_message, &wallet.network, &info_config, &mut temp_activities).await;
let message_result = process_encrypted_message(client_config, &mut coin, enc_message, &wallet.network, &info_config, &mut temp_activities).await;

if statechain_id_added.is_err() {
println!("Error: {}", statechain_id_added.err().unwrap().to_string());
if message_result.is_err() {
println!("Error: {}", message_result.err().unwrap().to_string());
continue;
}

received_statechain_ids.push(statechain_id_added.unwrap());
let message_result = message_result.unwrap();

if message_result.is_batch_locked {
is_there_batch_locked = true;
}

if message_result.statechain_id.is_some() {
received_statechain_ids.push(message_result.statechain_id.unwrap());
}

} else {

Expand All @@ -84,15 +99,24 @@ pub async fn execute(client_config: &ClientConfig, wallet_name: &str) -> Result<

let mut new_coin = new_coin.unwrap();

let statechain_id_added = process_encrypted_message(client_config, &mut new_coin, enc_message, &wallet.network, &info_config, &mut temp_activities).await;
let message_result = process_encrypted_message(client_config, &mut new_coin, enc_message, &wallet.network, &info_config, &mut temp_activities).await;

if statechain_id_added.is_err() {
println!("Error: {}", statechain_id_added.err().unwrap().to_string());
if message_result.is_err() {
println!("Error: {}", message_result.err().unwrap().to_string());
continue;
}

temp_coins.push(new_coin);
received_statechain_ids.push(statechain_id_added.unwrap());

let message_result = message_result.unwrap();

if message_result.is_batch_locked {
is_there_batch_locked = true;
}

if message_result.statechain_id.is_some() {
received_statechain_ids.push(message_result.statechain_id.unwrap());
}
}
}
}
Expand All @@ -102,7 +126,10 @@ pub async fn execute(client_config: &ClientConfig, wallet_name: &str) -> Result<

update_wallet(&client_config.pool, &wallet).await?;

Ok(received_statechain_ids)
Ok(TransferReceiveResult{
is_there_batch_locked,
received_statechain_ids
})
}

async fn get_msg_addr(auth_pubkey: &str, client_config: &ClientConfig) -> Result<Vec<String>> {
Expand All @@ -119,7 +146,12 @@ async fn get_msg_addr(auth_pubkey: &str, client_config: &ClientConfig) -> Result
Ok(response.list_enc_transfer_msg)
}

async fn process_encrypted_message(client_config: &ClientConfig, coin: &mut Coin, enc_message: &str, network: &str, info_config: &InfoConfig, activities: &mut Vec<Activity>) -> Result<String> {
pub struct MessageResult {
pub is_batch_locked: bool,
pub statechain_id: Option<String>,
}

async fn process_encrypted_message(client_config: &ClientConfig, coin: &mut Coin, enc_message: &str, network: &str, info_config: &InfoConfig, activities: &mut Vec<Activity>) -> Result<MessageResult> {

let client_auth_key = coin.auth_privkey.clone();
let new_user_pubkey = coin.user_pubkey.clone();
Expand Down Expand Up @@ -200,7 +232,17 @@ async fn process_encrypted_message(client_config: &ClientConfig, coin: &mut Coin
let transfer_receiver_result = send_transfer_receiver_request_payload(&client_config, &transfer_receiver_request_payload).await;

let server_public_key_hex = match transfer_receiver_result {
Ok(server_public_key_hex) => server_public_key_hex,
std::result::Result::Ok(server_public_key_hex) => {

if server_public_key_hex.is_batch_locked {
return Ok(MessageResult {
is_batch_locked: true,
statechain_id: None,
});
}

server_public_key_hex.server_pubkey.unwrap()
},
Err(err) => {
return Err(anyhow::anyhow!("Error: {}", err.to_string()));
}
Expand Down Expand Up @@ -233,7 +275,12 @@ async fn process_encrypted_message(client_config: &ClientConfig, coin: &mut Coin

insert_or_update_backup_txs(&client_config.pool, &transfer_msg.statechain_id, &transfer_msg.backup_transactions).await?;

Ok(transfer_msg.statechain_id.clone())
Ok(MessageResult {
is_batch_locked: false,
statechain_id: Some(transfer_msg.statechain_id.clone()),
})

// Ok(transfer_msg.statechain_id.clone())
}

async fn get_tx0(electrum_client: &electrum_client::Client, tx0_txid: &str) -> Result<String> {
Expand Down Expand Up @@ -304,14 +351,18 @@ async fn unlock_statecoin(client_config: &ClientConfig, statechain_id: &str, sig
Ok(())
}

async fn send_transfer_receiver_request_payload(client_config: &ClientConfig, transfer_receiver_request_payload: &mercurylib::transfer::receiver::TransferReceiverRequestPayload) -> Result<String>{
pub struct TransferReceiveRequestResult {
pub is_batch_locked: bool,
pub server_pubkey: Option<String>,
}

async fn send_transfer_receiver_request_payload(client_config: &ClientConfig, transfer_receiver_request_payload: &mercurylib::transfer::receiver::TransferReceiverRequestPayload) -> Result<TransferReceiveRequestResult>{

let path = "transfer/receiver";

let client = client_config.get_reqwest_client()?;

loop {
let request = client.post(&format!("{}/{}", client_config.statechain_entity, path));
let request: reqwest::RequestBuilder = client.post(&format!("{}/{}", client_config.statechain_entity, path));

let response = request.json(&transfer_receiver_request_payload).send().await?;

Expand All @@ -328,18 +379,22 @@ async fn send_transfer_receiver_request_payload(client_config: &ClientConfig, tr
return Err(anyhow::anyhow!(error.message));
},
mercurylib::transfer::receiver::TransferReceiverError::StatecoinBatchLockedError => {
println!("Statecoin batch still locked. Waiting until expiration or unlock.");
thread::sleep(Duration::from_secs(5));
continue;
return Ok(TransferReceiveRequestResult {
is_batch_locked: true,
server_pubkey: None,
});
},
}
}

if status == StatusCode::OK {
let response: mercurylib::transfer::receiver::TransferReceiverPostResponsePayload = serde_json::from_str(value.as_str())?;
return Ok(response.server_pubkey);
return Ok(TransferReceiveRequestResult {
is_batch_locked: false,
server_pubkey: Some(response.server_pubkey)
});
} else {
return Err(anyhow::anyhow!("{}: {}", "Failed to update transfer message".to_string(), value));
}
}

}
1 change: 1 addition & 0 deletions clients/tests/rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ tokio = { version = "1.27.0", features = ["full"] }
serde = { version = "1.0.163", features = ["derive"] }
serde_json = "1.0.96"
electrum-client = "0.18.0"
uuid = { version = "1.3.1", features = ["v4", "serde"] }
2 changes: 2 additions & 0 deletions clients/tests/rust/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub mod electrs;
pub mod bitcoin_core;
pub mod tb01_simple_transfer;
pub mod tb02_transfer_address_reuse;
pub mod tb03_simple_atomic_transfer;
pub mod tm01_sender_double_spends;
pub mod ta01_sign_second_not_called;
use anyhow::{Result, Ok};
Expand All @@ -12,6 +13,7 @@ async fn main() -> Result<()> {

tb01_simple_transfer::execute().await?;
tb02_transfer_address_reuse::execute().await?;
tb03_simple_atomic_transfer::execute().await?;
tm01_sender_double_spends::execute().await?;
ta01_sign_second_not_called::execute().await?;

Expand Down
3 changes: 2 additions & 1 deletion clients/tests/rust/src/ta01_sign_second_not_called.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ async fn ta01(client_config: &ClientConfig, wallet1: &Wallet, wallet2: &Wallet)

assert!(result.is_ok());

let received_statechain_ids = mercuryrustlib::transfer_receiver::execute(&client_config, &wallet2.name).await?;
let transfer_receive_result = mercuryrustlib::transfer_receiver::execute(&client_config, &wallet2.name).await?;
let received_statechain_ids = transfer_receive_result.received_statechain_ids;

assert!(received_statechain_ids.contains(&statechain_id.to_string()));
assert!(received_statechain_ids.len() == 1);
Expand Down
5 changes: 3 additions & 2 deletions clients/tests/rust/src/tb01_simple_transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ async fn sucessfully_transfer(client_config: &ClientConfig, wallet1: &Wallet, wa

assert!(new_coin.status == CoinStatus::IN_TRANSFER);

let received_statechain_ids = mercuryrustlib::transfer_receiver::execute(&client_config, &wallet2.name).await?;
let transfer_receive_result = mercuryrustlib::transfer_receiver::execute(&client_config, &wallet2.name).await?;
let received_statechain_ids = transfer_receive_result.received_statechain_ids;

assert!(received_statechain_ids.contains(&statechain_id.to_string()));
assert!(received_statechain_ids.len() == 1);
Expand Down Expand Up @@ -190,7 +191,7 @@ pub async fn execute() -> Result<()> {

sucessfully_transfer(&client_config, &wallet1, &wallet2).await?;

println!("T01 - Transfer completed successfully");
println!("TB01 - Transfer completed successfully");

Ok(())
}
8 changes: 5 additions & 3 deletions clients/tests/rust/src/tb02_transfer_address_reuse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ async fn tb02(client_config: &ClientConfig, wallet1: &Wallet, wallet2: &Wallet)
assert!(result.is_ok());
}

let received_statechain_ids = mercuryrustlib::transfer_receiver::execute(&client_config, &wallet2.name).await?;
let transfer_receive_result = mercuryrustlib::transfer_receiver::execute(&client_config, &wallet2.name).await?;
let received_statechain_ids = transfer_receive_result.received_statechain_ids;

let wallet2: mercuryrustlib::Wallet = mercuryrustlib::sqlite_manager::get_wallet(&client_config.pool, &wallet2.name).await?;

Expand Down Expand Up @@ -111,7 +112,8 @@ async fn tb02(client_config: &ClientConfig, wallet1: &Wallet, wallet2: &Wallet)

assert!(result.is_ok());

let received_statechain_ids = mercuryrustlib::transfer_receiver::execute(&client_config, &wallet1.name).await?;
let transfer_receive_result = mercuryrustlib::transfer_receiver::execute(&client_config, &wallet1.name).await?;
let received_statechain_ids = transfer_receive_result.received_statechain_ids;

assert!(received_statechain_ids.contains(&statechain_id.to_string()));
assert!(received_statechain_ids.len() == 1);
Expand Down Expand Up @@ -162,7 +164,7 @@ pub async fn execute() -> Result<()> {

tb02(&client_config, &wallet1, &wallet2).await?;

println!("T02 - Transfer Address Reuse completed successfully");
println!("TB02 - Transfer Address Reuse completed successfully");

Ok(())
}
Loading

0 comments on commit 1841829

Please sign in to comment.