Skip to content

Commit

Permalink
feat(katana): prevent sequencer from sending all block multiple times
Browse files Browse the repository at this point in the history
  • Loading branch information
ybensacq committed Aug 6, 2024
1 parent 3e91de3 commit c3c91ca
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 14 deletions.
31 changes: 27 additions & 4 deletions crates/katana/core/src/service/messaging/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ mod service;
mod starknet;

use std::path::Path;
use std::fs::File;
use std::io::Write;

use ::starknet::providers::ProviderError as StarknetProviderError;
use alloy_transport::TransportError;
Expand All @@ -49,7 +51,7 @@ use async_trait::async_trait;
use ethereum::EthereumMessaging;
use katana_primitives::chain::ChainId;
use katana_primitives::receipt::MessageToL1;
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use tracing::{error, info};

pub use self::service::{MessagingOutcome, MessagingService};
Expand Down Expand Up @@ -90,7 +92,7 @@ impl From<TransportError> for Error {
}

/// The config used to initialize the messaging service.
#[derive(Debug, Default, Deserialize, Clone)]
#[derive(Debug, Default, Deserialize, Clone, Serialize)]
pub struct MessagingConfig {
/// The settlement chain.
pub chain: String,
Expand All @@ -107,7 +109,11 @@ pub struct MessagingConfig {
/// from/to the settlement chain.
pub interval: u64,
/// The block on settlement chain from where Katana will start fetching messages.
pub from_block: u64,
pub gather_from_block: u64,
/// The block from where sequencer wil start sending messages.
pub send_from_block: u64,
/// Path to the config file.
pub config_file: String,
}

impl MessagingConfig {
Expand All @@ -119,7 +125,24 @@ impl MessagingConfig {

/// This is used as the clap `value_parser` implementation
pub fn parse(path: &str) -> Result<Self, String> {
Self::load(path).map_err(|e| e.to_string())
let mut config = Self::load(path).map_err(|e| e.to_string())?;
config.config_file = path.to_string();
config.save().map_err(|e| e.to_string())?;
Ok(config)
}

/// Save the config to a JSON file.
pub fn save(&self) -> Result<(), std::io::Error> {
if self.config_file.is_empty() {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"Config file path is empty",
));
}
let json = serde_json::to_string_pretty(self)?;
let mut file = File::create(&self.config_file)?;
file.write_all(json.as_bytes())?;
Ok(())
}
}

Expand Down
21 changes: 13 additions & 8 deletions crates/katana/core/src/service/messaging/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use katana_primitives::transaction::{ExecutableTxWithHash, L1HandlerTx, TxHash};
use katana_provider::traits::block::BlockNumberProvider;
use katana_provider::traits::transaction::ReceiptProvider;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::time::{interval_at, Instant, Interval};
Expand Down Expand Up @@ -38,6 +38,8 @@ pub struct MessagingService<EF: ExecutorFactory> {
send_from_block: u64,
/// The message sending future.
msg_send_fut: Option<MessageSettlingFuture>,
/// The messaging configuration.
messaging_config: Arc<RwLock<MessagingConfig>>,
}

impl<EF: ExecutorFactory> MessagingService<EF> {
Expand All @@ -49,9 +51,10 @@ impl<EF: ExecutorFactory> MessagingService<EF> {
backend: Arc<Backend<EF>>,
hooker: Arc<AsyncRwLock<dyn KatanaHooker<EF> + Send + Sync>>,
) -> anyhow::Result<Self> {
let gather_from_block = config.from_block;
let gather_from_block = config.gather_from_block;
let send_from_block = config.send_from_block;
let interval = interval_from_seconds(config.interval);
let messenger = match MessengerMode::from_config(config, hooker).await {
let messenger = match MessengerMode::from_config(config.clone(), hooker).await {
Ok(m) => Arc::new(m),
Err(_) => {
panic!(
Expand All @@ -67,9 +70,10 @@ impl<EF: ExecutorFactory> MessagingService<EF> {
interval,
messenger,
gather_from_block,
send_from_block: 60,
send_from_block,
msg_gather_fut: None,
msg_send_fut: None,
messaging_config: Arc::new(RwLock::new(config)),
})
}

Expand Down Expand Up @@ -196,9 +200,6 @@ impl<EF: ExecutorFactory> Stream for MessagingService<EF> {
let pin = self.get_mut();

if pin.interval.poll_tick(cx).is_ready() {
println!("MessagingService::poll_next");
println!("pin.msg_gather_fut.is_none() = {:?}", pin.msg_gather_fut.is_none());
println!("pin.msg_send_fut.gather_from_block() = {:?}", pin.gather_from_block);
if pin.msg_gather_fut.is_none() {
pin.msg_gather_fut = Some(Box::pin(Self::gather_messages(
pin.messenger.clone(),
Expand All @@ -225,7 +226,7 @@ impl<EF: ExecutorFactory> Stream for MessagingService<EF> {
if let Some(mut gather_fut) = pin.msg_gather_fut.take() {
match gather_fut.poll_unpin(cx) {
Poll::Ready(Ok((last_block, msg_count))) => {
info!(target: LOG_TARGET, "Gathered {} transactions up to block yb {}", msg_count, last_block);
info!(target: LOG_TARGET, "Gathered {} transactions up to block {}", msg_count, last_block);
pin.gather_from_block = last_block + 1;
return Poll::Ready(Some(MessagingOutcome::Gather {
lastest_block: last_block,
Expand All @@ -246,6 +247,10 @@ impl<EF: ExecutorFactory> Stream for MessagingService<EF> {
Poll::Ready(Ok(Some((block_num, msg_count)))) => {
info!(target: LOG_TARGET, "Sent {} messages from block {}", msg_count, block_num);
pin.send_from_block = block_num + 1;
// update the config with the latest block number sent.
let mut messaging_config = pin.messaging_config.write().unwrap();
messaging_config.send_from_block = pin.send_from_block;
let _ = messaging_config.save();
return Poll::Ready(Some(MessagingOutcome::Send { block_num, msg_count }));
}
Poll::Ready(Err(e)) => {
Expand Down
4 changes: 2 additions & 2 deletions crates/katana/core/src/service/messaging/starknet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,8 @@ impl<EF: katana_executor::ExecutorFactory + Send + Sync> Messenger for StarknetM

async fn gather_messages(
&self,
from_block: u64,
max_blocks: u64,
_from_block: u64,
_max_blocks: u64,
chain_id: ChainId,
) -> MessengerResult<(u64, Vec<L1HandlerTx>)> {
debug!(target: LOG_TARGET, "Gathering messages");
Expand Down

0 comments on commit c3c91ca

Please sign in to comment.