Skip to content

Commit

Permalink
feat(katana): prevent sequencer to treat all block from start (#22)
Browse files Browse the repository at this point in the history
* fix(solis): auth solis

* update send_from_block

* feat(katana): prevent sequencer from sending all block multiple times

* starknet messenging: gather message & update gather from in config file
  • Loading branch information
ybensacq authored Aug 22, 2024
1 parent de1c664 commit a4b4276
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 11 deletions.
31 changes: 27 additions & 4 deletions crates/katana/core/src/service/messaging/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ mod service;
mod starknet;

use std::path::Path;
use std::fs::File;
use std::io::Write;

use ::starknet::providers::ProviderError as StarknetProviderError;
use alloy_transport::TransportError;
Expand All @@ -49,7 +51,7 @@ use async_trait::async_trait;
use ethereum::EthereumMessaging;
use katana_primitives::chain::ChainId;
use katana_primitives::receipt::MessageToL1;
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use tracing::{error, info};

pub use self::service::{MessagingOutcome, MessagingService};
Expand Down Expand Up @@ -90,7 +92,7 @@ impl From<TransportError> for Error {
}

/// The config used to initialize the messaging service.
#[derive(Debug, Default, Deserialize, Clone)]
#[derive(Debug, Default, Deserialize, Clone, Serialize)]
pub struct MessagingConfig {
/// The settlement chain.
pub chain: String,
Expand All @@ -107,7 +109,11 @@ pub struct MessagingConfig {
/// from/to the settlement chain.
pub interval: u64,
/// The block on settlement chain from where Katana will start fetching messages.
pub from_block: u64,
pub gather_from_block: u64,
/// The block from where sequencer wil start sending messages.
pub send_from_block: u64,
/// Path to the config file.
pub config_file: String,
}

impl MessagingConfig {
Expand All @@ -119,7 +125,24 @@ impl MessagingConfig {

/// This is used as the clap `value_parser` implementation
pub fn parse(path: &str) -> Result<Self, String> {
Self::load(path).map_err(|e| e.to_string())
let mut config = Self::load(path).map_err(|e| e.to_string())?;
config.config_file = path.to_string();
config.save().map_err(|e| e.to_string())?;
Ok(config)
}

/// Save the config to a JSON file.
pub fn save(&self) -> Result<(), std::io::Error> {
if self.config_file.is_empty() {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"Config file path is empty",
));
}
let json = serde_json::to_string_pretty(self)?;
let mut file = File::create(&self.config_file)?;
file.write_all(json.as_bytes())?;
Ok(())
}
}

Expand Down
18 changes: 14 additions & 4 deletions crates/katana/core/src/service/messaging/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use katana_primitives::transaction::{ExecutableTxWithHash, L1HandlerTx, TxHash};
use katana_provider::traits::block::BlockNumberProvider;
use katana_provider::traits::transaction::ReceiptProvider;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::time::{interval_at, Instant, Interval};
Expand Down Expand Up @@ -38,6 +38,8 @@ pub struct MessagingService<EF: ExecutorFactory> {
send_from_block: u64,
/// The message sending future.
msg_send_fut: Option<MessageSettlingFuture>,
/// The messaging configuration.
messaging_config: Arc<RwLock<MessagingConfig>>,
}

impl<EF: ExecutorFactory> MessagingService<EF> {
Expand All @@ -49,9 +51,10 @@ impl<EF: ExecutorFactory> MessagingService<EF> {
backend: Arc<Backend<EF>>,
hooker: Arc<AsyncRwLock<dyn KatanaHooker<EF> + Send + Sync>>,
) -> anyhow::Result<Self> {
let gather_from_block = config.from_block;
let gather_from_block = config.gather_from_block;
let send_from_block = config.send_from_block;
let interval = interval_from_seconds(config.interval);
let messenger = match MessengerMode::from_config(config, hooker).await {
let messenger = match MessengerMode::from_config(config.clone(), hooker).await {
Ok(m) => Arc::new(m),
Err(_) => {
panic!(
Expand All @@ -67,9 +70,10 @@ impl<EF: ExecutorFactory> MessagingService<EF> {
interval,
messenger,
gather_from_block,
send_from_block: 0,
send_from_block,
msg_gather_fut: None,
msg_send_fut: None,
messaging_config: Arc::new(RwLock::new(config)),
})
}

Expand Down Expand Up @@ -218,12 +222,15 @@ impl<EF: ExecutorFactory> Stream for MessagingService<EF> {
}
}

let mut messaging_config = pin.messaging_config.write().unwrap();
// Poll the gathering future
if let Some(mut gather_fut) = pin.msg_gather_fut.take() {
match gather_fut.poll_unpin(cx) {
Poll::Ready(Ok((last_block, msg_count))) => {
info!(target: LOG_TARGET, "Gathered {} transactions up to block {}", msg_count, last_block);
pin.gather_from_block = last_block + 1;
messaging_config.gather_from_block = pin.gather_from_block;
let _ = messaging_config.save();
return Poll::Ready(Some(MessagingOutcome::Gather {
lastest_block: last_block,
msg_count,
Expand All @@ -243,6 +250,9 @@ impl<EF: ExecutorFactory> Stream for MessagingService<EF> {
Poll::Ready(Ok(Some((block_num, msg_count)))) => {
info!(target: LOG_TARGET, "Sent {} messages from block {}", msg_count, block_num);
pin.send_from_block = block_num + 1;
// update the config with the latest block number sent.
messaging_config.send_from_block = pin.send_from_block;
let _ = messaging_config.save();
return Poll::Ready(Some(MessagingOutcome::Send { block_num, msg_count }));
}
Poll::Ready(Err(e)) => {
Expand Down
90 changes: 87 additions & 3 deletions crates/katana/core/src/service/messaging/starknet.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::HashMap;
use crate::hooker::KatanaHooker;
use anyhow::Result;
use async_trait::async_trait;
Expand Down Expand Up @@ -69,6 +70,52 @@ impl<EF: katana_executor::ExecutorFactory + Send + Sync> StarknetMessaging<EF> {
})
}

/// Fetches events for the given blocks range.
pub async fn fetch_events(
&self,
from_block: BlockId,
to_block: BlockId,
) -> Result<HashMap<u64, Vec<EmittedEvent>>> {
trace!(target: LOG_TARGET, from_block = ?from_block, to_block = ?to_block, "Fetching logs.");

let mut block_to_events: HashMap<u64, Vec<EmittedEvent>> = HashMap::new();

let filter = EventFilter {
from_block: Some(from_block),
to_block: Some(to_block),
address: Some(self.messaging_contract_address),
// TODO: this might come from the configuration actually.
keys: None,
};

// TODO: this chunk_size may also come from configuration?
let chunk_size = 200;
let mut continuation_token: Option<String> = None;

loop {
let event_page =
self.provider.get_events(filter.clone(), continuation_token, chunk_size).await?;

event_page.events.into_iter().for_each(|event| {
// We ignore events without the block number
if let Some(block_number) = event.block_number {
block_to_events
.entry(block_number)
.and_modify(|v| v.push(event.clone()))
.or_insert(vec![event]);
}
});

continuation_token = event_page.continuation_token;

if continuation_token.is_none() {
break;
}
}

Ok(block_to_events)
}

async fn fetch_pending_events(&self, chain_id: ChainId) -> MessengerResult<Vec<L1HandlerTx>> {
let mut l1_handler_txs: Vec<L1HandlerTx> = vec![];
let mut continuation_token: Option<String> = None;
Expand Down Expand Up @@ -245,18 +292,55 @@ impl<EF: katana_executor::ExecutorFactory + Send + Sync> Messenger for StarknetM
}
};

if from_block > chain_latest_block {
// Nothing to fetch, we can skip waiting the next tick.
return Ok((chain_latest_block, vec![]));
}

// +1 as the from_block counts as 1 block fetched.
let to_block = if from_block + max_blocks + 1 < chain_latest_block {
from_block + max_blocks
} else {
chain_latest_block
};

let mut l1_handler_txs: Vec<L1HandlerTx> = vec![];

// fetch events for the given range before fetching pending events
self.fetch_events(BlockId::Number(from_block), BlockId::Number(to_block))
.await
.map_err(|_| Error::SendError)
.unwrap()
.iter()
.for_each(|(block_number, block_events)| {
debug!(
target: LOG_TARGET,
block_number = %block_number,
events_count = %block_events.len(),
"Converting events of block into L1HandlerTx."
);

block_events.iter().for_each(|e| {
if let Ok(tx) = l1_handler_tx_from_event(e, chain_id) {
l1_handler_txs.push(tx)
}
})
});

// Check if the block number has changed
let previous_block = self.latest_block.load(Ordering::Relaxed);
if previous_block != chain_latest_block {
debug!(target: LOG_TARGET, "Block number changed from {} to {}, clearing cache", previous_block, chain_latest_block);
self.event_cache.write().await.clear();
self.latest_block.store(chain_latest_block, Ordering::Relaxed);
}

// Fetch pending events
let pending_txs = self.fetch_pending_events(chain_id).await?;
debug!(target: LOG_TARGET, "Returning {} pending transactions", pending_txs.len());
// Add pending events to the list
l1_handler_txs.extend(pending_txs);

Ok((chain_latest_block, pending_txs))
debug!(target: LOG_TARGET, "Returning {} transactions", l1_handler_txs.len());
Ok((chain_latest_block, l1_handler_txs))
}

async fn send_messages(
Expand Down

0 comments on commit a4b4276

Please sign in to comment.