-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Periodic L1 synchronization #96
Changes from 13 commits
27f2c82
b9f2713
fbf99e5
03544d2
a60cbb9
6897e36
413c599
8806f1b
a4b0434
88aa500
70c7057
f57b840
308577b
c2ccfdf
c09d3fd
1fac715
be86df8
326b147
15a1197
1f1b1d3
1065394
1b6cae2
f437b2e
7523128
9e1a58a
487d3c6
ca726c4
e5f9f8b
50a3654
dd3bc22
0d4fb13
c89f14d
736a540
3b52fb1
8c5cc97
f26f045
5b7934b
ff7b6c6
7c35fd0
9f90399
59b91af
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,3 @@ | ||
KAIROS_SERVER_SOCKET_ADDR="127.0.0.1:7893" | ||
KAIROS_SERVER_CASPER_RPC="http://127.0.0.1:11101/rpc" | ||
KAIROS_SERVER_CASPER_CONTRACT_HASH="0000000000000000000000000000000000000000000000000000000000000000" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
use thiserror::Error; | ||
|
||
/// Errors produced by the L1 synchronization subsystem.
#[derive(Error, Debug)]
pub enum L1SyncError {
    /// Casper Event Toolkit error (metadata/schema/event fetching).
    #[error("toolkit error: {error}")]
    ToolkitError {
        #[from]
        error: casper_event_toolkit::error::ToolkitError,
    },

    /// Communication error: an mpsc/oneshot channel endpoint was dropped.
    #[error("channel error: {0}")]
    BrokenChannel(String),

