diff --git a/Cargo.lock b/Cargo.lock index 3c42df5a..48a5c3a7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -531,6 +531,45 @@ dependencies = [ "toml", ] +[[package]] +name = "casper-event-standard" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a3a1bdb142b4bfcdceec757422b2e292f446b72ce3613f881eb694f3925ef10" +dependencies = [ + "casper-contract", + "casper-event-standard-macro", + "casper-types 4.0.1", +] + +[[package]] +name = "casper-event-standard-macro" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "485810e6c8387863a92e9b81e4e66ce290e2c96c0ad8ec4352e95128aa88900e" +dependencies = [ + "proc-macro2 1.0.81", + "quote 1.0.36", + "syn 1.0.109", +] + +[[package]] +name = "casper-event-toolkit" +version = "0.1.3" +source = "git+https://github.com/koxu1996/casper-event-toolkit.git#360dbe1db29fd4575fadf06dbcb7b7c9ddc741ca" +dependencies = [ + "bincode", + "casper-client", + "casper-event-standard", + "casper-hashing 2.0.0", + "casper-hashing 3.0.0", + "casper-types 3.0.0", + "hex", + "serde", + "thiserror", + "tokio", +] + [[package]] name = "casper-execution-engine" version = "7.0.1" @@ -849,6 +888,14 @@ version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6051f239ecec86fde3410901ab7860d458d160371533842974fc61f96d15879b" +[[package]] +name = "contract-utils" +version = "0.1.0" +dependencies = [ + "casper-event-standard", + "casper-types 3.0.0", +] + [[package]] name = "cookie" version = "0.18.1" @@ -2103,7 +2150,9 @@ dependencies = [ "axum-extra", "axum-test", "casper-client", + "casper-event-toolkit", "casper-types 3.0.0", + "contract-utils", "dotenvy", "hex", "kairos-circuit-logic", @@ -2116,6 +2165,7 @@ dependencies = [ "serde", "serde_json", "sha2 0.10.8", + "thiserror", "tokio", "tracing", "tracing-subscriber", diff --git a/Cargo.toml b/Cargo.toml index b6cfe2c5..3ffddec5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ members = [ "kairos-tx", "kairos-prover/kairos-circuit-logic", "demo-contract-tests", + "kairos-contracts/demo-contract/contract-utils" ] [workspace.package] diff --git a/flake.nix b/flake.nix index 6b756c40..bfa96274 100644 --- a/flake.nix +++ b/flake.nix @@ -83,6 +83,7 @@ ./kairos-test-utils ./kairos-tx ./kairos-prover/kairos-circuit-logic + ./kairos-contracts/demo-contract/contract-utils ]; }; @@ -110,7 +111,8 @@ RUST_SRC_PATH = "${rustToolchain}/lib/rustlib/src/rust/library"; CASPER_CHAIN_NAME = "cspr-dev-cctl"; PATH_TO_WASM_BINARIES = "${self'.packages.kairos-contracts}/bin"; - inputsFrom = [ self'.packages.kairos ]; + CARGO_TARGET_WASM32_UNKNOWN_UNKNOWN_LINKER = "lld"; + inputsFrom = [ self'.packages.kairos self'.packages.kairos-contracts ]; }; packages = { diff --git a/kairos-server/.env b/kairos-server/.env index 7d84c880..9c37acae 100644 --- a/kairos-server/.env +++ b/kairos-server/.env @@ -1,2 +1,3 @@ KAIROS_SERVER_SOCKET_ADDR="127.0.0.1:7893" KAIROS_SERVER_CASPER_RPC="http://127.0.0.1:11101/rpc" +KAIROS_SERVER_CASPER_CONTRACT_HASH="0000000000000000000000000000000000000000000000000000000000000000" diff --git a/kairos-server/Cargo.toml b/kairos-server/Cargo.toml index d1355c7c..2de851b5 100644 --- a/kairos-server/Cargo.toml +++ b/kairos-server/Cargo.toml @@ -35,10 +35,13 @@ tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["std", "env-filter"] } hex = "0.4" kairos-tx = { path = "../kairos-tx" } +contract-utils = { path = "../kairos-contracts/demo-contract/contract-utils" } kairos-circuit-logic = { path = "../kairos-prover/kairos-circuit-logic", features = ["serde", "asn1"] } kairos-trie = { git = "https://github.com/cspr-rad/kairos-trie" } sha2 = "0.10" reqwest = "0.12" +casper-event-toolkit = { git = "https://github.com/koxu1996/casper-event-toolkit.git", version = "0.1.3" } +thiserror = "1.0" [dev-dependencies] proptest = "1" diff --git a/kairos-server/src/config.rs b/kairos-server/src/config.rs index 06da0096..61999cb3 100644 --- a/kairos-server/src/config.rs +++ b/kairos-server/src/config.rs @@ -6,15 +6,18 @@ use std::{fmt, str::FromStr}; pub struct ServerConfig { pub socket_addr: SocketAddr, pub casper_rpc: Url, + pub casper_contract_hash: String, } impl ServerConfig { pub fn from_env() -> Result { let socket_addr = parse_env_as::("KAIROS_SERVER_SOCKET_ADDR")?; let casper_rpc = parse_env_as::("KAIROS_SERVER_CASPER_RPC")?; + let casper_contract_hash = parse_env_as::("KAIROS_SERVER_CASPER_CONTRACT_HASH")?; Ok(Self { socket_addr, casper_rpc, + casper_contract_hash, }) } } diff --git a/kairos-server/src/l1_sync/error.rs b/kairos-server/src/l1_sync/error.rs new file mode 100644 index 00000000..11fac6f6 --- /dev/null +++ b/kairos-server/src/l1_sync/error.rs @@ -0,0 +1,19 @@ +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum L1SyncError { + /// Casper Event Toolkit error. + #[error("toolkit error: {error}")] + ToolkitError { + #[from] + error: casper_event_toolkit::error::ToolkitError, + }, + + /// Communication error. + #[error("channel error: {0}")] + BrokenChannel(String), + + /// Error that we cannot recover from. + #[error("Unexpected error: {0}")] + UnexpectedError(String), +} diff --git a/kairos-server/src/l1_sync/event_manager.rs b/kairos-server/src/l1_sync/event_manager.rs new file mode 100644 index 00000000..17a3b27f --- /dev/null +++ b/kairos-server/src/l1_sync/event_manager.rs @@ -0,0 +1,89 @@ +use std::sync::Arc; + +use casper_event_toolkit::casper_types::bytesrepr::FromBytes; +use casper_event_toolkit::fetcher::{Fetcher, Schemas}; +use casper_event_toolkit::metadata::CesMetadataRef; +use casper_event_toolkit::rpc::client::CasperClient; +use contract_utils::Deposit; + +use crate::state::ServerStateInner; +use kairos_circuit_logic::transactions::{KairosTransaction, L1Deposit}; + +use super::error::L1SyncError; + +pub struct EventManager { + next_event_id: u32, + fetcher: Fetcher, + schemas: Schemas, + server_state: Arc, +} + +impl EventManager { + pub async fn new(server_state: Arc) -> Result { + tracing::info!("Initializing event manager"); + + let rpc_url = server_state.server_config.casper_rpc.as_str(); + let contract_hash = server_state.server_config.casper_contract_hash.as_str(); + let client = CasperClient::new(rpc_url); + let metadata = CesMetadataRef::fetch_metadata(&client, contract_hash).await?; + tracing::debug!("Metadata fetched successfully"); + + let fetcher = Fetcher { + client, + ces_metadata: metadata, + }; + let schemas = fetcher.fetch_schema().await?; + tracing::debug!("Schemas fetched successfully"); + + Ok(EventManager { + next_event_id: 0, + fetcher, + schemas, + server_state, + }) + } + + /// Processes new events starting from the last known event ID. + pub async fn process_new_events(&mut self) -> Result<(), L1SyncError> { + tracing::info!("Looking for new events"); + + let num_events = self.fetcher.fetch_events_count().await?; + for i in self.next_event_id..num_events { + let event = self.fetcher.fetch_event(i, &self.schemas).await?; + tracing::debug!("Event {} fetched: {:?}.", i, event); + + let event_bytes = event.to_ces_bytes()?; + + // (koxu1996) NOTE: I think we should rather use full transaction data (ASN) for events, + // parse them here with `kairos-tx` and then push to Data Availability layer. + + match event.name.as_str() { + "Deposit" => { + // Parse simplified deposit data. + let (deposit, _) = Deposit::from_bytes(&event_bytes) + .expect("Failed to parse deposit event from bytes"); + + let amount = deposit.amount; + let recipient: Vec = "cafebabe".into(); // CAUTION: Using mocked recipient, as event does NOT contain depositor's public key. + let txn = KairosTransaction::Deposit(L1Deposit { amount, recipient }); + + // Push deposit to trie. + self.server_state + .batch_state_manager + .enqueue_transaction(txn) + .await + .map_err(|e| { + L1SyncError::UnexpectedError(format!("unable to batch tx: {}", e)) + })?; + } + name => { + tracing::error!("Unrecognized event {}", name); + } + } + + self.next_event_id = i + 1; + } + + Ok(()) + } +} diff --git a/kairos-server/src/l1_sync/interval_trigger.rs b/kairos-server/src/l1_sync/interval_trigger.rs new file mode 100644 index 00000000..c310450c --- /dev/null +++ b/kairos-server/src/l1_sync/interval_trigger.rs @@ -0,0 +1,18 @@ +use tokio::time::{self, Duration}; + +use std::sync::Arc; + +use super::service::L1SyncService; + +pub async fn run(sync_service: Arc) { + let mut interval = time::interval(Duration::from_secs(30)); + + loop { + interval.tick().await; + + tracing::debug!("Triggering periodic L1 sync"); + let _ = sync_service.trigger_sync().await.map_err(|e| { + tracing::error!("Unable to trigger sync: {}", e); + }); + } +} diff --git a/kairos-server/src/l1_sync/mod.rs b/kairos-server/src/l1_sync/mod.rs new file mode 100644 index 00000000..73076152 --- /dev/null +++ b/kairos-server/src/l1_sync/mod.rs @@ -0,0 +1,5 @@ +pub mod error; +pub mod event_manager; +pub mod service; + +pub mod interval_trigger; diff --git a/kairos-server/src/l1_sync/service.rs b/kairos-server/src/l1_sync/service.rs new file mode 100644 index 00000000..16a4fd48 --- /dev/null +++ b/kairos-server/src/l1_sync/service.rs @@ -0,0 +1,77 @@ +use crate::state::ServerStateInner; + +use super::error::L1SyncError; +use super::event_manager::EventManager; + +use tokio::sync::mpsc; +use tokio::sync::oneshot; + +use std::sync::Arc; + +pub enum SyncCommand { + TriggerSync(oneshot::Sender<()>), + // NOTE: More commands can be here. +} + +pub struct L1SyncService { + command_sender: mpsc::Sender, + //event_manager_handle: tokio::task::JoinHandle<()>, +} + +impl L1SyncService { + pub async fn new(server_state: Arc) -> Result { + let event_manager = EventManager::new(server_state.clone()).await?; + + let (tx, rx) = mpsc::channel(32); + let _handle = tokio::spawn(async move { + run_event_manager(rx, event_manager).await; + }); + + Ok(L1SyncService { + command_sender: tx, + //event_manager_handle: _handle, + }) + } + + pub async fn trigger_sync(&self) -> Result<(), L1SyncError> { + let (tx, rx) = oneshot::channel(); + self.command_sender + .send(SyncCommand::TriggerSync(tx)) + .await + .map_err(|e| L1SyncError::BrokenChannel(format!("Unable to send trigger: {}", e)))?; + rx.await.map_err(|e| { + L1SyncError::BrokenChannel(format!("Unable to receive trigger ack: {}", e)) + })?; + + Ok(()) + } +} + +/// Handles incoming commands and delegates tasks to EventManager. +async fn run_event_manager(mut rx: mpsc::Receiver, mut event_manager: EventManager) { + tracing::debug!("Event manager running and waiting for commands"); + while let Some(command) = rx.recv().await { + let _ = handle_command(command, &mut event_manager) + .await + .map_err(|e| match e { + L1SyncError::UnexpectedError(e) => panic!("Unrecoverable error: {}", e), + _ => tracing::error!("Transient error: {}", e), + }); + } +} + +async fn handle_command( + command: SyncCommand, + event_manager: &mut EventManager, +) -> Result<(), L1SyncError> { + match command { + SyncCommand::TriggerSync(completion_ack) => { + event_manager.process_new_events().await?; + completion_ack + .send(()) + .map_err(|_| L1SyncError::BrokenChannel("Sender dropped".to_string()))?; + } + } + + Ok(()) +} diff --git a/kairos-server/src/lib.rs b/kairos-server/src/lib.rs index 0b82c750..deb3943d 100644 --- a/kairos-server/src/lib.rs +++ b/kairos-server/src/lib.rs @@ -3,6 +3,7 @@ pub mod errors; pub mod routes; pub mod state; +mod l1_sync; mod utils; use std::sync::Arc; @@ -13,6 +14,7 @@ use axum_extra::routing::RouterExt; pub use errors::AppErr; use crate::config::ServerConfig; +use crate::l1_sync::service::L1SyncService; use crate::state::{BatchStateManager, ServerState, ServerStateInner}; /// TODO: support secp256k1 @@ -38,6 +40,28 @@ pub fn app_router(state: ServerState) -> Router { .with_state(state) } +pub async fn run_l1_sync(server_state: Arc) { + // Extra check: make sure the default dummy value of contract hash was changed. + let contract_hash = server_state.server_config.casper_contract_hash.as_str(); + if contract_hash == "0000000000000000000000000000000000000000000000000000000000000000" { + tracing::warn!( + "Casper contract hash not configured, L1 synchronization will NOT be enabled." + ); + return; + } + + // Initialize L1 synchronizer. + let l1_sync_service = L1SyncService::new(server_state).await.unwrap_or_else(|e| { + panic!("Event manager failed to initialize: {}", e); + }); + + // Run periodic synchronization. + // TODO: Add additional SSE trigger. + tokio::spawn(async move { + l1_sync::interval_trigger::run(l1_sync_service.into()).await; + }); +} + pub async fn run(config: ServerConfig) { let listener = tokio::net::TcpListener::bind(config.socket_addr) .await @@ -48,6 +72,9 @@ pub async fn run(config: ServerConfig) { batch_state_manager: BatchStateManager::new_empty(), server_config: config.clone(), }); + + run_l1_sync(state.clone()).await; + let app = app_router(state); axum::serve(listener, app) diff --git a/kairos-server/tests/transactions.rs b/kairos-server/tests/transactions.rs index 62e5a36d..4fcd0402 100644 --- a/kairos-server/tests/transactions.rs +++ b/kairos-server/tests/transactions.rs @@ -49,6 +49,8 @@ fn new_test_app_with_casper_node(casper_node_url: &Url) -> TestServer { server_config: ServerConfig { socket_addr: "0.0.0.0:0".parse().unwrap(), casper_rpc: casper_node_url.clone(), + casper_contract_hash: + "0000000000000000000000000000000000000000000000000000000000000000".to_string(), }, }); diff --git a/kairos-test-utils/src/kairos.rs b/kairos-test-utils/src/kairos.rs index 331a8638..820367a3 100644 --- a/kairos-test-utils/src/kairos.rs +++ b/kairos-test-utils/src/kairos.rs @@ -22,9 +22,12 @@ impl Kairos { let socket_addr = TcpListener::bind("0.0.0.0:0")?.local_addr()?; let port = socket_addr.port().to_string(); let url = Url::parse(&format!("http://0.0.0.0:{}", port)).unwrap(); + let casper_contract_hash = + String::from("0000000000000000000000000000000000000000000000000000000000000000"); let config = kairos_server::config::ServerConfig { socket_addr, casper_rpc, + casper_contract_hash, }; let process_handle = tokio::spawn(async move { diff --git a/nixos/modules/kairos.nix b/nixos/modules/kairos.nix index fcb8540f..e4efafbd 100644 --- a/nixos/modules/kairos.nix +++ b/nixos/modules/kairos.nix @@ -73,6 +73,7 @@ in RUST_LOG = cfg.logLevel; KAIROS_SERVER_SOCKET_ADDR = "${cfg.bindAddress}:${builtins.toString cfg.port}"; KAIROS_SERVER_CASPER_RPC = "${cfg.casperRpcUrl}"; + KAIROS_SERVER_CASPER_CONTRACT_HASH = "0000000000000000000000000000000000000000000000000000000000000000"; }; serviceConfig = mkMerge [ {