From d521e68f9b5fe9dc042b0aceabcab7d393f866d9 Mon Sep 17 00:00:00 2001 From: bexan Date: Thu, 25 Jul 2024 21:07:05 +0200 Subject: [PATCH 1/4] fix(solis): auth solis --- Cargo.lock | 7 +++++++ bin/solis/Cargo.toml | 1 + bin/solis/src/args.rs | 2 +- bin/solis/src/main.rs | 2 ++ 4 files changed, 11 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 9f0bcd44a0..ace78f9245 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3863,6 +3863,12 @@ dependencies = [ "serde_json", ] +[[package]] +name = "dotenv" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f" + [[package]] name = "dotenvy" version = "0.15.7" @@ -11403,6 +11409,7 @@ dependencies = [ "common", "console", "dojo-metrics", + "dotenv", "katana-core", "katana-executor", "katana-primitives", diff --git a/bin/solis/Cargo.toml b/bin/solis/Cargo.toml index 5b8debb1c2..0a0c5a07d9 100644 --- a/bin/solis/Cargo.toml +++ b/bin/solis/Cargo.toml @@ -19,6 +19,7 @@ clap_complete.workspace = true common.workspace = true console.workspace = true dojo-metrics.workspace = true +dotenv = "0.15.0" katana-core.workspace = true katana-executor.workspace = true katana-primitives.workspace = true diff --git a/bin/solis/src/args.rs b/bin/solis/src/args.rs index f2fb8b6b01..9fe912a31d 100644 --- a/bin/solis/src/args.rs +++ b/bin/solis/src/args.rs @@ -9,7 +9,7 @@ //! and leak detection functionality. See [jemalloc's opt.prof](https://jemalloc.net/jemalloc.3.html#opt.prof) //! documentation for usage details. This is **not recommended on Windows**. See [here](https://rust-lang.github.io/rfcs/1974-global-allocators.html#jemalloc) //! for more info. - +use std::env; use std::net::SocketAddr; use std::path::PathBuf; diff --git a/bin/solis/src/main.rs b/bin/solis/src/main.rs index ce806b1e34..14fca64404 100644 --- a/bin/solis/src/main.rs +++ b/bin/solis/src/main.rs @@ -43,6 +43,8 @@ pub(crate) const LOG_TARGET: &str = "katana::cli"; #[tokio::main] async fn main() -> Result<(), Box> { + dotenv::dotenv().ok(); + let args = KatanaArgs::parse(); args.init_logging()?; From 3e91de32035b69e84836361100265e66f8f75f18 Mon Sep 17 00:00:00 2001 From: bexan Date: Tue, 6 Aug 2024 15:10:46 +0200 Subject: [PATCH 2/4] update send_from_block --- crates/katana/core/src/service/messaging/service.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/crates/katana/core/src/service/messaging/service.rs b/crates/katana/core/src/service/messaging/service.rs index 3bed0e18ea..375a4fb465 100644 --- a/crates/katana/core/src/service/messaging/service.rs +++ b/crates/katana/core/src/service/messaging/service.rs @@ -67,7 +67,7 @@ impl MessagingService { interval, messenger, gather_from_block, - send_from_block: 0, + send_from_block: 60, msg_gather_fut: None, msg_send_fut: None, }) @@ -196,6 +196,9 @@ impl Stream for MessagingService { let pin = self.get_mut(); if pin.interval.poll_tick(cx).is_ready() { + println!("MessagingService::poll_next"); + println!("pin.msg_gather_fut.is_none() = {:?}", pin.msg_gather_fut.is_none()); + println!("pin.msg_send_fut.gather_from_block() = {:?}", pin.gather_from_block); if pin.msg_gather_fut.is_none() { pin.msg_gather_fut = Some(Box::pin(Self::gather_messages( pin.messenger.clone(), @@ -222,7 +225,7 @@ impl Stream for MessagingService { if let Some(mut gather_fut) = pin.msg_gather_fut.take() { match gather_fut.poll_unpin(cx) { Poll::Ready(Ok((last_block, msg_count))) => { - info!(target: LOG_TARGET, "Gathered {} transactions up to block {}", msg_count, last_block); + info!(target: LOG_TARGET, "Gathered {} transactions up to block yb {}", msg_count, last_block); pin.gather_from_block = last_block + 1; return Poll::Ready(Some(MessagingOutcome::Gather { lastest_block: last_block, From c3c91ca0ed36186bc64a5153517ee84919115c20 Mon Sep 17 00:00:00 2001 From: bexan Date: Tue, 6 Aug 2024 17:11:04 +0200 Subject: [PATCH 3/4] feat(katana): prevent sequencer from sending all block multiple times --- .../katana/core/src/service/messaging/mod.rs | 31 ++++++++++++++++--- .../core/src/service/messaging/service.rs | 21 ++++++++----- .../core/src/service/messaging/starknet.rs | 4 +-- 3 files changed, 42 insertions(+), 14 deletions(-) diff --git a/crates/katana/core/src/service/messaging/mod.rs b/crates/katana/core/src/service/messaging/mod.rs index 259cb483f1..b7b7b8a193 100644 --- a/crates/katana/core/src/service/messaging/mod.rs +++ b/crates/katana/core/src/service/messaging/mod.rs @@ -41,6 +41,8 @@ mod service; mod starknet; use std::path::Path; +use std::fs::File; +use std::io::Write; use ::starknet::providers::ProviderError as StarknetProviderError; use alloy_transport::TransportError; @@ -49,7 +51,7 @@ use async_trait::async_trait; use ethereum::EthereumMessaging; use katana_primitives::chain::ChainId; use katana_primitives::receipt::MessageToL1; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use tracing::{error, info}; pub use self::service::{MessagingOutcome, MessagingService}; @@ -90,7 +92,7 @@ impl From for Error { } /// The config used to initialize the messaging service. -#[derive(Debug, Default, Deserialize, Clone)] +#[derive(Debug, Default, Deserialize, Clone, Serialize)] pub struct MessagingConfig { /// The settlement chain. pub chain: String, @@ -107,7 +109,11 @@ pub struct MessagingConfig { /// from/to the settlement chain. pub interval: u64, /// The block on settlement chain from where Katana will start fetching messages. - pub from_block: u64, + pub gather_from_block: u64, + /// The block from where sequencer wil start sending messages. + pub send_from_block: u64, + /// Path to the config file. + pub config_file: String, } impl MessagingConfig { @@ -119,7 +125,24 @@ impl MessagingConfig { /// This is used as the clap `value_parser` implementation pub fn parse(path: &str) -> Result { - Self::load(path).map_err(|e| e.to_string()) + let mut config = Self::load(path).map_err(|e| e.to_string())?; + config.config_file = path.to_string(); + config.save().map_err(|e| e.to_string())?; + Ok(config) + } + + /// Save the config to a JSON file. + pub fn save(&self) -> Result<(), std::io::Error> { + if self.config_file.is_empty() { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Config file path is empty", + )); + } + let json = serde_json::to_string_pretty(self)?; + let mut file = File::create(&self.config_file)?; + file.write_all(json.as_bytes())?; + Ok(()) } } diff --git a/crates/katana/core/src/service/messaging/service.rs b/crates/katana/core/src/service/messaging/service.rs index 375a4fb465..a524e41f32 100644 --- a/crates/katana/core/src/service/messaging/service.rs +++ b/crates/katana/core/src/service/messaging/service.rs @@ -9,7 +9,7 @@ use katana_primitives::transaction::{ExecutableTxWithHash, L1HandlerTx, TxHash}; use katana_provider::traits::block::BlockNumberProvider; use katana_provider::traits::transaction::ReceiptProvider; use std::pin::Pin; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use std::task::{Context, Poll}; use std::time::Duration; use tokio::time::{interval_at, Instant, Interval}; @@ -38,6 +38,8 @@ pub struct MessagingService { send_from_block: u64, /// The message sending future. msg_send_fut: Option, + /// The messaging configuration. + messaging_config: Arc>, } impl MessagingService { @@ -49,9 +51,10 @@ impl MessagingService { backend: Arc>, hooker: Arc + Send + Sync>>, ) -> anyhow::Result { - let gather_from_block = config.from_block; + let gather_from_block = config.gather_from_block; + let send_from_block = config.send_from_block; let interval = interval_from_seconds(config.interval); - let messenger = match MessengerMode::from_config(config, hooker).await { + let messenger = match MessengerMode::from_config(config.clone(), hooker).await { Ok(m) => Arc::new(m), Err(_) => { panic!( @@ -67,9 +70,10 @@ impl MessagingService { interval, messenger, gather_from_block, - send_from_block: 60, + send_from_block, msg_gather_fut: None, msg_send_fut: None, + messaging_config: Arc::new(RwLock::new(config)), }) } @@ -196,9 +200,6 @@ impl Stream for MessagingService { let pin = self.get_mut(); if pin.interval.poll_tick(cx).is_ready() { - println!("MessagingService::poll_next"); - println!("pin.msg_gather_fut.is_none() = {:?}", pin.msg_gather_fut.is_none()); - println!("pin.msg_send_fut.gather_from_block() = {:?}", pin.gather_from_block); if pin.msg_gather_fut.is_none() { pin.msg_gather_fut = Some(Box::pin(Self::gather_messages( pin.messenger.clone(), @@ -225,7 +226,7 @@ impl Stream for MessagingService { if let Some(mut gather_fut) = pin.msg_gather_fut.take() { match gather_fut.poll_unpin(cx) { Poll::Ready(Ok((last_block, msg_count))) => { - info!(target: LOG_TARGET, "Gathered {} transactions up to block yb {}", msg_count, last_block); + info!(target: LOG_TARGET, "Gathered {} transactions up to block {}", msg_count, last_block); pin.gather_from_block = last_block + 1; return Poll::Ready(Some(MessagingOutcome::Gather { lastest_block: last_block, @@ -246,6 +247,10 @@ impl Stream for MessagingService { Poll::Ready(Ok(Some((block_num, msg_count)))) => { info!(target: LOG_TARGET, "Sent {} messages from block {}", msg_count, block_num); pin.send_from_block = block_num + 1; + // update the config with the latest block number sent. + let mut messaging_config = pin.messaging_config.write().unwrap(); + messaging_config.send_from_block = pin.send_from_block; + let _ = messaging_config.save(); return Poll::Ready(Some(MessagingOutcome::Send { block_num, msg_count })); } Poll::Ready(Err(e)) => { diff --git a/crates/katana/core/src/service/messaging/starknet.rs b/crates/katana/core/src/service/messaging/starknet.rs index 6375f78227..5473f3bf47 100644 --- a/crates/katana/core/src/service/messaging/starknet.rs +++ b/crates/katana/core/src/service/messaging/starknet.rs @@ -225,8 +225,8 @@ impl Messenger for StarknetM async fn gather_messages( &self, - from_block: u64, - max_blocks: u64, + _from_block: u64, + _max_blocks: u64, chain_id: ChainId, ) -> MessengerResult<(u64, Vec)> { debug!(target: LOG_TARGET, "Gathering messages"); From 621ae0bca3c51dc7044c2bf8da23e730115b2d3f Mon Sep 17 00:00:00 2001 From: bexan Date: Tue, 20 Aug 2024 07:18:44 +0200 Subject: [PATCH 4/4] starknet messenging: gather message & update gather from in config file --- .../core/src/service/messaging/service.rs | 4 +- .../core/src/service/messaging/starknet.rs | 94 ++++++++++++++++++- 2 files changed, 92 insertions(+), 6 deletions(-) diff --git a/crates/katana/core/src/service/messaging/service.rs b/crates/katana/core/src/service/messaging/service.rs index a524e41f32..86333cac11 100644 --- a/crates/katana/core/src/service/messaging/service.rs +++ b/crates/katana/core/src/service/messaging/service.rs @@ -222,12 +222,15 @@ impl Stream for MessagingService { } } + let mut messaging_config = pin.messaging_config.write().unwrap(); // Poll the gathering future if let Some(mut gather_fut) = pin.msg_gather_fut.take() { match gather_fut.poll_unpin(cx) { Poll::Ready(Ok((last_block, msg_count))) => { info!(target: LOG_TARGET, "Gathered {} transactions up to block {}", msg_count, last_block); pin.gather_from_block = last_block + 1; + messaging_config.gather_from_block = pin.gather_from_block; + let _ = messaging_config.save(); return Poll::Ready(Some(MessagingOutcome::Gather { lastest_block: last_block, msg_count, @@ -248,7 +251,6 @@ impl Stream for MessagingService { info!(target: LOG_TARGET, "Sent {} messages from block {}", msg_count, block_num); pin.send_from_block = block_num + 1; // update the config with the latest block number sent. - let mut messaging_config = pin.messaging_config.write().unwrap(); messaging_config.send_from_block = pin.send_from_block; let _ = messaging_config.save(); return Poll::Ready(Some(MessagingOutcome::Send { block_num, msg_count })); diff --git a/crates/katana/core/src/service/messaging/starknet.rs b/crates/katana/core/src/service/messaging/starknet.rs index 5473f3bf47..4854722728 100644 --- a/crates/katana/core/src/service/messaging/starknet.rs +++ b/crates/katana/core/src/service/messaging/starknet.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use crate::hooker::KatanaHooker; use anyhow::Result; use async_trait::async_trait; @@ -69,6 +70,52 @@ impl StarknetMessaging { }) } + /// Fetches events for the given blocks range. + pub async fn fetch_events( + &self, + from_block: BlockId, + to_block: BlockId, + ) -> Result>> { + trace!(target: LOG_TARGET, from_block = ?from_block, to_block = ?to_block, "Fetching logs."); + + let mut block_to_events: HashMap> = HashMap::new(); + + let filter = EventFilter { + from_block: Some(from_block), + to_block: Some(to_block), + address: Some(self.messaging_contract_address), + // TODO: this might come from the configuration actually. + keys: None, + }; + + // TODO: this chunk_size may also come from configuration? + let chunk_size = 200; + let mut continuation_token: Option = None; + + loop { + let event_page = + self.provider.get_events(filter.clone(), continuation_token, chunk_size).await?; + + event_page.events.into_iter().for_each(|event| { + // We ignore events without the block number + if let Some(block_number) = event.block_number { + block_to_events + .entry(block_number) + .and_modify(|v| v.push(event.clone())) + .or_insert(vec![event]); + } + }); + + continuation_token = event_page.continuation_token; + + if continuation_token.is_none() { + break; + } + } + + Ok(block_to_events) + } + async fn fetch_pending_events(&self, chain_id: ChainId) -> MessengerResult> { let mut l1_handler_txs: Vec = vec![]; let mut continuation_token: Option = None; @@ -225,8 +272,8 @@ impl Messenger for StarknetM async fn gather_messages( &self, - _from_block: u64, - _max_blocks: u64, + from_block: u64, + max_blocks: u64, chain_id: ChainId, ) -> MessengerResult<(u64, Vec)> { debug!(target: LOG_TARGET, "Gathering messages"); @@ -245,6 +292,41 @@ impl Messenger for StarknetM } }; + if from_block > chain_latest_block { + // Nothing to fetch, we can skip waiting the next tick. + return Ok((chain_latest_block, vec![])); + } + + // +1 as the from_block counts as 1 block fetched. + let to_block = if from_block + max_blocks + 1 < chain_latest_block { + from_block + max_blocks + } else { + chain_latest_block + }; + + let mut l1_handler_txs: Vec = vec![]; + + // fetch events for the given range before fetching pending events + self.fetch_events(BlockId::Number(from_block), BlockId::Number(to_block)) + .await + .map_err(|_| Error::SendError) + .unwrap() + .iter() + .for_each(|(block_number, block_events)| { + debug!( + target: LOG_TARGET, + block_number = %block_number, + events_count = %block_events.len(), + "Converting events of block into L1HandlerTx." + ); + + block_events.iter().for_each(|e| { + if let Ok(tx) = l1_handler_tx_from_event(e, chain_id) { + l1_handler_txs.push(tx) + } + }) + }); + // Check if the block number has changed let previous_block = self.latest_block.load(Ordering::Relaxed); if previous_block != chain_latest_block { @@ -252,11 +334,13 @@ impl Messenger for StarknetM self.event_cache.write().await.clear(); self.latest_block.store(chain_latest_block, Ordering::Relaxed); } - + // Fetch pending events let pending_txs = self.fetch_pending_events(chain_id).await?; - debug!(target: LOG_TARGET, "Returning {} pending transactions", pending_txs.len()); + // Add pending events to the list + l1_handler_txs.extend(pending_txs); - Ok((chain_latest_block, pending_txs)) + debug!(target: LOG_TARGET, "Returning {} transactions", l1_handler_txs.len()); + Ok((chain_latest_block, l1_handler_txs)) } async fn send_messages(