Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Periodic L1 synchronization #96

Merged
merged 41 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
27f2c82
Add `casper-event-toolkit` dependency.
koxu1996 May 8, 2024
b9f2713
Add `reqwest` dependency.
koxu1996 May 8, 2024
fbf99e5
Add `thiserror` dependency.
koxu1996 May 8, 2024
03544d2
Prototype of state synchronization.
koxu1996 May 8, 2024
a60cbb9
Fix deadlock.
koxu1996 May 10, 2024
6897e36
Use amount greater than 0 for mocked tx.
koxu1996 May 10, 2024
413c599
Merge remote-tracking branch 'origin/feature/configure-casper-rpc-in-…
koxu1996 May 13, 2024
8806f1b
Move L1 sync into separate function.
koxu1996 May 13, 2024
a4b0434
Skip synchronization if contract hash not configured.
koxu1996 May 13, 2024
88aa500
Use scoped ENV for contract hash.
koxu1996 May 14, 2024
70c7057
Merge branch 'main' into feature/l1-synchronization
koxu1996 May 14, 2024
f57b840
Fix default contract hash value.
koxu1996 May 14, 2024
308577b
Add default contract hash to nix setup.
koxu1996 May 14, 2024
c2ccfdf
Bump `casper-event-toolkit` to `v0.1.3`.
koxu1996 Jun 6, 2024
c09d3fd
Remove `into()` that is no longer required.
koxu1996 Jun 6, 2024
1fac715
Replace unused `Result` match with `map_err`.
koxu1996 Jun 6, 2024
be86df8
Replace another unused `Result` with `map_err`.
koxu1996 Jun 6, 2024
326b147
Replace yet another unused `Result` with `map_err`.
koxu1996 Jun 6, 2024
15a1197
Get rid of `Mutex` for `EventManager`.
koxu1996 Jun 7, 2024
1f1b1d3
Return error if L1 processing started without proper init.
koxu1996 Jun 7, 2024
1065394
Prepare sync service for storing event manager handle.
koxu1996 Jun 7, 2024
1b6cae2
Directly initialize event manager.
koxu1996 Jun 7, 2024
f437b2e
Return broken channel error when completion ACK cannot be sent.
koxu1996 Jun 7, 2024
7523128
Merge branch 'main' into feature/l1-synchronization
koxu1996 Jun 7, 2024
9e1a58a
Merge branch 'main' into feature/l1-synchronization
koxu1996 Jun 7, 2024
487d3c6
Post merge fixes.
koxu1996 Jun 7, 2024
ca726c4
Merge branch 'main' into feature/l1-synchronization
koxu1996 Jun 7, 2024
e5f9f8b
Use empty contract hash in test utils.
koxu1996 Jun 7, 2024
50a3654
Initialize `EventManager` directly in constructor.
koxu1996 Jun 11, 2024
dd3bc22
Remove unusued `InitializationError`.
koxu1996 Jun 11, 2024
0d4fb13
Initialize `L1SyncService` directly in constructor.
koxu1996 Jun 11, 2024
c89f14d
Merge branch 'main' into feature/l1-synchronization
koxu1996 Jun 11, 2024
736a540
Take RPC url from server config.
koxu1996 Jun 12, 2024
3b52fb1
Take contract hash from server config.
koxu1996 Jun 12, 2024
8c5cc97
Update default contract hash check.
koxu1996 Jun 12, 2024
f26f045
Merge branch 'main' into feature/l1-synchronization
koxu1996 Jun 13, 2024
5b7934b
Add `contract-utils` to server workspace.
koxu1996 Jun 13, 2024
ff7b6c6
Build deposit transaction from parsed event.
koxu1996 Jun 13, 2024
7c35fd0
Fixes for Nix build.
koxu1996 Jun 13, 2024
9f90399
Merge branch 'main' into feature/l1-synchronization
koxu1996 Jun 14, 2024
59b91af
Merge branch 'main' into feature/l1-synchronization
koxu1996 Jun 17, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions kairos-server/.env
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
KAIROS_SERVER_SOCKET_ADDR="127.0.0.1:7893"
KAIROS_SERVER_CASPER_RPC="http://127.0.0.1:11101/rpc"
KAIROS_SERVER_CASPER_CONTRACT_HASH="0000000000000000000000000000000000000000000000000000000000000000"
2 changes: 2 additions & 0 deletions kairos-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ kairos-tx = { path = "../kairos-tx" }
kairos-trie = { git = "https://github.com/cspr-rad/kairos-trie" }
sha2 = "0.10"
reqwest = "0.12"
casper-event-toolkit = { git = "https://github.com/koxu1996/casper-event-toolkit.git", version = "0.1.1" }
thiserror = "1.0"

