Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Periodic L1 synchronization #96

Merged
merged 41 commits into from
Jun 17, 2024
Merged

Periodic L1 synchronization #96

merged 41 commits into from
Jun 17, 2024

Conversation

koxu1996
Copy link
Contributor

@koxu1996 koxu1996 commented May 9, 2024

This PR adds periodic L1 synchronization 🔃, based on casper-event-toolkit.

Details

The core component is EventManager, which allows tracking events from a specified smart contract. The process begins with the setup of CES-related data - schemas, toolkit fetcher, and the number of events - using initialize(). Subsequently, process_new_events() checks for new events based on its internal counter, next_event_id.

L1SyncService operates the event manager as a Tokio task and provides a safe API for shared access within an Axum server.

Future work

  • Once the deposit contract is merged into main, we will be able to parse deposit data from events - currently, this data is mocked - and push it to the batch manager (trie).
  • Right now synchronization is called periodically - it will be replaced with SSE trigger in the future.

Demo 🎥

l1-sync-demo5-cropped.mp4

If you want to reproduce it locally, please see the next section.

Test environment

I used dockerized NCTL with bunch of shell scripts to run this demo - all available in this gist.

1. Cleanup old resources and compile example smart contract

$ ./cleanup.sh
$ ./prepare.sh

2. Run NCTL and deploy WASM

$ ./setup.sh
> Running NCTL container.
9e4fd0c153e63765ea13b565c70e2d2986ad929a2e74e74cb5e37938e3fe1edf

> Copying user keys.
Successfully copied 37.4kB to /tmp/casper-users

> Deploying session code.
- hash: f9531e06d8f533aa033106b40030ff6d4a9f6b63dc542ff3177d6d29412313ee

> Waiting for execution.
- attempt: 1/10
- attempt: 2/10
> Getting state root hash.
 - d196121dc9613867e228fb807c798fee4e9d1d7d84a2ba6e1a9067494a63c231

> Getting account hash.
 - c6ec39ce20e3bda02cd061e5c91698b0be8a5d5f9abf2b95b8a2a639ae88d060

> Getting contract hash.
 - 9397c7dbfe9ffd60fd03774d8b570bcb3a3a482c5eb3202c97779cb5c0dac008

At this point you have contract hash that is ready to be used in the next steps.

3. Initialize Casper Event Standard

$ ./call-init-entrypoint.sh 9397c7dbfe9ffd60fd03774d8b570bcb3a3a482c5eb3202c97779cb5c0dac008
> Calling 'init' entrypoint.
 - 24c05e37de1c453dd85ef9d93384583c3bbc7106dc916b219e02335cfd3a78ff

> Waiting for execution.
- attempt: 1/20
- attempt: 2/20
- attempt: 3/20
- attempt: 4/20

*-------*
| DONE! |
*-------*

4. Call deposit/batch entrypoint to emit test events

./call-batch-entrypoint.sh 9397c7dbfe9ffd60fd03774d8b570bcb3a3a482c5eb3202c97779cb5c0dac008
> Calling 'batch' entrypoint.
 - 63342404ed21eab1d3a29703a76f81834a0e8bb3bdf3ad1caa53357bf2bd4c38

> Waiting for execution.
- attempt: 1/20
- attempt: 2/20
- attempt: 3/20
- attempt: 4/20
- attempt: 5/20

*-------*
| DONE! |
*-------*

Additional changes for demo

In order to record nice and quick demo, I updated subscribed to include relevant logs:

