Skip to content

Commit

Permalink
Merge pull request #96 from cspr-rad/feature/l1-synchronization
Browse files Browse the repository at this point in the history
Periodic L1 synchronization
  • Loading branch information
koxu1996 authored Jun 17, 2024
2 parents 3c8ab4f + 59b91af commit 0c244c0
Show file tree
Hide file tree
Showing 15 changed files with 302 additions and 1 deletion.
50 changes: 50 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ members = [
"kairos-tx",
"kairos-prover/kairos-circuit-logic",
"demo-contract-tests",
"kairos-contracts/demo-contract/contract-utils"
]

[workspace.package]
Expand Down
4 changes: 3 additions & 1 deletion flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
./kairos-test-utils
./kairos-tx
./kairos-prover/kairos-circuit-logic
./kairos-contracts/demo-contract/contract-utils
];
};

Expand Down Expand Up @@ -110,7 +111,8 @@
RUST_SRC_PATH = "${rustToolchain}/lib/rustlib/src/rust/library";
CASPER_CHAIN_NAME = "cspr-dev-cctl";
PATH_TO_WASM_BINARIES = "${self'.packages.kairos-contracts}/bin";
inputsFrom = [ self'.packages.kairos ];
CARGO_TARGET_WASM32_UNKNOWN_UNKNOWN_LINKER = "lld";
inputsFrom = [ self'.packages.kairos self'.packages.kairos-contracts ];
};

packages = {
Expand Down
1 change: 1 addition & 0 deletions kairos-server/.env
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
KAIROS_SERVER_SOCKET_ADDR="127.0.0.1:7893"
KAIROS_SERVER_CASPER_RPC="http://127.0.0.1:11101/rpc"
KAIROS_SERVER_CASPER_CONTRACT_HASH="0000000000000000000000000000000000000000000000000000000000000000"
3 changes: 3 additions & 0 deletions kairos-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,13 @@ tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["std", "env-filter"] }
hex = "0.4"
kairos-tx = { path = "../kairos-tx" }
contract-utils = { path = "../kairos-contracts/demo-contract/contract-utils" }
kairos-circuit-logic = { path = "../kairos-prover/kairos-circuit-logic", features = ["serde", "asn1"] }
kairos-trie = { git = "https://github.com/cspr-rad/kairos-trie" }
sha2 = "0.10"
reqwest = "0.12"
casper-event-toolkit = { git = "https://github.com/koxu1996/casper-event-toolkit.git", version = "0.1.3" }
thiserror = "1.0"

[dev-dependencies]
proptest = "1"
Expand Down
3 changes: 3 additions & 0 deletions kairos-server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,18 @@ use std::{fmt, str::FromStr};
pub struct ServerConfig {
pub socket_addr: SocketAddr,
pub casper_rpc: Url,
pub casper_contract_hash: String,
}

impl ServerConfig {
pub fn from_env() -> Result<Self, String> {
let socket_addr = parse_env_as::<SocketAddr>("KAIROS_SERVER_SOCKET_ADDR")?;
let casper_rpc = parse_env_as::<Url>("KAIROS_SERVER_CASPER_RPC")?;
let casper_contract_hash = parse_env_as::<String>("KAIROS_SERVER_CASPER_CONTRACT_HASH")?;
Ok(Self {
socket_addr,
casper_rpc,
casper_contract_hash,
})
}
}
Expand Down
19 changes: 19 additions & 0 deletions kairos-server/src/l1_sync/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use thiserror::Error;

#[derive(Error, Debug)]
pub enum L1SyncError {
/// Casper Event Toolkit error.
#[error("toolkit error: {error}")]
ToolkitError {
#[from]
error: casper_event_toolkit::error::ToolkitError,
},

/// Communication error.
#[error("channel error: {0}")]
BrokenChannel(String),

/// Error that we cannot recover from.
#[error("Unexpected error: {0}")]
UnexpectedError(String),
}
89 changes: 89 additions & 0 deletions kairos-server/src/l1_sync/event_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
use std::sync::Arc;

use casper_event_toolkit::casper_types::bytesrepr::FromBytes;
use casper_event_toolkit::fetcher::{Fetcher, Schemas};
use casper_event_toolkit::metadata::CesMetadataRef;
use casper_event_toolkit::rpc::client::CasperClient;
use contract_utils::Deposit;

use crate::state::ServerStateInner;
use kairos_circuit_logic::transactions::{KairosTransaction, L1Deposit};

use super::error::L1SyncError;

pub struct EventManager {
next_event_id: u32,
fetcher: Fetcher,
schemas: Schemas,
server_state: Arc<ServerStateInner>,
}