[dev-dependencies]
proptest = "1"
Expand Down
5 changes: 4 additions & 1 deletion kairos-server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,22 @@ use std::{fmt, str::FromStr};

use reqwest::Url;

#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct ServerConfig {
pub socket_addr: SocketAddr,
pub casper_rpc: Url,
pub casper_contract_hash: String,
}

impl ServerConfig {
pub fn from_env() -> Result<Self, String> {
let socket_addr = parse_env_as::<SocketAddr>("KAIROS_SERVER_SOCKET_ADDR")?;
let casper_rpc = parse_env_as::<Url>("KAIROS_SERVER_CASPER_RPC")?;
let casper_contract_hash = parse_env_as::<String>("KAIROS_SERVER_CASPER_CONTRACT_HASH")?;
Ok(Self {
socket_addr,
casper_rpc,
casper_contract_hash,
})
}
}
Expand Down
19 changes: 19 additions & 0 deletions kairos-server/src/l1_sync/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use thiserror::Error;

#[derive(Error, Debug)]
pub enum L1SyncError {
/// Casper Event Toolkit error.
#[error("toolkit error: {error}")]
ToolkitError {
#[from]
error: casper_event_toolkit::error::ToolkitError,
},

/// Communication error.
#[error("channel error: {0}")]
BrokenChannel(String),

/// Error that we cannot recover from.
#[error("Unexpected error: {0}")]
UnexpectedError(String),
}
85 changes: 85 additions & 0 deletions kairos-server/src/l1_sync/event_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
use std::sync::Arc;

use casper_event_toolkit::fetcher::{Fetcher, Schemas};
use casper_event_toolkit::metadata::CesMetadataRef;
use casper_event_toolkit::rpc::client::CasperClient;

use crate::state::transactions::{Deposit, Signed, Transaction};
use crate::state::BatchStateManager;

use super::error::L1SyncError;

pub struct EventManager {
next_event_id: u32,
fetcher: Option<Fetcher>,
schemas: Option<Schemas>,
batch_service: Arc<BatchStateManager>,
}

impl EventManager {
pub fn new(batch_service: Arc<BatchStateManager>) -> Self {
EventManager {
next_event_id: 0,
fetcher: None,
schemas: None,
batch_service,
}
}

/// Initializes state by building CES fetcher and obtaining schemas.
pub async fn initialize(
&mut self,
rpc_url: &str,
contract_hash: &str,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can get the rpc_url from the server state. i.e. server_state.server_config.casper_rpc same goes for the contract hash.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 736a540 and 3b52fb1.

) -> Result<(), L1SyncError> {
tracing::info!("Initializing event manager");

let client = CasperClient::new(rpc_url);
let metadata = CesMetadataRef::fetch_metadata(&client, contract_hash).await?;
tracing::debug!("Metadata fetched successfully");

let fetcher = Fetcher {
client,
ces_metadata: metadata,
};
let schemas = fetcher.fetch_schema().await?;
tracing::debug!("Schemas fetched successfully");

self.fetcher = Some(fetcher);
self.schemas = Some(schemas);

Ok(())
}
Avi-D-coder marked this conversation as resolved.
Show resolved Hide resolved

/// Processes new events starting from the last known event ID.
pub async fn process_new_events(&mut self) -> Result<(), L1SyncError> {
tracing::info!("Looking for new events");

if let Some(fetcher) = &self.fetcher {
let schemas = self.schemas.as_ref().unwrap(); // Assuming schemas are already loaded
let num_events = fetcher.fetch_events_count().await?;
for i in self.next_event_id..num_events {
let event = fetcher.fetch_event(i.into(), schemas).await?;
tracing::debug!("Event {} fetched: {:?}.", i, event);

// TODO: Parse full transaction data from event, then push it to Data Availability layer.

// TODO: Once we have ASN transaction, it should be converted and pushed into batch.
let txn = Signed {
public_key: "cafebabe".into(),
nonce: 0,
transaction: Transaction::Deposit(Deposit { amount: 100 }),
};
self.batch_service
.enqueue_transaction(txn)
.await
.map_err(|e| {
L1SyncError::UnexpectedError(format!("unable to batch tx: {}", e))
})?;
self.next_event_id = i + 1;
}
}

Ok(())
}
}
20 changes: 20 additions & 0 deletions kairos-server/src/l1_sync/interval_trigger.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use tokio::time::{self, Duration};

