diff --git a/Cargo.lock b/Cargo.lock index 374b129f..70358231 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -611,6 +611,7 @@ dependencies = [ "proptest", "serde", "serde_json", + "thiserror", "tokio", "tracing", "tracing-subscriber", @@ -1154,6 +1155,26 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "thiserror" +version = "1.0.56" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d54378c645627613241d077a3a79db965db602882668f9136ac42af9ecb730ad" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.56" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa0faa943b50f3db30a20aa7e265dbc66076993efed8463e8de414e5d06d3471" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "thread_local" version = "1.1.7" diff --git a/kairos-server/Cargo.toml b/kairos-server/Cargo.toml index fd75971d..0d24608b 100644 --- a/kairos-server/Cargo.toml +++ b/kairos-server/Cargo.toml @@ -12,6 +12,7 @@ name = "kairos-server" [dependencies] axum = { version = "0.7", features = ["tracing"]} +thiserror = "1.0" anyhow = "1.0" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" diff --git a/kairos-server/src/errors.rs b/kairos-server/src/errors.rs index 9378bfdd..0f8373ad 100644 --- a/kairos-server/src/errors.rs +++ b/kairos-server/src/errors.rs @@ -8,8 +8,6 @@ use axum::{ response::{IntoResponse, Response}, }; - - #[derive(Debug)] pub struct AppErr { error: anyhow::Error, diff --git a/kairos-server/src/lib.rs b/kairos-server/src/lib.rs index 8e192b2f..c1d6a675 100644 --- a/kairos-server/src/lib.rs +++ b/kairos-server/src/lib.rs @@ -1,36 +1,16 @@ pub mod errors; pub mod routes; - -use std::{ - collections::{HashMap, HashSet}, - sync::Arc, -}; +pub mod state; use axum::{routing::post, Router}; -use routes::{transfer::Transfer, *}; -use tokio::sync::RwLock; +use routes::*; +use state::LockedBatchState; pub use errors::AppErr; type PublicKey = String; -pub struct BatchState { - pub balances: HashMap, - pub batch_epoch: u64, - /// The set of transfers that will be batched in the next epoch. - pub batched_transfers: HashSet, -} -impl BatchState { - pub fn new() -> Arc> { - Arc::new(RwLock::new(Self { - balances: HashMap::new(), - batch_epoch: 0, - batched_transfers: HashSet::new(), - })) - } -} - -pub fn app_router(state: Arc>) -> Router { +pub fn app_router(state: LockedBatchState) -> Router { Router::new() .route("/api/v1/mock/deposit", post(deposit)) .route("/api/v1/mock/withdraw", post(withdraw)) diff --git a/kairos-server/src/main.rs b/kairos-server/src/main.rs index afce7f8f..ad445758 100644 --- a/kairos-server/src/main.rs +++ b/kairos-server/src/main.rs @@ -1,5 +1,7 @@ use std::net::SocketAddr; +use kairos_server::state::BatchState; + #[tokio::main] async fn main() { tracing_subscriber::fmt::init(); @@ -11,7 +13,7 @@ async fn main() { }) }); - let app = kairos_server::app_router(kairos_server::BatchState::new()); + let app = kairos_server::app_router(BatchState::new()); let axum_addr = SocketAddr::from(([127, 0, 0, 1], axum_port)); tracing::info!("starting http server"); diff --git a/kairos-server/src/routes/deposit.rs b/kairos-server/src/routes/deposit.rs index c37cd133..dc59e765 100644 --- a/kairos-server/src/routes/deposit.rs +++ b/kairos-server/src/routes/deposit.rs @@ -1,20 +1,41 @@ -use std::sync::Arc; +use std::ops::Deref; -use axum::{extract::State, Json}; +use anyhow::anyhow; +use axum::{extract::State, http::StatusCode, Json}; use serde::{Deserialize, Serialize}; -use tokio::sync::RwLock; -use crate::{AppErr, BatchState, PublicKey}; +use crate::{state::LockedBatchState, AppErr, PublicKey}; #[derive(Serialize, Deserialize)] -pub struct DepositRequest { +pub struct Deposit { pub public_key: PublicKey, pub amount: u64, } pub async fn deposit( - State(pool): State>>, - Json(proof_request): Json, + state: State, + Json(Deposit { public_key, amount }): Json, ) -> Result<(), AppErr> { - todo!("deposit") + tracing::info!("TODO: verifying deposit"); + + tracing::info!("TODO: adding deposit to batch"); + + let mut state = state.deref().write().await; + let account = state.balances.entry(public_key.clone()); + + let prior_balance = account.or_insert(0); + let updated_balance = prior_balance.checked_add(amount).ok_or_else(|| { + AppErr::set_status( + anyhow!("deposit would overflow account"), + StatusCode::CONFLICT, + ) + })?; + + tracing::info!( + "Updated account public_key={} balance={}", + public_key, + updated_balance + ); + + Ok(()) } diff --git a/kairos-server/src/routes/mod.rs b/kairos-server/src/routes/mod.rs index bd659de5..cec2d9ae 100644 --- a/kairos-server/src/routes/mod.rs +++ b/kairos-server/src/routes/mod.rs @@ -1,7 +1,7 @@ pub mod deposit; -pub mod withdraw; pub mod transfer; +pub mod withdraw; pub use deposit::deposit; -pub use withdraw::withdraw; pub use transfer::transfer; +pub use withdraw::withdraw; diff --git a/kairos-server/src/routes/transfer.rs b/kairos-server/src/routes/transfer.rs index bc546069..26419885 100644 --- a/kairos-server/src/routes/transfer.rs +++ b/kairos-server/src/routes/transfer.rs @@ -1,10 +1,8 @@ -use std::sync::Arc; - -use axum::{extract::State, Json}; +use anyhow::anyhow; +use axum::{extract::State, http::StatusCode, Json}; use serde::{Deserialize, Serialize}; -use tokio::sync::RwLock; -use crate::{AppErr, BatchState, PublicKey}; +use crate::{state::LockedBatchState, AppErr, PublicKey}; #[derive(Serialize, Deserialize)] pub struct Transfer { @@ -20,8 +18,91 @@ pub struct TransferRequest { } pub async fn transfer( - State(pool): State>>, - Json(proof_request): Json, + State(state): State, + Json(TransferRequest { + transfer, + signature: _, + }): Json, ) -> Result<(), AppErr> { - todo!() + if transfer.amount == 0 { + return Err(AppErr::set_status( + anyhow!("transfer amount must be greater than 0"), + StatusCode::BAD_REQUEST, + )); + } + + tracing::info!("TODO: verifying transfer signature"); + + // We pre-check this read-only to error early without acquiring the write lock. + // This prevents a DoS attack exploiting the write lock. + tracing::info!("verifying transfer sender has sufficient funds"); + check_sender_funds(&state, &transfer).await?; + + let mut state = state.write().await; + let from_balance = state.balances.get_mut(&transfer.from).ok_or_else(|| { + AppErr::set_status( + anyhow!( + "Sender no longer has an account. + The sender just removed all their funds." + ), + StatusCode::CONFLICT, + ) + })?; + + *from_balance = from_balance.checked_sub(transfer.amount).ok_or_else(|| { + AppErr::set_status( + anyhow!( + "Sender no longer has sufficient funds, balance={}, transfer_amount={}. + The sender just moved their funds in a concurrent request", + from_balance, + transfer.amount + ), + StatusCode::CONFLICT, + ) + })?; + + let to_balance = state + .balances + .entry(transfer.to.clone()) + .or_insert_with(|| { + tracing::info!("creating new account for receiver"); + 0 + }); + + *to_balance = to_balance.checked_add(transfer.amount).ok_or_else(|| { + AppErr::set_status(anyhow!("Receiver balance overflow"), StatusCode::CONFLICT) + })?; + + Ok(()) +} + +async fn check_sender_funds(state: &LockedBatchState, transfer: &Transfer) -> Result<(), AppErr> { + let state = state.read().await; + let from_balance = state.balances.get(&transfer.from).ok_or_else(|| { + AppErr::set_status( + anyhow!("Sender does not have an account"), + StatusCode::BAD_REQUEST, + ) + })?; + + from_balance.checked_sub(transfer.amount).ok_or_else(|| { + AppErr::set_status( + anyhow!( + "Sender does not have sufficient funds, balance={}, transfer_amount={}", + from_balance, + transfer.amount + ), + StatusCode::FORBIDDEN, + ) + })?; + + let to_balance = state.balances.get(&transfer.to).unwrap_or(&0); + if to_balance.checked_add(transfer.amount).is_none() { + return Err(AppErr::set_status( + anyhow!("Receiver balance overflow"), + StatusCode::CONFLICT, + )); + } + + Ok(()) } diff --git a/kairos-server/src/routes/withdraw.rs b/kairos-server/src/routes/withdraw.rs index 8cc090fd..e1b1e008 100644 --- a/kairos-server/src/routes/withdraw.rs +++ b/kairos-server/src/routes/withdraw.rs @@ -1,20 +1,87 @@ -use std::sync::Arc; - -use axum::{extract::State, Json}; +use anyhow::anyhow; +use axum::{extract::State, http::StatusCode, Json}; use serde::{Deserialize, Serialize}; -use tokio::sync::RwLock; -use crate::{AppErr, BatchState, PublicKey}; +use crate::{state::LockedBatchState, AppErr, PublicKey}; #[derive(Serialize, Deserialize)] -pub struct WithdrawRequest { +pub struct Withdrawal { pub public_key: PublicKey, + pub signature: String, pub amount: u64, } pub async fn withdraw( - State(pool): State>>, - Json(proof_request): Json, + State(state): State, + Json(withdrawal): Json, +) -> Result<(), AppErr> { + tracing::info!("TODO: verifying withdrawal signature"); + + tracing::info!("verifying withdrawal sender has sufficient funds"); + check_sender_funds(&state, &withdrawal).await?; + + tracing::info!("TODO: adding withdrawal to batch"); + + let mut state = state.write().await; + let from_balance = state + .balances + .get_mut(&withdrawal.public_key) + .ok_or_else(|| { + AppErr::set_status( + anyhow!( + "Sender no longer has an account. + The sender just removed all their funds." + ), + StatusCode::CONFLICT, + ) + })?; + + let updated_balance = from_balance.checked_sub(withdrawal.amount).ok_or_else(|| { + AppErr::set_status( + anyhow!( + "Sender no longer has sufficient funds, balance={}, withdrawal_amount={}. + The sender just moved their funds in a concurrent request", + from_balance, + withdrawal.amount + ), + StatusCode::CONFLICT, + ) + })?; + + *from_balance = updated_balance; + + if updated_balance == 0 { + state.balances.remove(&withdrawal.public_key); + } + + tracing::info!( + "Updated account public_key={} balance={}", + withdrawal.public_key, + updated_balance + ); + + Ok(()) +} + +async fn check_sender_funds( + state: &LockedBatchState, + withdrawal: &Withdrawal, ) -> Result<(), AppErr> { - todo!() + let state = state.read().await; + let from_balance = state.balances.get(&withdrawal.public_key).ok_or_else(|| { + AppErr::set_status(anyhow!("Withdrawer has no account."), StatusCode::CONFLICT) + })?; + + if *from_balance < withdrawal.amount { + return Err(AppErr::set_status( + anyhow!( + "Withdrawer has insufficient funds, balance={}, withdrawal_amount={}.", + from_balance, + withdrawal.amount + ), + StatusCode::FORBIDDEN, + )); + } + + Ok(()) } diff --git a/kairos-server/src/state.rs b/kairos-server/src/state.rs new file mode 100644 index 00000000..d103ab99 --- /dev/null +++ b/kairos-server/src/state.rs @@ -0,0 +1,33 @@ +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; + +use tokio::sync::RwLock; + +use crate::{ + routes::{deposit::Deposit, transfer::Transfer, withdraw::Withdrawal}, + PublicKey, +}; + +pub type LockedBatchState = Arc>; + +pub struct BatchState { + pub balances: HashMap, + pub batch_epoch: u64, + /// The set of transfers that will be batched in the next epoch. + pub batched_transfers: HashSet, + pub batched_deposits: Vec, + pub batched_withdrawals: Vec, +} +impl BatchState { + pub fn new() -> Arc> { + Arc::new(RwLock::new(Self { + balances: HashMap::new(), + batch_epoch: 0, + batched_transfers: HashSet::new(), + batched_deposits: Vec::new(), + batched_withdrawals: Vec::new(), + })) + } +}