impl EventManager {
pub async fn new(server_state: Arc<ServerStateInner>) -> Result<Self, L1SyncError> {
tracing::info!("Initializing event manager");

let rpc_url = server_state.server_config.casper_rpc.as_str();
let contract_hash = server_state.server_config.casper_contract_hash.as_str();
let client = CasperClient::new(rpc_url);
let metadata = CesMetadataRef::fetch_metadata(&client, contract_hash).await?;
tracing::debug!("Metadata fetched successfully");

let fetcher = Fetcher {
client,
ces_metadata: metadata,
};
let schemas = fetcher.fetch_schema().await?;
tracing::debug!("Schemas fetched successfully");

Ok(EventManager {
next_event_id: 0,
fetcher,
schemas,
server_state,
})
}

/// Processes new events starting from the last known event ID.
pub async fn process_new_events(&mut self) -> Result<(), L1SyncError> {
tracing::info!("Looking for new events");

let num_events = self.fetcher.fetch_events_count().await?;
for i in self.next_event_id..num_events {
let event = self.fetcher.fetch_event(i, &self.schemas).await?;
tracing::debug!("Event {} fetched: {:?}.", i, event);

let event_bytes = event.to_ces_bytes()?;

// (koxu1996) NOTE: I think we should rather use full transaction data (ASN) for events,
// parse them here with `kairos-tx` and then push to Data Availability layer.

match event.name.as_str() {
"Deposit" => {
// Parse simplified deposit data.
let (deposit, _) = Deposit::from_bytes(&event_bytes)
.expect("Failed to parse deposit event from bytes");

let amount = deposit.amount;
let recipient: Vec<u8> = "cafebabe".into(); // CAUTION: Using mocked recipient, as event does NOT contain depositor's public key.
let txn = KairosTransaction::Deposit(L1Deposit { amount, recipient });

// Push deposit to trie.
self.server_state
.batch_state_manager
.enqueue_transaction(txn)
.await
.map_err(|e| {
L1SyncError::UnexpectedError(format!("unable to batch tx: {}", e))
})?;
}
name => {
tracing::error!("Unrecognized event {}", name);
}
}

self.next_event_id = i + 1;
}

Ok(())
}
}
18 changes: 18 additions & 0 deletions kairos-server/src/l1_sync/interval_trigger.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use tokio::time::{self, Duration};

use std::sync::Arc;

use super::service::L1SyncService;

pub async fn run(sync_service: Arc<L1SyncService>) {
let mut interval = time::interval(Duration::from_secs(30));

loop {
interval.tick().await;

tracing::debug!("Triggering periodic L1 sync");
let _ = sync_service.trigger_sync().await.map_err(|e| {
tracing::error!("Unable to trigger sync: {}", e);
});
}
}
5 changes: 5 additions & 0 deletions kairos-server/src/l1_sync/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pub mod error;
pub mod event_manager;
pub mod service;

pub mod interval_trigger;
77 changes: 77 additions & 0 deletions kairos-server/src/l1_sync/service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
use crate::state::ServerStateInner;

use super::error::L1SyncError;
use super::event_manager::EventManager;

use tokio::sync::mpsc;
use tokio::sync::oneshot;

use std::sync::Arc;

pub enum SyncCommand {
TriggerSync(oneshot::Sender<()>),
// NOTE: More commands can be here.
}

pub struct L1SyncService {
command_sender: mpsc::Sender<SyncCommand>,
//event_manager_handle: tokio::task::JoinHandle<()>,
}

impl L1SyncService {
pub async fn new(server_state: Arc<ServerStateInner>) -> Result<Self, L1SyncError> {
let event_manager = EventManager::new(server_state.clone()).await?;

let (tx, rx) = mpsc::channel(32);
let _handle = tokio::spawn(async move {
run_event_manager(rx, event_manager).await;
});

Ok(L1SyncService {
command_sender: tx,
//event_manager_handle: _handle,
})
}

pub async fn trigger_sync(&self) -> Result<(), L1SyncError> {
let (tx, rx) = oneshot::channel();
self.command_sender
.send(SyncCommand::TriggerSync(tx))
.await
.map_err(|e| L1SyncError::BrokenChannel(format!("Unable to send trigger: {}", e)))?;
rx.await.map_err(|e| {
L1SyncError::BrokenChannel(format!("Unable to receive trigger ack: {}", e))
})?;

Ok(())
}
}

/// Handles incoming commands and delegates tasks to EventManager.
async fn run_event_manager(mut rx: mpsc::Receiver<SyncCommand>, mut event_manager: EventManager) {
tracing::debug!("Event manager running and waiting for commands");
while let Some(command) = rx.recv().await {
let _ = handle_command(command, &mut event_manager)
.await
.map_err(|e| match e {
L1SyncError::UnexpectedError(e) => panic!("Unrecoverable error: {}", e),
_ => tracing::error!("Transient error: {}", e),
});
}
}

async fn handle_command(
command: SyncCommand,
event_manager: &mut EventManager,
) -> Result<(), L1SyncError> {
match command {
SyncCommand::TriggerSync(completion_ack) => {
event_manager.process_new_events().await?;
completion_ack
.send(())
.map_err(|_| L1SyncError::BrokenChannel("Sender dropped".to_string()))?;
}
}

Ok(())
}
Loading

0 comments on commit 0c244c0

Please sign in to comment.