diff --git a/crates/katana/core/src/service/messaging/mod.rs b/crates/katana/core/src/service/messaging/mod.rs index 259cb483f1..b7b7b8a193 100644 --- a/crates/katana/core/src/service/messaging/mod.rs +++ b/crates/katana/core/src/service/messaging/mod.rs @@ -41,6 +41,8 @@ mod service; mod starknet; use std::path::Path; +use std::fs::File; +use std::io::Write; use ::starknet::providers::ProviderError as StarknetProviderError; use alloy_transport::TransportError; @@ -49,7 +51,7 @@ use async_trait::async_trait; use ethereum::EthereumMessaging; use katana_primitives::chain::ChainId; use katana_primitives::receipt::MessageToL1; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use tracing::{error, info}; pub use self::service::{MessagingOutcome, MessagingService}; @@ -90,7 +92,7 @@ impl From for Error { } /// The config used to initialize the messaging service. -#[derive(Debug, Default, Deserialize, Clone)] +#[derive(Debug, Default, Deserialize, Clone, Serialize)] pub struct MessagingConfig { /// The settlement chain. pub chain: String, @@ -107,7 +109,11 @@ pub struct MessagingConfig { /// from/to the settlement chain. pub interval: u64, /// The block on settlement chain from where Katana will start fetching messages. - pub from_block: u64, + pub gather_from_block: u64, + /// The block from where sequencer wil start sending messages. + pub send_from_block: u64, + /// Path to the config file. + pub config_file: String, } impl MessagingConfig { @@ -119,7 +125,24 @@ impl MessagingConfig { /// This is used as the clap `value_parser` implementation pub fn parse(path: &str) -> Result { - Self::load(path).map_err(|e| e.to_string()) + let mut config = Self::load(path).map_err(|e| e.to_string())?; + config.config_file = path.to_string(); + config.save().map_err(|e| e.to_string())?; + Ok(config) + } + + /// Save the config to a JSON file. + pub fn save(&self) -> Result<(), std::io::Error> { + if self.config_file.is_empty() { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Config file path is empty", + )); + } + let json = serde_json::to_string_pretty(self)?; + let mut file = File::create(&self.config_file)?; + file.write_all(json.as_bytes())?; + Ok(()) } } diff --git a/crates/katana/core/src/service/messaging/service.rs b/crates/katana/core/src/service/messaging/service.rs index 3bed0e18ea..86333cac11 100644 --- a/crates/katana/core/src/service/messaging/service.rs +++ b/crates/katana/core/src/service/messaging/service.rs @@ -9,7 +9,7 @@ use katana_primitives::transaction::{ExecutableTxWithHash, L1HandlerTx, TxHash}; use katana_provider::traits::block::BlockNumberProvider; use katana_provider::traits::transaction::ReceiptProvider; use std::pin::Pin; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use std::task::{Context, Poll}; use std::time::Duration; use tokio::time::{interval_at, Instant, Interval}; @@ -38,6 +38,8 @@ pub struct MessagingService { send_from_block: u64, /// The message sending future. msg_send_fut: Option, + /// The messaging configuration. + messaging_config: Arc>, } impl MessagingService { @@ -49,9 +51,10 @@ impl MessagingService { backend: Arc>, hooker: Arc + Send + Sync>>, ) -> anyhow::Result { - let gather_from_block = config.from_block; + let gather_from_block = config.gather_from_block; + let send_from_block = config.send_from_block; let interval = interval_from_seconds(config.interval); - let messenger = match MessengerMode::from_config(config, hooker).await { + let messenger = match MessengerMode::from_config(config.clone(), hooker).await { Ok(m) => Arc::new(m), Err(_) => { panic!( @@ -67,9 +70,10 @@ impl MessagingService { interval, messenger, gather_from_block, - send_from_block: 0, + send_from_block, msg_gather_fut: None, msg_send_fut: None, + messaging_config: Arc::new(RwLock::new(config)), }) } @@ -218,12 +222,15 @@ impl Stream for MessagingService { } } + let mut messaging_config = pin.messaging_config.write().unwrap(); // Poll the gathering future if let Some(mut gather_fut) = pin.msg_gather_fut.take() { match gather_fut.poll_unpin(cx) { Poll::Ready(Ok((last_block, msg_count))) => { info!(target: LOG_TARGET, "Gathered {} transactions up to block {}", msg_count, last_block); pin.gather_from_block = last_block + 1; + messaging_config.gather_from_block = pin.gather_from_block; + let _ = messaging_config.save(); return Poll::Ready(Some(MessagingOutcome::Gather { lastest_block: last_block, msg_count, @@ -243,6 +250,9 @@ impl Stream for MessagingService { Poll::Ready(Ok(Some((block_num, msg_count)))) => { info!(target: LOG_TARGET, "Sent {} messages from block {}", msg_count, block_num); pin.send_from_block = block_num + 1; + // update the config with the latest block number sent. + messaging_config.send_from_block = pin.send_from_block; + let _ = messaging_config.save(); return Poll::Ready(Some(MessagingOutcome::Send { block_num, msg_count })); } Poll::Ready(Err(e)) => { diff --git a/crates/katana/core/src/service/messaging/starknet.rs b/crates/katana/core/src/service/messaging/starknet.rs index 6375f78227..4854722728 100644 --- a/crates/katana/core/src/service/messaging/starknet.rs +++ b/crates/katana/core/src/service/messaging/starknet.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use crate::hooker::KatanaHooker; use anyhow::Result; use async_trait::async_trait; @@ -69,6 +70,52 @@ impl StarknetMessaging { }) } + /// Fetches events for the given blocks range. + pub async fn fetch_events( + &self, + from_block: BlockId, + to_block: BlockId, + ) -> Result>> { + trace!(target: LOG_TARGET, from_block = ?from_block, to_block = ?to_block, "Fetching logs."); + + let mut block_to_events: HashMap> = HashMap::new(); + + let filter = EventFilter { + from_block: Some(from_block), + to_block: Some(to_block), + address: Some(self.messaging_contract_address), + // TODO: this might come from the configuration actually. + keys: None, + }; + + // TODO: this chunk_size may also come from configuration? + let chunk_size = 200; + let mut continuation_token: Option = None; + + loop { + let event_page = + self.provider.get_events(filter.clone(), continuation_token, chunk_size).await?; + + event_page.events.into_iter().for_each(|event| { + // We ignore events without the block number + if let Some(block_number) = event.block_number { + block_to_events + .entry(block_number) + .and_modify(|v| v.push(event.clone())) + .or_insert(vec![event]); + } + }); + + continuation_token = event_page.continuation_token; + + if continuation_token.is_none() { + break; + } + } + + Ok(block_to_events) + } + async fn fetch_pending_events(&self, chain_id: ChainId) -> MessengerResult> { let mut l1_handler_txs: Vec = vec![]; let mut continuation_token: Option = None; @@ -245,6 +292,41 @@ impl Messenger for StarknetM } }; + if from_block > chain_latest_block { + // Nothing to fetch, we can skip waiting the next tick. + return Ok((chain_latest_block, vec![])); + } + + // +1 as the from_block counts as 1 block fetched. + let to_block = if from_block + max_blocks + 1 < chain_latest_block { + from_block + max_blocks + } else { + chain_latest_block + }; + + let mut l1_handler_txs: Vec = vec![]; + + // fetch events for the given range before fetching pending events + self.fetch_events(BlockId::Number(from_block), BlockId::Number(to_block)) + .await + .map_err(|_| Error::SendError) + .unwrap() + .iter() + .for_each(|(block_number, block_events)| { + debug!( + target: LOG_TARGET, + block_number = %block_number, + events_count = %block_events.len(), + "Converting events of block into L1HandlerTx." + ); + + block_events.iter().for_each(|e| { + if let Ok(tx) = l1_handler_tx_from_event(e, chain_id) { + l1_handler_txs.push(tx) + } + }) + }); + // Check if the block number has changed let previous_block = self.latest_block.load(Ordering::Relaxed); if previous_block != chain_latest_block { @@ -252,11 +334,13 @@ impl Messenger for StarknetM self.event_cache.write().await.clear(); self.latest_block.store(chain_latest_block, Ordering::Relaxed); } - + // Fetch pending events let pending_txs = self.fetch_pending_events(chain_id).await?; - debug!(target: LOG_TARGET, "Returning {} pending transactions", pending_txs.len()); + // Add pending events to the list + l1_handler_txs.extend(pending_txs); - Ok((chain_latest_block, pending_txs)) + debug!(target: LOG_TARGET, "Returning {} transactions", l1_handler_txs.len()); + Ok((chain_latest_block, l1_handler_txs)) } async fn send_messages(