Skip to content

Commit

Permalink
Merge branch 'main' into feature/contract-submit-batch
Browse files Browse the repository at this point in the history
  • Loading branch information
Avi-D-coder authored Jun 21, 2024
2 parents 5c5e58e + 456cae1 commit 870f849
Show file tree
Hide file tree
Showing 16 changed files with 372 additions and 39 deletions.
8 changes: 8 additions & 0 deletions .env
Original file line number Diff line number Diff line change
@@ -1,2 +1,10 @@
KAIROS_SERVER_SOCKET_ADDR="127.0.0.1:7893"
KAIROS_PROVER_SERVER_SOCKET_ADDR="127.0.0.1:7894"
# In development the socket and url should match
KAIROS_PROVER_SERVER_URL="http://127.0.0.1:7894"
KAIROS_SERVER_MAX_BATCH_SIZE=2
KAIROS_SERVER_MAX_BATCH_SECONDS=60

RISC0_DEV_MODE=1;
# FIXME this is a dummy value that fixes a regression that prevented server startup
KAIROS_SERVER_CASPER_RPC="http://127.0.0.1:11101/rpc"
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion kairos-cli/tests/cli_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async fn deposit_successful_with_ed25519() {
.expect("Expected at least one node after successful network run");
let node_url = Url::parse(&format!("http://localhost:{}/rpc", node.port.rpc_port)).unwrap();

let kairos = kairos::Kairos::run(node_url).await.unwrap();
let kairos = kairos::Kairos::run(node_url, None).await.unwrap();

tokio::task::spawn_blocking(move || {
let depositor_secret_key_path = network
Expand Down
2 changes: 1 addition & 1 deletion kairos-prover/default.nix
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@

kairos-prover = craneLib.buildPackage (kairosProverAttrs // {
cargoArtifacts = self'.packages.kairos-prover-deps;
meta.mainProgram = "kairos-prover";
meta.mainProgram = "kairos-prover-risc0-server";
});
};
};
Expand Down
12 changes: 8 additions & 4 deletions kairos-prover/kairos-prover-risc0-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,25 @@ use kairos_circuit_logic::{ProofInputs, ProofOutputs};
use methods::PROVE_BATCH_ELF;
use risc0_zkvm::{ExecutorEnv, Prover, Receipt};
use tracing::level_filters::LevelFilter;
use tracing_subscriber::EnvFilter;
use tracing_subscriber::{prelude::*, EnvFilter};

#[tokio::main]
async fn main() {
let _ = dotenvy::dotenv();
// Initialize tracing. In order to view logs, run `RUST_LOG=info cargo run`
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::filter::EnvFilter::from_default_env())
tracing_subscriber::registry()
.with(
EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "info,kairos_prover_risc0_server=info".into()),
)
.with(tracing_subscriber::fmt::layer())
.init();

let socket_addr = std::env::var("KAIROS_PROVER_SERVER_SOCKET_ADDR")
.expect("Failed to fetch environment variable KAIROS_PROVER_SERVER_SOCKET_ADDR");
let socket_addr = socket_addr
.parse::<std::net::SocketAddr>()
.expect("Failed to parse KAIROS_SERVER_SOCKET_ADDR");
.expect("Failed to parse KAIROS_PROVER_SERVER_SOCKET_ADDR");

let app = axum::Router::new()
.typed_post(prove_batch_route)
Expand Down
2 changes: 1 addition & 1 deletion kairos-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ contract-utils = { path = "../kairos-contracts/demo-contract/contract-utils" }
kairos-circuit-logic = { path = "../kairos-prover/kairos-circuit-logic", features = ["serde", "asn1"] }
kairos-trie = { git = "https://github.com/cspr-rad/kairos-trie" }
sha2 = "0.10"
reqwest = "0.12"
reqwest = { version = "0.12", features = ["json"] }
casper-event-toolkit = { git = "https://github.com/koxu1996/casper-event-toolkit.git", version = "0.1.3" }
thiserror = "1.0"

Expand Down
45 changes: 45 additions & 0 deletions kairos-server/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,54 @@
use reqwest::Url;
use std::net::SocketAddr;
use std::time::Duration;
use std::{fmt, str::FromStr};

#[derive(Clone, Debug)]
pub struct ServerConfig {
pub socket_addr: SocketAddr,
pub casper_rpc: Url,
pub casper_contract_hash: String,
pub batch_config: BatchConfig,
}

impl ServerConfig {
pub fn from_env() -> Result<Self, String> {
let socket_addr = parse_env_as::<SocketAddr>("KAIROS_SERVER_SOCKET_ADDR")?;
let casper_rpc = parse_env_as::<Url>("KAIROS_SERVER_CASPER_RPC")?;
let batch_config = BatchConfig::from_env()?;
let casper_contract_hash = parse_env_as::<String>("KAIROS_SERVER_CASPER_CONTRACT_HASH")?;

Ok(Self {
socket_addr,
casper_rpc,
casper_contract_hash,
batch_config,
})
}
}