use std::sync::Arc;

use super::service::L1SyncService;

pub async fn run(sync_service: Arc<L1SyncService>) {
let mut interval = time::interval(Duration::from_secs(30));

loop {
interval.tick().await;

tracing::debug!("Triggering periodic L1 sync");
let result = sync_service.trigger_sync().await;

if let Err(e) = result {
tracing::error!("Unable to trigger sync: {}", e);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: use map_err

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in be86df8.

}
}
5 changes: 5 additions & 0 deletions kairos-server/src/l1_sync/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pub mod error;
pub mod event_manager;
pub mod service;

pub mod interval_trigger;
102 changes: 102 additions & 0 deletions kairos-server/src/l1_sync/service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
use crate::state::BatchStateManager;

use super::error::L1SyncError;
use super::event_manager::EventManager;

use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::sync::Mutex;

use std::sync::Arc;

pub enum SyncCommand {
Initialize(String, String, oneshot::Sender<()>),
TriggerSync(oneshot::Sender<()>),
}

pub struct L1SyncService {
command_sender: mpsc::Sender<SyncCommand>,
//event_manager: Arc<Mutex<EventManager>>, // NOTE: It could be stored for shared access.
}

impl L1SyncService {
pub async fn new(batch_service: Arc<BatchStateManager>) -> Self {
let (tx, rx) = mpsc::channel(32);
let event_manager = Arc::new(Mutex::new(EventManager::new(batch_service.clone())));
let event_manager_clone = event_manager.clone();

tokio::spawn(async move {
run_event_manager(rx, event_manager_clone).await;
});

L1SyncService {
command_sender: tx,
//event_manager,
}
}

pub async fn initialize(
&self,
rpc_url: String,
contract_hash: String,
) -> Result<(), L1SyncError> {
let (tx, rx) = oneshot::channel();
self.command_sender
.send(SyncCommand::Initialize(rpc_url, contract_hash, tx))
.await
.map_err(|e| L1SyncError::BrokenChannel(format!("Unable to send initialize: {}", e)))?;
rx.await.map_err(|e| {
L1SyncError::BrokenChannel(format!("Unable to receive initialize ack: {}", e))
})?;

Ok(())
}

pub async fn trigger_sync(&self) -> Result<(), L1SyncError> {
let (tx, rx) = oneshot::channel();
self.command_sender
.send(SyncCommand::TriggerSync(tx))
.await
.map_err(|e| L1SyncError::BrokenChannel(format!("Unable to send trigger: {}", e)))?;
rx.await.map_err(|e| {
L1SyncError::BrokenChannel(format!("Unable to receive trigger ack: {}", e))
})?;

Ok(())
}
}

/// Handles incoming commands and delegates tasks to EventManager.
async fn run_event_manager(
mut rx: mpsc::Receiver<SyncCommand>,
event_manager: Arc<Mutex<EventManager>>,
) {
tracing::debug!("Event manager running and waiting for commands");
while let Some(command) = rx.recv().await {
match handle_command(command, event_manager.clone()).await {
Ok(()) => {}
Err(L1SyncError::UnexpectedError(e)) => panic!("Unrecoverable error: {}", e),
Err(e) => tracing::error!("Transient error: {}", e),
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can be replaced with map_err and then just match on the error type.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 1fac715.

}

async fn handle_command(
command: SyncCommand,
event_manager: Arc<Mutex<EventManager>>,
) -> Result<(), L1SyncError> {
let mut em = event_manager.lock().await;

match command {
SyncCommand::Initialize(rpc_url, contract_hash, completion_ack) => {
em.initialize(&rpc_url, &contract_hash).await?;
let _ = completion_ack.send(());
}
SyncCommand::TriggerSync(completion_ack) => {
em.process_new_events().await?;
let _ = completion_ack.send(());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens on failure?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in f437b2e.

}
}

Ok(())
}
Loading
Loading