diff --git a/kairos-server/src/main.rs b/kairos-server/src/main.rs
index 107e421..7b8f1ce 100644
--- a/kairos-server/src/main.rs
+++ b/kairos-server/src/main.rs
@@ -5,6 +5,11 @@ use kairos_server::config::ServerConfig;
 async fn main() {
     let subscriber = tracing_subscriber::fmt()
         .with_max_level(tracing::Level::INFO)
+        .with_env_filter(
+            "kairos_server::l1_sync=debug,kairos_server=warn"
+                .parse::<tracing_subscriber::EnvFilter>()
+                .unwrap(),
+        )
         .finish();

     tracing::subscriber::set_global_default(subscriber).expect("Failed to set subscriber");

I also reduced synchronization interval from 30 to 10 seconds:

diff --git a/kairos-server/src/l1_sync/interval_trigger.rs b/kairos-server/src/l1_sync/interval_trigger.rs
index 419f13c..08acc65 100644
--- a/kairos-server/src/l1_sync/interval_trigger.rs
+++ b/kairos-server/src/l1_sync/interval_trigger.rs
@@ -5,7 +5,7 @@ use std::sync::Arc;
 use super::service::L1SyncService;

 pub async fn run(sync_service: Arc<L1SyncService>) {
-    let mut interval = time::interval(Duration::from_secs(30));
+    let mut interval = time::interval(Duration::from_secs(10));

     loop {
         interval.tick().await;

@koxu1996 koxu1996 self-assigned this May 9, 2024
Copy link

github-actions bot commented May 9, 2024

File Coverage
All files 51%
kairos-crypto/src/implementations/casper.rs 6%
kairos-server/src/config.rs 0%
kairos-server/src/errors.rs 12%
kairos-server/src/lib.rs 87%
kairos-server/src/utils.rs 22%
kairos-server/src/routes/deposit.rs 88%
kairos-server/src/routes/transfer.rs 90%
kairos-server/src/state/transactions.rs 57%
kairos-server/src/state/trie.rs 35%
kairos-server/src/l1_sync/event_manager.rs 12%
kairos-server/src/l1_sync/interval_trigger.rs 0%
kairos-server/src/l1_sync/service.rs 56%
kairos-test-utils/src/cctl.rs 87%
kairos-server/src/state/transactions/batch_state.rs 42%
kairos-test-utils/src/cctl/parsers.rs 66%
kairos-tx/src/asn.rs 48%
kairos-tx/src/error.rs 0%
kairos-server/tests/transactions.rs 85%

Minimum allowed coverage is 60%

Generated by 🐒 cobertura-action against 70c7057

@koxu1996 koxu1996 marked this pull request as ready for review May 14, 2024 10:39
@koxu1996 koxu1996 added the demo Required for our first demo label May 14, 2024
Copy link
Contributor

@Rom3dius Rom3dius left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

solid work

kairos-server/src/l1_sync/event_manager.rs Outdated Show resolved Hide resolved
.initialize(config.casper_rpc.to_string(), config.casper_contract_hash)
.await
{
panic!("Event manager failed to initialize: {}", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's propagate this error up.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no "up" this is basically before we start the server

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also simply do a map_err

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

map_err introduced in 326b147.

}
SyncCommand::TriggerSync(completion_ack) => {
em.process_new_events().await?;
let _ = completion_ack.send(());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens on failure?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in f437b2e.

kairos-server/src/lib.rs Outdated Show resolved Hide resolved
.initialize(config.casper_rpc.to_string(), config.casper_contract_hash)
.await
{
panic!("Event manager failed to initialize: {}", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no "up" this is basically before we start the server

.initialize(config.casper_rpc.to_string(), config.casper_contract_hash)
.await
{
panic!("Event manager failed to initialize: {}", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also simply do a map_err

{
panic!("Event manager failed to initialize: {}", e);
}
l1_sync::interval_trigger::run(l1_sync_service).await;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are there 3 steps to run this service? Just make run take the server config and do all of it in run or introduce three parameters i.e. batch_state, contract_hash and the casper_rpc url.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Service must be created and initialized, then periodic trigger spawned - all that happens in run_l1_sync().

Btw. initialization is done directly now - 1b6cae2.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initialization steps removed in 50a3654 and 0d4fb13.

Comment on lines 76 to 81
match handle_command(command, event_manager.clone()).await {
Ok(()) => {}
Err(L1SyncError::UnexpectedError(e)) => panic!("Unrecoverable error: {}", e),
Err(e) => tracing::error!("Transient error: {}", e),
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can be replaced with map_err and then just match on the error type.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 1fac715.

Comment on lines 14 to 18
let result = sync_service.trigger_sync().await;

if let Err(e) = result {
tracing::error!("Unable to trigger sync: {}", e);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: use map_err

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in be86df8.

@koxu1996 koxu1996 force-pushed the feature/l1-synchronization branch from 7a69feb to e5f9f8b Compare June 7, 2024 16:48
Comment on lines 12 to 27
pub struct EventManager {
next_event_id: u32,
fetcher: Option<Fetcher>,
schemas: Option<Schemas>,
server_state: Arc<ServerStateInner>,
}

impl EventManager {
pub fn new(server_state: Arc<ServerStateInner>) -> Self {
EventManager {
next_event_id: 0,
fetcher: None,
schemas: None,
server_state,
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see where it makes sense to model the fetcher and schemas attributes as optionals and introduce a new type that basically sets up the user for using the api in a wrokng way, and moreover forces a user to have a two-step setup process ending up with these two possible use scenarios:
(Notice that the case fetcher == Some && schema == None and fetcher == None && schema Some is not even possible)

// correct
let event_manager = EventManager::new(); // fetcher == None && schema == None
event_manager.initialize(); // fetcher == Some(..) && schema == Some(..)
event_manager.process_new_events();

// wrong usage, but possible without any compiler warning
let event_manager = EventManager::new();  // fetcher == None && schema == None
event_manager.process_new_events(); // This will fail and I think I should not even be able to call this and a strong indicator is a missing fetcher and schema, which can both be used to prevent an eventuall call

// better: process_new_events is now just a function in the server crate and can belong semantically to the server
// the ServerState contains all the information like the `rpc_url`, `contract_hash`, `server_state`, even the `next_event_id`
pub fn process_new_events(server_state: &ServerState, callback: FnOnce -> Result) -> Result {
// initialize the fethcer. Could fail, hence result return type
// initialize the schema. Could fail, hence result return type
// call callback which injects any behavior a user might want to use which could potentially fail, hence Result return type of the callback
}

The latter being a single function call, which deals with all the details a user does not care about. The user only cares about what should happen when new events are observed, reducing the amount of knowledge to just implement the callback logic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed partial construction pattern, so fetcher and schemas are no longer optional: 50a3654.

@koxu1996 koxu1996 mentioned this pull request Jun 10, 2024
5 tasks
@koxu1996 koxu1996 requested a review from marijanp June 11, 2024 09:19
Comment on lines 21 to 22
rpc_url: &str,
contract_hash: &str,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can get the rpc_url from the server state. i.e. server_state.server_config.casper_rpc same goes for the contract hash.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 736a540 and 3b52fb1.

kairos-server/src/lib.rs Outdated Show resolved Hide resolved
@@ -38,6 +40,35 @@ pub fn app_router(state: ServerState) -> Router {
.with_state(state)
}

pub async fn run_l1_sync(config: ServerConfig, server_state: Arc<ServerStateInner>) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't take ownership here.

Copy link
Contributor Author

@koxu1996 koxu1996 Jun 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

config: ServerConfig removed in 8c5cc97.

Comment on lines 23 to 24
rpc_url: String,
contract_hash: String,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same goes here as commented for the EventManager both these are in the server_state.server_config

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 736a540 and 3b52fb1.

.enqueue_transaction(txn)
.await
.map_err(|e| L1SyncError::UnexpectedError(format!("unable to batch tx: {}", e)))?;
self.next_event_id = i + 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This reads odd to me, I would move this outside of the for loop, and set it to fetch_events_count. We are not benefiting from updating that value immediately.

Copy link
Contributor Author

@koxu1996 koxu1996 Jun 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to keep EventManager state - next_event_id - in sync with enqueued transactions. In other words if some transaction fails to batch, then manager does not consider event as processed.

Update: Counter gets incremented with every successfully processed event.

// TODO: Parse full transaction data from event, then push it to Data Availability layer.

// TODO: Once we have ASN transaction, it should be converted and pushed into batch.
let recipient: Vec<u8> = "cafebabe".into();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we mocking the deposit here?

Copy link
Contributor Author

@koxu1996 koxu1996 Jun 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was hoping that #88 will get merged, and then it could be used for parsing. Since kairos-tx in the contract is postponed for later discussion, I will implement basic parsing here. Blocked on #121.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#121 got merged, so I introduced Deposit event parsing in ff7b6c6.

However, recipient field is still mocked. It is not currently possible to obtain depositor public key from event, as it contains account hash (unnecessarily wrapped in Key)... This was already solved in kairos-tx for contract PR - validation if explicitly given public key matches with caller account hash - so maybe we should reconsider merging it before demo?

Copy link
Contributor

@Avi-D-coder Avi-D-coder left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please fix the mocked recipient.

@koxu1996 koxu1996 force-pushed the feature/l1-synchronization branch from a639bde to 8c5cc97 Compare June 12, 2024 10:26
@koxu1996 koxu1996 requested review from Avi-D-coder and marijanp June 13, 2024 08:33
@koxu1996 koxu1996 merged commit 0c244c0 into main Jun 17, 2024
4 checks passed
@koxu1996 koxu1996 deleted the feature/l1-synchronization branch June 17, 2024 15:34
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
demo Required for our first demo
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants