diff --git a/crates/katana/core/src/service/messaging/mod.rs b/crates/katana/core/src/service/messaging/mod.rs index 259cb483f1..b7b7b8a193 100644 --- a/crates/katana/core/src/service/messaging/mod.rs +++ b/crates/katana/core/src/service/messaging/mod.rs @@ -41,6 +41,8 @@ mod service; mod starknet; use std::path::Path; +use std::fs::File; +use std::io::Write; use ::starknet::providers::ProviderError as StarknetProviderError; use alloy_transport::TransportError; @@ -49,7 +51,7 @@ use async_trait::async_trait; use ethereum::EthereumMessaging; use katana_primitives::chain::ChainId; use katana_primitives::receipt::MessageToL1; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use tracing::{error, info}; pub use self::service::{MessagingOutcome, MessagingService}; @@ -90,7 +92,7 @@ impl From for Error { } /// The config used to initialize the messaging service. -#[derive(Debug, Default, Deserialize, Clone)] +#[derive(Debug, Default, Deserialize, Clone, Serialize)] pub struct MessagingConfig { /// The settlement chain. pub chain: String, @@ -107,7 +109,11 @@ pub struct MessagingConfig { /// from/to the settlement chain. pub interval: u64, /// The block on settlement chain from where Katana will start fetching messages. - pub from_block: u64, + pub gather_from_block: u64, + /// The block from where sequencer wil start sending messages. + pub send_from_block: u64, + /// Path to the config file. + pub config_file: String, } impl MessagingConfig { @@ -119,7 +125,24 @@ impl MessagingConfig { /// This is used as the clap `value_parser` implementation pub fn parse(path: &str) -> Result { - Self::load(path).map_err(|e| e.to_string()) + let mut config = Self::load(path).map_err(|e| e.to_string())?; + config.config_file = path.to_string(); + config.save().map_err(|e| e.to_string())?; + Ok(config) + } + + /// Save the config to a JSON file. + pub fn save(&self) -> Result<(), std::io::Error> { + if self.config_file.is_empty() { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Config file path is empty", + )); + } + let json = serde_json::to_string_pretty(self)?; + let mut file = File::create(&self.config_file)?; + file.write_all(json.as_bytes())?; + Ok(()) } } diff --git a/crates/katana/core/src/service/messaging/service.rs b/crates/katana/core/src/service/messaging/service.rs index 375a4fb465..a524e41f32 100644 --- a/crates/katana/core/src/service/messaging/service.rs +++ b/crates/katana/core/src/service/messaging/service.rs @@ -9,7 +9,7 @@ use katana_primitives::transaction::{ExecutableTxWithHash, L1HandlerTx, TxHash}; use katana_provider::traits::block::BlockNumberProvider; use katana_provider::traits::transaction::ReceiptProvider; use std::pin::Pin; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use std::task::{Context, Poll}; use std::time::Duration; use tokio::time::{interval_at, Instant, Interval}; @@ -38,6 +38,8 @@ pub struct MessagingService { send_from_block: u64, /// The message sending future. msg_send_fut: Option, + /// The messaging configuration. + messaging_config: Arc>, } impl MessagingService { @@ -49,9 +51,10 @@ impl MessagingService { backend: Arc>, hooker: Arc + Send + Sync>>, ) -> anyhow::Result { - let gather_from_block = config.from_block; + let gather_from_block = config.gather_from_block; + let send_from_block = config.send_from_block; let interval = interval_from_seconds(config.interval); - let messenger = match MessengerMode::from_config(config, hooker).await { + let messenger = match MessengerMode::from_config(config.clone(), hooker).await { Ok(m) => Arc::new(m), Err(_) => { panic!( @@ -67,9 +70,10 @@ impl MessagingService { interval, messenger, gather_from_block, - send_from_block: 60, + send_from_block, msg_gather_fut: None, msg_send_fut: None, + messaging_config: Arc::new(RwLock::new(config)), }) } @@ -196,9 +200,6 @@ impl Stream for MessagingService { let pin = self.get_mut(); if pin.interval.poll_tick(cx).is_ready() { - println!("MessagingService::poll_next"); - println!("pin.msg_gather_fut.is_none() = {:?}", pin.msg_gather_fut.is_none()); - println!("pin.msg_send_fut.gather_from_block() = {:?}", pin.gather_from_block); if pin.msg_gather_fut.is_none() { pin.msg_gather_fut = Some(Box::pin(Self::gather_messages( pin.messenger.clone(), @@ -225,7 +226,7 @@ impl Stream for MessagingService { if let Some(mut gather_fut) = pin.msg_gather_fut.take() { match gather_fut.poll_unpin(cx) { Poll::Ready(Ok((last_block, msg_count))) => { - info!(target: LOG_TARGET, "Gathered {} transactions up to block yb {}", msg_count, last_block); + info!(target: LOG_TARGET, "Gathered {} transactions up to block {}", msg_count, last_block); pin.gather_from_block = last_block + 1; return Poll::Ready(Some(MessagingOutcome::Gather { lastest_block: last_block, @@ -246,6 +247,10 @@ impl Stream for MessagingService { Poll::Ready(Ok(Some((block_num, msg_count)))) => { info!(target: LOG_TARGET, "Sent {} messages from block {}", msg_count, block_num); pin.send_from_block = block_num + 1; + // update the config with the latest block number sent. + let mut messaging_config = pin.messaging_config.write().unwrap(); + messaging_config.send_from_block = pin.send_from_block; + let _ = messaging_config.save(); return Poll::Ready(Some(MessagingOutcome::Send { block_num, msg_count })); } Poll::Ready(Err(e)) => { diff --git a/crates/katana/core/src/service/messaging/starknet.rs b/crates/katana/core/src/service/messaging/starknet.rs index 6375f78227..5473f3bf47 100644 --- a/crates/katana/core/src/service/messaging/starknet.rs +++ b/crates/katana/core/src/service/messaging/starknet.rs @@ -225,8 +225,8 @@ impl Messenger for StarknetM async fn gather_messages( &self, - from_block: u64, - max_blocks: u64, + _from_block: u64, + _max_blocks: u64, chain_id: ChainId, ) -> MessengerResult<(u64, Vec)> { debug!(target: LOG_TARGET, "Gathering messages");