Periodic L1 synchronization #96
Environment configuration (.env):

```diff
@@ -1,2 +1,3 @@
 KAIROS_SERVER_SOCKET_ADDR="127.0.0.1:7893"
 KAIROS_SERVER_CASPER_RPC="http://127.0.0.1:11101/rpc"
+KAIROS_SERVER_CASPER_CONTRACT_HASH="0000000000000000000000000000000000000000000000000000000000000000"
```
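How the new variable might be consumed, as a minimal sketch; the actual `ServerConfig` loader is not part of this diff, so the `std::env`-based reading here is an assumption:

```rust
use std::env;

// Hypothetical loader: falls back to the all-zero placeholder, which
// run_l1_sync (below) treats as "L1 synchronization disabled".
fn casper_contract_hash() -> String {
    env::var("KAIROS_SERVER_CASPER_CONTRACT_HASH").unwrap_or_else(|_| "0".repeat(64))
}
```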
l1_sync/error.rs (new file):

@@ -0,0 +1,23 @@

```rust
use thiserror::Error;

#[derive(Error, Debug)]
pub enum L1SyncError {
    /// Casper Event Toolkit error.
    #[error("toolkit error: {error}")]
    ToolkitError {
        #[from]
        error: casper_event_toolkit::error::ToolkitError,
    },

    /// Communication error.
    #[error("channel error: {0}")]
    BrokenChannel(String),

    /// Initialization error.
    #[error("Initialization error: {0}")]
    InitializationError(String),

    /// Error that we cannot recover from.
    #[error("Unexpected error: {0}")]
    UnexpectedError(String),
}
```
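Because of the `#[from]` attribute, the `?` operator converts a `ToolkitError` into `L1SyncError::ToolkitError` automatically. A sketch of a hypothetical caller, assuming `fetch_events_count` returns `Result<u32, ToolkitError>` as its use in `process_new_events` below suggests:

```rust
// Hypothetical caller: `?` performs the ToolkitError -> L1SyncError
// conversion generated by #[from], so no explicit map_err is needed.
async fn count_events(fetcher: &Fetcher) -> Result<u32, L1SyncError> {
    let count = fetcher.fetch_events_count().await?;
    Ok(count)
}
```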
l1_sync/event_manager.rs (new file):

@@ -0,0 +1,89 @@

```rust
use std::sync::Arc;

use casper_event_toolkit::fetcher::{Fetcher, Schemas};
use casper_event_toolkit::metadata::CesMetadataRef;
use casper_event_toolkit::rpc::client::CasperClient;

use crate::state::ServerStateInner;
use kairos_circuit_logic::transactions::{KairosTransaction, L1Deposit};

use super::error::L1SyncError;

pub struct EventManager {
    next_event_id: u32,
    fetcher: Option<Fetcher>,
    schemas: Option<Schemas>,
    server_state: Arc<ServerStateInner>,
}

impl EventManager {
    pub fn new(server_state: Arc<ServerStateInner>) -> Self {
        EventManager {
            next_event_id: 0,
            fetcher: None,
            schemas: None,
            server_state,
        }
    }

    /// Initializes state by building CES fetcher and obtaining schemas.
    pub async fn initialize(
        &mut self,
        rpc_url: &str,
        contract_hash: &str,
```
Review comment: We can get the `rpc_url` from the server state, i.e. […]
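The reviewer's inline snippet was not captured; a minimal sketch of the idea, assuming `ServerStateInner` exposes the `server_config` field that `run` in lib.rs sets:

```rust
// Hypothetical variant: derive the RPC URL from the shared state
// (server_config.casper_rpc, as used in lib.rs) instead of passing it in.
pub async fn initialize(&mut self, contract_hash: &str) -> Result<(), L1SyncError> {
    let rpc_url = self.server_state.server_config.casper_rpc.to_string();
    let client = CasperClient::new(&rpc_url);
    // ... proceed exactly as below ...
    Ok(())
}
```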
```rust
    ) -> Result<(), L1SyncError> {
        tracing::info!("Initializing event manager");

        let client = CasperClient::new(rpc_url);
        let metadata = CesMetadataRef::fetch_metadata(&client, contract_hash).await?;
        tracing::debug!("Metadata fetched successfully");

        let fetcher = Fetcher {
            client,
            ces_metadata: metadata,
        };
        let schemas = fetcher.fetch_schema().await?;
        tracing::debug!("Schemas fetched successfully");

        self.fetcher = Some(fetcher);
        self.schemas = Some(schemas);

        Ok(())
    }

    /// Processes new events starting from the last known event ID.
    pub async fn process_new_events(&mut self) -> Result<(), L1SyncError> {
        tracing::info!("Looking for new events");

        // Ensure fetcher and schemas are initialized.
        let fetcher = self.fetcher.as_ref().ok_or_else(|| {
            L1SyncError::InitializationError("Fetcher not initialized".to_string())
        })?;
        let schemas = self.schemas.as_ref().ok_or_else(|| {
            L1SyncError::InitializationError("Schemas not initialized".to_string())
        })?;

        let num_events = fetcher.fetch_events_count().await?;
        for i in self.next_event_id..num_events {
            let event = fetcher.fetch_event(i, schemas).await?;
            tracing::debug!("Event {} fetched: {:?}.", i, event);

            // TODO: Parse full transaction data from event, then push it to Data Availability layer.

            // TODO: Once we have ASN transaction, it should be converted and pushed into batch.
            let recipient: Vec<u8> = "cafebabe".into();
```
Review comment: Why are we mocking the deposit here?
Reply: #121 got merged, so I introduced […]. However, […]
```rust
            let txn = KairosTransaction::Deposit(L1Deposit {
                amount: 100,
                recipient,
            });
            self.server_state
                .batch_state_manager
                .enqueue_transaction(txn)
                .await
                .map_err(|e| L1SyncError::UnexpectedError(format!("unable to batch tx: {}", e)))?;
            self.next_event_id = i + 1;
```
Review comment: This reads odd to me; I would move this outside of the for loop and set it to […]
Reply: I want to keep […]
Update: the counter gets incremented with every successfully processed event.
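The trade-off behind that choice, sketched; the first variant is the PR's code, the second is the reviewer's suggestion as reconstructed from the truncated comment:

```rust
// Per-event increment, as merged: if fetching or batching event k fails,
// next_event_id still points at k, so the next sync resumes exactly
// where processing stopped instead of re-processing earlier events.
for i in self.next_event_id..num_events {
    let event = fetcher.fetch_event(i, schemas).await?;
    // ... enqueue transaction ...
    self.next_event_id = i + 1;
}

// Reviewer's alternative (hypothetical): a single assignment after the
// loop. Simpler, but a mid-loop error would discard this round's progress.
// self.next_event_id = num_events;
```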
```rust
        }

        Ok(())
    }
}
```
l1_sync/interval_trigger.rs (new file):

@@ -0,0 +1,18 @@

```rust
use tokio::time::{self, Duration};

use std::sync::Arc;

use super::service::L1SyncService;

pub async fn run(sync_service: Arc<L1SyncService>) {
    let mut interval = time::interval(Duration::from_secs(30));

    loop {
        interval.tick().await;

        tracing::debug!("Triggering periodic L1 sync");
        let _ = sync_service.trigger_sync().await.map_err(|e| {
            tracing::error!("Unable to trigger sync: {}", e);
        });
    }
}
```
l1_sync/mod.rs (new file):

@@ -0,0 +1,5 @@

```rust
pub mod error;
pub mod event_manager;
pub mod service;

pub mod interval_trigger;
```
l1_sync/service.rs (new file):

@@ -0,0 +1,94 @@

```rust
use crate::state::ServerStateInner;

use super::error::L1SyncError;
use super::event_manager::EventManager;

use tokio::sync::mpsc;
use tokio::sync::oneshot;

use std::sync::Arc;

pub enum SyncCommand {
    TriggerSync(oneshot::Sender<()>),
    // NOTE: More commands can be here.
}

pub struct L1SyncService {
    command_sender: Option<mpsc::Sender<SyncCommand>>,
    //event_manager_handle: Option<tokio::task::JoinHandle<()>>,
    server_state: Arc<ServerStateInner>,
}

impl L1SyncService {
    pub fn new(server_state: Arc<ServerStateInner>) -> Self {
        L1SyncService {
            command_sender: None,
            //event_manager_handle: None,
            server_state,
        }
    }

    pub async fn initialize(
        &mut self,
        rpc_url: String,
        contract_hash: String,
```
Review comment: Same goes here as commented for the `EventManager` initializer.
```rust
    ) -> Result<(), L1SyncError> {
        let mut event_manager = EventManager::new(self.server_state.clone());
        event_manager.initialize(&rpc_url, &contract_hash).await?;

        let (tx, rx) = mpsc::channel(32);
        self.command_sender = Some(tx);
        let _handle = tokio::spawn(async move {
            run_event_manager(rx, event_manager).await;
        });
        //self.event_manager_handle = Some(handle);

        Ok(())
    }

    pub async fn trigger_sync(&self) -> Result<(), L1SyncError> {
        let command_sender = self.command_sender.as_ref().ok_or_else(|| {
            L1SyncError::InitializationError("Command sender not available".to_string())
        })?;

        let (tx, rx) = oneshot::channel();
        command_sender
            .send(SyncCommand::TriggerSync(tx))
            .await
            .map_err(|e| L1SyncError::BrokenChannel(format!("Unable to send trigger: {}", e)))?;
        rx.await.map_err(|e| {
            L1SyncError::BrokenChannel(format!("Unable to receive trigger ack: {}", e))
        })?;

        Ok(())
    }
}

/// Handles incoming commands and delegates tasks to EventManager.
async fn run_event_manager(mut rx: mpsc::Receiver<SyncCommand>, mut event_manager: EventManager) {
    tracing::debug!("Event manager running and waiting for commands");
    while let Some(command) = rx.recv().await {
        let _ = handle_command(command, &mut event_manager)
            .await
            .map_err(|e| match e {
                L1SyncError::UnexpectedError(e) => panic!("Unrecoverable error: {}", e),
                _ => tracing::error!("Transient error: {}", e),
            });
    }
}

async fn handle_command(
    command: SyncCommand,
    event_manager: &mut EventManager,
) -> Result<(), L1SyncError> {
    match command {
        SyncCommand::TriggerSync(completion_ack) => {
            event_manager.process_new_events().await?;
            completion_ack
                .send(())
                .map_err(|_| L1SyncError::BrokenChannel("Sender dropped".to_string()))?;
        }
    }

    Ok(())
}
```
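The command-channel design means any task holding the service can request a sync. A sketch of what the TODO'd additional trigger (e.g. SSE-driven) could look like; the function name is hypothetical:

```rust
// Hypothetical on-demand trigger: the mpsc channel serializes this
// request with the periodic ones onto the single EventManager task,
// and the oneshot ack makes the call awaitable end-to-end.
async fn on_demand_sync(service: std::sync::Arc<L1SyncService>) {
    if let Err(e) = service.trigger_sync().await {
        tracing::error!("On-demand L1 sync failed: {}", e);
    }
}
```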
Server crate root (lib.rs):

```diff
@@ -3,6 +3,7 @@ pub mod errors;
 pub mod routes;
 pub mod state;
 
+mod l1_sync;
 mod utils;
 
 use std::sync::Arc;
@@ -13,6 +14,7 @@ use axum_extra::routing::RouterExt;
 pub use errors::AppErr;
 
 use crate::config::ServerConfig;
+use crate::l1_sync::service::L1SyncService;
 use crate::state::{BatchStateManager, ServerState, ServerStateInner};
 
 /// TODO: support secp256k1
@@ -38,6 +40,33 @@ pub fn app_router(state: ServerState) -> Router {
         .with_state(state)
 }
 
+pub async fn run_l1_sync(config: ServerConfig, server_state: Arc<ServerStateInner>) {
```
Review comment: Don't take ownership here.
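A sketch of the no-ownership variant (hypothetical; the signature above, as merged, still takes `ServerConfig` by value):

```rust
// Hypothetical: borrow the config instead of consuming it, cloning only
// the two fields the sync service actually needs.
pub async fn run_l1_sync(config: &ServerConfig, server_state: Arc<ServerStateInner>) {
    let rpc_url = config.casper_rpc.to_string();
    let contract_hash = config.casper_contract_hash.clone();
    // ... initialize L1SyncService with rpc_url and contract_hash ...
}
```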
|
||
// Make sure real contract hash was provided. | ||
if config.casper_contract_hash | ||
== "0000000000000000000000000000000000000000000000000000000000000000" | ||
{ | ||
tracing::warn!( | ||
"Casper contract hash not configured, L1 synchronization will NOT be enabled." | ||
); | ||
return; | ||
} | ||
marijanp marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// Initialize L1 synchronizer. | ||
let mut l1_sync_service = L1SyncService::new(server_state); | ||
let _ = l1_sync_service | ||
.initialize(config.casper_rpc.to_string(), config.casper_contract_hash) | ||
.await | ||
.map_err(|e| { | ||
panic!("Event manager failed to initialize: {}", e); | ||
Review comment: Let's propagate this error up.
Reply: There is no "up"; this is basically before we start the server.
Review comment: Also simply do a […]
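The truncated comment most likely refers to `expect`; a sketch of that simplification (an assumption, not the reviewer's captured snippet):

```rust
// Hypothetical: replace the map_err + panic with expect, which panics
// with the same message before the server starts serving requests.
l1_sync_service
    .initialize(config.casper_rpc.to_string(), config.casper_contract_hash)
    .await
    .expect("Event manager failed to initialize");
```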
```diff
+        });
+
+    // Run periodic synchronization.
+    // TODO: Add additional SSE trigger.
+    tokio::spawn(async move {
+        l1_sync::interval_trigger::run(l1_sync_service.into()).await;
+    });
+}
+
 pub async fn run(config: ServerConfig) {
     let listener = tokio::net::TcpListener::bind(config.socket_addr)
         .await
@@ -48,6 +77,9 @@ pub async fn run(config: ServerConfig) {
         batch_state_manager: BatchStateManager::new_empty(),
         server_config: config.clone(),
     });
+
+    run_l1_sync(config.clone(), state.clone()).await;
+
     let app = app_router(state);
 
     axum::serve(listener, app)
```
Review comment: I don't see where it makes sense to model the `fetcher` and `schemas` attributes as optionals and introduce a new type that basically sets up the user for using the API in a wrong way, and moreover forces the user into a two-step setup process, ending up with these two possible use scenarios: […] (Notice that the cases `fetcher == Some && schemas == None` and `fetcher == None && schemas == Some` are not even possible.) The latter is a single function call which deals with all the details a user does not care about; the user only cares about what should happen when new events are observed, reducing the required knowledge to just implementing the callback logic.

Reply: I removed the partial construction pattern, so `fetcher` and `schemas` are no longer optional: 50a3654.
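A minimal sketch of the non-optional, single-step construction the reply describes; the signature is an assumption, as commit 50a3654 itself is not shown here:

```rust
// Hypothetical single-step constructor: fetcher and schemas are built up
// front, so EventManager can never exist in a half-initialized state and
// the "not initialized" error paths in process_new_events disappear.
impl EventManager {
    pub async fn new(
        rpc_url: &str,
        contract_hash: &str,
        server_state: Arc<ServerStateInner>,
    ) -> Result<Self, L1SyncError> {
        let client = CasperClient::new(rpc_url);
        let metadata = CesMetadataRef::fetch_metadata(&client, contract_hash).await?;
        let fetcher = Fetcher {
            client,
            ces_metadata: metadata,
        };
        let schemas = fetcher.fetch_schema().await?;

        Ok(EventManager {
            next_event_id: 0,
            fetcher,
            schemas,
            server_state,
        })
    }
}
```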