/// Configuration for the trie state thread.
/// Currently only configures when a batch is committed and sent to the proving server.
#[derive(Debug, Clone)]
pub struct BatchConfig {
/// Set by the environment variable `KAIROS_SERVER_MAX_BATCH_SIZE`.
pub max_batch_size: Option<u64>,
/// Set by the environment variable `KAIROS_SERVER_MAX_BATCH_SECONDS`.
pub max_batch_duration: Option<Duration>,
pub proving_server: Url,
}

impl BatchConfig {
pub fn from_env() -> Result<Self, String> {
let max_batch_size = parse_env_as_opt("KAIROS_SERVER_MAX_BATCH_SIZE")?;
let max_batch_duration =
parse_env_as_opt::<u64>("KAIROS_SERVER_MAX_BATCH_SECONDS")?.map(Duration::from_secs);
let proving_server = parse_env_as::<Url>("KAIROS_PROVER_SERVER_URL")?;

Ok(Self {
max_batch_size,
max_batch_duration,
proving_server,
})
}
}
Expand All @@ -34,3 +65,17 @@ where
.map_err(|e| format!("Failed to parse {}: {}", env, e))
})
}

fn parse_env_as_opt<T>(env: &str) -> Result<Option<T>, String>
where
T: FromStr,
<T as FromStr>::Err: fmt::Display,
{
match std::env::var(env) {
Ok(val) => val
.parse::<T>()
.map(Some)
.map_err(|e| format!("Failed to parse {}: {}", env, e)),
Err(_) => Ok(None),
}
}
2 changes: 1 addition & 1 deletion kairos-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub async fn run(config: ServerConfig) {
tracing::info!("listening on `{}`", listener.local_addr().unwrap());

let state = Arc::new(ServerStateInner {
batch_state_manager: BatchStateManager::new_empty(),
batch_state_manager: BatchStateManager::new_empty(config.batch_config.clone()),
server_config: config.clone(),
});

Expand Down
46 changes: 37 additions & 9 deletions kairos-server/src/state.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
pub mod transactions;
mod trie;

use std::{sync::Arc, thread::JoinHandle};
use std::{sync::Arc, thread};

use tokio::sync::mpsc;
use tokio::{sync::mpsc, task};

pub use self::trie::TrieStateThreadMsg;
use crate::config::ServerConfig;
use crate::config::{BatchConfig, ServerConfig};
use kairos_circuit_logic::transactions::KairosTransaction;
use kairos_trie::{stored::memory_db::MemoryDb, NodeHash, TrieRoot};

Expand All @@ -26,28 +26,56 @@ pub struct ServerStateInner {
/// via a oneshot channel in each `TrieStateThreadMsg`.
#[derive(Debug)]
pub struct BatchStateManager {
pub trie_thread: JoinHandle<()>,
pub trie_thread: thread::JoinHandle<()>,
pub batch_output_handler: task::JoinHandle<()>,
pub queued_transactions: mpsc::Sender<TrieStateThreadMsg>,
}

impl BatchStateManager {
/// Create a new `BatchStateManager` with the given `db` and `batch_root`.
/// `batch_root` and it's descendants must be in the `db`.
/// This method spawns the trie state thread, it should be called only once.
pub fn new(db: trie::Database, batch_root: TrieRoot<NodeHash>) -> Self {
let (queued_transactions, receiver) = mpsc::channel(1000);
let trie_thread = trie::spawn_state_thread(receiver, db, batch_root);
pub fn new(config: BatchConfig, db: trie::Database, batch_root: TrieRoot<NodeHash>) -> Self {
let (queued_transactions, txn_receiver) = mpsc::channel(1000);
// This queue provides back pressure to the trie thread.
let (batch_sender, mut batch_rec) = mpsc::channel(10);
let trie_thread =
trie::spawn_state_thread(config.clone(), txn_receiver, batch_sender, db, batch_root);

let batch_output_handler = tokio::spawn(async move {
while let Some(batch_output) = batch_rec.recv().await {
let prove_url = config.proving_server.join("prove").expect("Invalid URL");

let res = reqwest::Client::new()
.post(prove_url)
.json(&batch_output.proof_inputs)
.send()
.await
.unwrap_or_else(|e| {
tracing::error!("Could not send batch output to proving server: {}", e);
panic!("Could not send batch output to proving server: {}", e);
});

if res.status().is_success() {
// TODO send the proof to layer 1
} else {
tracing::error!("Proving server returned an error: {:?}", res);
panic!("Proving server returned an error: {:?}", res);
}
}
});

Self {
trie_thread,
batch_output_handler,
queued_transactions,
}
}

/// Create a new `BatchStateManager` with an empty `MemoryDb` and an empty `TrieRoot`.
/// This is useful for testing.
pub fn new_empty() -> Self {
Self::new(MemoryDb::empty(), TrieRoot::default())
pub fn new_empty(config: BatchConfig) -> Self {
Self::new(config, MemoryDb::empty(), TrieRoot::default())
}

pub async fn enqueue_transaction(&self, txn: KairosTransaction) -> Result<(), crate::AppErr> {
Expand Down
38 changes: 36 additions & 2 deletions kairos-server/src/state/trie.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ use std::{
mem,
rc::Rc,
thread::{self, JoinHandle},
time::Instant,
};

use sha2::Sha256;
use tokio::sync::{mpsc, oneshot};

use super::transactions::batch_state::BatchState;
use crate::AppErr;
use crate::{config::BatchConfig, AppErr};
use kairos_circuit_logic::{
account_trie::{Account, AccountTrie},
transactions::KairosTransaction,
Expand Down Expand Up @@ -40,12 +41,15 @@ impl TrieStateThreadMsg {
}

pub fn spawn_state_thread(
config: BatchConfig,
mut queue: mpsc::Receiver<TrieStateThreadMsg>,
batch_outputs_receiver: mpsc::Sender<BatchOutput>,
db: Database,
batch_root: TrieRoot<NodeHash>,
) -> JoinHandle<()> {
thread::spawn(move || {
let mut state = TrieState::new(db, batch_root);
let mut last_commit_time = Instant::now();

while let Some(msg) = queue.blocking_recv() {
tracing::trace!("Trie State Thread received message: {:?}", msg);
Expand All @@ -58,7 +62,35 @@ pub fn spawn_state_thread(
err.map(|()| "Success".to_string())
.unwrap_or_else(|err| err.to_string())
)
})
});

let should_commit = match config {
BatchConfig {
max_batch_size: Some(batch_size),
..
} if state.batch_state.batched_txns.len() as u64 >= batch_size => true,
BatchConfig {
max_batch_duration: Some(duration),
..
} if last_commit_time.elapsed() >= duration => true,
_ => false,
};

if should_commit {
let batch_output = state.commit_and_start_new_txn().unwrap_or_else(|err| {
tracing::error!("Failed to commit trie state: {:?}", err);
panic!("Failed to commit trie state: {:?}", err);
});

batch_outputs_receiver
.blocking_send(batch_output)
.unwrap_or_else(|err| {
tracing::error!("Failed to send batch output: {:?}", err);
panic!("Failed to send batch output: {:?}", err);
});

last_commit_time = Instant::now();
}
}
TrieStateThreadMsg::Commit(sender) => {
let res = state.commit_and_start_new_txn();
Expand Down Expand Up @@ -104,6 +136,8 @@ impl TrieState {
}

/// Calculate the new root hash of the trie and sync changes to the database.
///
/// Errors if underlying trie commit fails due to data database connection or consistency issues.
pub fn commit_and_start_new_txn(&mut self) -> Result<BatchOutput, AppErr> {
let old_trie_txn = &self.batch_state.account_trie;
let old_root = self.batch_root;
Expand Down
24 changes: 16 additions & 8 deletions kairos-server/tests/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use casper_client_types::{
AsymmetricType,
};
use kairos_server::{
config::ServerConfig,
config::{BatchConfig, ServerConfig},
routes::deposit::DepositPath,
state::{BatchStateManager, ServerStateInner},
};
Expand Down Expand Up @@ -44,14 +44,22 @@ fn new_test_app_with_casper_node(casper_node_url: &Url) -> TestServer {
.init();
});
let config = TestServerConfig::builder().mock_transport().build();
let state = Arc::new(ServerStateInner {
batch_state_manager: BatchStateManager::new_empty(),
server_config: ServerConfig {
socket_addr: "0.0.0.0:0".parse().unwrap(),
casper_rpc: casper_node_url.clone(),
casper_contract_hash:
"0000000000000000000000000000000000000000000000000000000000000000".to_string(),
let server_config = ServerConfig {
socket_addr: "0.0.0.0:0".parse().unwrap(),
casper_rpc: casper_node_url.clone(),
casper_contract_hash: "0000000000000000000000000000000000000000000000000000000000000000"
.to_string(),
batch_config: BatchConfig {
max_batch_size: None,
max_batch_duration: None,
// dummy proving server will never be called because of max_batch_size and max_batch_duration
proving_server: Url::parse("http://127.0.0.1:7894").unwrap(),
},
};

let state = Arc::new(ServerStateInner {
batch_state_manager: BatchStateManager::new_empty(server_config.batch_config.clone()),
server_config,
});

TestServer::new_with_config(kairos_server::app_router(state), config).unwrap()
Expand Down
2 changes: 1 addition & 1 deletion kairos-test-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,4 @@ tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["std", "env-filter"] }
reqwest = { version = "0.12", features = ["json"] }
kairos-server = { path = "../kairos-server" }

dotenvy = "0.15"
Loading

0 comments on commit 870f849

Please sign in to comment.