    /// Error that we cannot recover from.
    // NOTE(review): handlers treat this variant as fatal (panic); keep its use
    // restricted to genuinely unrecoverable conditions.
    #[error("Unexpected error: {0}")]
    UnexpectedError(String),
}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
use std::sync::Arc; | ||
|
||
use casper_event_toolkit::fetcher::{Fetcher, Schemas}; | ||
use casper_event_toolkit::metadata::CesMetadataRef; | ||
use casper_event_toolkit::rpc::client::CasperClient; | ||
|
||
use crate::state::transactions::{Deposit, Signed, Transaction}; | ||
use crate::state::BatchStateManager; | ||
|
||
use super::error::L1SyncError; | ||
|
||
/// Fetches contract events from L1 via CES and feeds derived transactions
/// into the shared batch state.
pub struct EventManager {
    // ID of the next event to fetch; events with lower IDs were already processed.
    next_event_id: u32,
    // CES event fetcher; `None` until `initialize` has completed.
    fetcher: Option<Fetcher>,
    // Event schemas; `None` until `initialize` has completed.
    schemas: Option<Schemas>,
    // Shared batch state that processed transactions are enqueued into.
    batch_service: Arc<BatchStateManager>,
}
|
||
impl EventManager { | ||
pub fn new(batch_service: Arc<BatchStateManager>) -> Self { | ||
EventManager { | ||
next_event_id: 0, | ||
fetcher: None, | ||
schemas: None, | ||
batch_service, | ||
} | ||
} | ||
|
||
/// Initializes state by building CES fetcher and obtaining schemas. | ||
pub async fn initialize( | ||
&mut self, | ||
rpc_url: &str, | ||
contract_hash: &str, | ||
) -> Result<(), L1SyncError> { | ||
tracing::info!("Initializing event manager"); | ||
|
||
let client = CasperClient::new(rpc_url); | ||
let metadata = CesMetadataRef::fetch_metadata(&client, contract_hash).await?; | ||
tracing::debug!("Metadata fetched successfully"); | ||
|
||
let fetcher = Fetcher { | ||
client, | ||
ces_metadata: metadata, | ||
}; | ||
let schemas = fetcher.fetch_schema().await?; | ||
tracing::debug!("Schemas fetched successfully"); | ||
|
||
self.fetcher = Some(fetcher); | ||
self.schemas = Some(schemas); | ||
|
||
Ok(()) | ||
} | ||
Avi-D-coder marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
/// Processes new events starting from the last known event ID. | ||
pub async fn process_new_events(&mut self) -> Result<(), L1SyncError> { | ||
tracing::info!("Looking for new events"); | ||
|
||
if let Some(fetcher) = &self.fetcher { | ||
let schemas = self.schemas.as_ref().unwrap(); // Assuming schemas are already loaded | ||
let num_events = fetcher.fetch_events_count().await?; | ||
for i in self.next_event_id..num_events { | ||
let event = fetcher.fetch_event(i.into(), schemas).await?; | ||
tracing::debug!("Event {} fetched: {:?}.", i, event); | ||
|
||
// TODO: Parse full transaction data from event, then push it to Data Availability layer. | ||
|
||
// TODO: Once we have ASN transaction, it should be converted and pushed into batch. | ||
let txn = Signed { | ||
public_key: "cafebabe".into(), | ||
nonce: 0, | ||
transaction: Transaction::Deposit(Deposit { amount: 100 }), | ||
}; | ||
self.batch_service | ||
.enqueue_transaction(txn) | ||
.await | ||
.map_err(|e| { | ||
L1SyncError::UnexpectedError(format!("unable to batch tx: {}", e)) | ||
})?; | ||
self.next_event_id = i + 1; | ||
} | ||
} | ||
|
||
Ok(()) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
use tokio::time::{self, Duration}; | ||
|
||
use std::sync::Arc; | ||
|
||
use super::service::L1SyncService; | ||
|
||
pub async fn run(sync_service: Arc<L1SyncService>) { | ||
let mut interval = time::interval(Duration::from_secs(30)); | ||
|
||
loop { | ||
interval.tick().await; | ||
|
||
tracing::debug!("Triggering periodic L1 sync"); | ||
let result = sync_service.trigger_sync().await; | ||
|
||
if let Err(e) = result { | ||
tracing::error!("Unable to trigger sync: {}", e); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done in be86df8. |
||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
pub mod error; | ||
pub mod event_manager; | ||
pub mod service; | ||
|
||
pub mod interval_trigger; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
use crate::state::BatchStateManager; | ||
|
||
use super::error::L1SyncError; | ||
use super::event_manager::EventManager; | ||
|
||
use tokio::sync::mpsc; | ||
use tokio::sync::oneshot; | ||
use tokio::sync::Mutex; | ||
|
||
use std::sync::Arc; | ||
|
||
/// Commands accepted by the background event-manager task. Each variant
/// carries a oneshot sender used to acknowledge completion to the caller.
pub enum SyncCommand {
    // (rpc_url, contract_hash, completion ack)
    Initialize(String, String, oneshot::Sender<()>),
    // Fetch and process any new L1 events, then ack.
    TriggerSync(oneshot::Sender<()>),
}
|
||
/// Handle to the background L1 sync task; all work is requested by sending
/// `SyncCommand`s over the channel and awaiting the per-command ack.
pub struct L1SyncService {
    command_sender: mpsc::Sender<SyncCommand>,
    //event_manager: Arc<Mutex<EventManager>>, // NOTE: It could be stored for shared access.
}
|
||
impl L1SyncService { | ||
pub async fn new(batch_service: Arc<BatchStateManager>) -> Self { | ||
let (tx, rx) = mpsc::channel(32); | ||
let event_manager = Arc::new(Mutex::new(EventManager::new(batch_service.clone()))); | ||
let event_manager_clone = event_manager.clone(); | ||
|
||
tokio::spawn(async move { | ||
run_event_manager(rx, event_manager_clone).await; | ||
}); | ||
|
||
L1SyncService { | ||
command_sender: tx, | ||
//event_manager, | ||
} | ||
} | ||
|
||
pub async fn initialize( | ||
&self, | ||
rpc_url: String, | ||
contract_hash: String, | ||
) -> Result<(), L1SyncError> { | ||
let (tx, rx) = oneshot::channel(); | ||
self.command_sender | ||
.send(SyncCommand::Initialize(rpc_url, contract_hash, tx)) | ||
.await | ||
.map_err(|e| L1SyncError::BrokenChannel(format!("Unable to send initialize: {}", e)))?; | ||
rx.await.map_err(|e| { | ||
L1SyncError::BrokenChannel(format!("Unable to receive initialize ack: {}", e)) | ||
})?; | ||
|
||
Ok(()) | ||
} | ||
|
||
pub async fn trigger_sync(&self) -> Result<(), L1SyncError> { | ||
let (tx, rx) = oneshot::channel(); | ||
self.command_sender | ||
.send(SyncCommand::TriggerSync(tx)) | ||
.await | ||
.map_err(|e| L1SyncError::BrokenChannel(format!("Unable to send trigger: {}", e)))?; | ||
rx.await.map_err(|e| { | ||
L1SyncError::BrokenChannel(format!("Unable to receive trigger ack: {}", e)) | ||
})?; | ||
|
||
Ok(()) | ||
} | ||
} | ||
|
||
/// Handles incoming commands and delegates tasks to EventManager. | ||
async fn run_event_manager( | ||
mut rx: mpsc::Receiver<SyncCommand>, | ||
event_manager: Arc<Mutex<EventManager>>, | ||
) { | ||
tracing::debug!("Event manager running and waiting for commands"); | ||
while let Some(command) = rx.recv().await { | ||
match handle_command(command, event_manager.clone()).await { | ||
Ok(()) => {} | ||
Err(L1SyncError::UnexpectedError(e)) => panic!("Unrecoverable error: {}", e), | ||
Err(e) => tracing::error!("Transient error: {}", e), | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can be replaced with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done in 1fac715. |
||
} | ||
|
||
/// Executes a single command against the (exclusively locked) event manager
/// and acknowledges completion via the command's oneshot sender.
async fn handle_command(
    command: SyncCommand,
    event_manager: Arc<Mutex<EventManager>>,
) -> Result<(), L1SyncError> {
    // Serialize all command handling through the manager's mutex.
    let mut em = event_manager.lock().await;

    match command {
        SyncCommand::Initialize(rpc_url, contract_hash, completion_ack) => {
            em.initialize(&rpc_url, &contract_hash).await?;
            // Receiver may have gone away; ignoring the send result is deliberate.
            let _ = completion_ack.send(());
        }
        SyncCommand::TriggerSync(completion_ack) => {
            em.process_new_events().await?;
            // NOTE(review): if the operation above fails, `?` returns early and
            // `completion_ack` is dropped unsent — the caller then observes a
            // `BrokenChannel` error instead of the real cause. Consider acking
            // with a `Result` payload to propagate failures explicitly.
            let _ = completion_ack.send(());
        }
    }

    Ok(())
}
Reviewer comment: We can get the `rpc_url` from the server state, i.e. `server_state.server_config.casper_rpc`; the same goes for the contract hash.

Author reply: Fixed in 736a540 and 3b52fb1.