Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(katana): prevent sequencer to treat all block from start #22

Merged
merged 4 commits into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bin/solis/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ clap_complete.workspace = true
common.workspace = true
console.workspace = true
dojo-metrics.workspace = true
dotenv = "0.15.0"
katana-core.workspace = true
katana-executor.workspace = true
katana-primitives.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion bin/solis/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
//! and leak detection functionality. See [jemalloc's opt.prof](https://jemalloc.net/jemalloc.3.html#opt.prof)
//! documentation for usage details. This is **not recommended on Windows**. See [here](https://rust-lang.github.io/rfcs/1974-global-allocators.html#jemalloc)
//! for more info.

use std::env;
use std::net::SocketAddr;
use std::path::PathBuf;

Expand Down
2 changes: 2 additions & 0 deletions bin/solis/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ pub(crate) const LOG_TARGET: &str = "katana::cli";

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
dotenv::dotenv().ok();

let args = KatanaArgs::parse();
args.init_logging()?;

Expand Down
31 changes: 27 additions & 4 deletions crates/katana/core/src/service/messaging/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ mod service;
mod starknet;

use std::path::Path;
use std::fs::File;
use std::io::Write;

use ::starknet::providers::ProviderError as StarknetProviderError;
use alloy_transport::TransportError;
Expand All @@ -49,7 +51,7 @@ use async_trait::async_trait;
use ethereum::EthereumMessaging;
use katana_primitives::chain::ChainId;
use katana_primitives::receipt::MessageToL1;
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use tracing::{error, info};

pub use self::service::{MessagingOutcome, MessagingService};
Expand Down Expand Up @@ -90,7 +92,7 @@ impl From<TransportError> for Error {
}

/// The config used to initialize the messaging service.
#[derive(Debug, Default, Deserialize, Clone)]
#[derive(Debug, Default, Deserialize, Clone, Serialize)]
pub struct MessagingConfig {
/// The settlement chain.
pub chain: String,
Expand All @@ -107,7 +109,11 @@ pub struct MessagingConfig {
/// from/to the settlement chain.
pub interval: u64,
/// The block on settlement chain from where Katana will start fetching messages.
pub from_block: u64,
pub gather_from_block: u64,
/// The block from where sequencer wil start sending messages.
pub send_from_block: u64,
/// Path to the config file.
pub config_file: String,
}

impl MessagingConfig {
Expand All @@ -119,7 +125,24 @@ impl MessagingConfig {

/// This is used as the clap `value_parser` implementation
pub fn parse(path: &str) -> Result<Self, String> {
Self::load(path).map_err(|e| e.to_string())
let mut config = Self::load(path).map_err(|e| e.to_string())?;
config.config_file = path.to_string();
config.save().map_err(|e| e.to_string())?;
Ok(config)
}

/// Save the config to a JSON file.
pub fn save(&self) -> Result<(), std::io::Error> {
if self.config_file.is_empty() {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"Config file path is empty",
));
}
let json = serde_json::to_string_pretty(self)?;
let mut file = File::create(&self.config_file)?;
file.write_all(json.as_bytes())?;
Ok(())
}
}

Expand Down
18 changes: 14 additions & 4 deletions crates/katana/core/src/service/messaging/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use katana_primitives::transaction::{ExecutableTxWithHash, L1HandlerTx, TxHash};
use katana_provider::traits::block::BlockNumberProvider;
use katana_provider::traits::transaction::ReceiptProvider;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::time::{interval_at, Instant, Interval};
Expand Down Expand Up @@ -38,6 +38,8 @@ pub struct MessagingService<EF: ExecutorFactory> {
send_from_block: u64,
/// The message sending future.
msg_send_fut: Option<MessageSettlingFuture>,
/// The messaging configuration.
messaging_config: Arc<RwLock<MessagingConfig>>,
}

impl<EF: ExecutorFactory> MessagingService<EF> {
Expand All @@ -49,9 +51,10 @@ impl<EF: ExecutorFactory> MessagingService<EF> {
backend: Arc<Backend<EF>>,
hooker: Arc<AsyncRwLock<dyn KatanaHooker<EF> + Send + Sync>>,
) -> anyhow::Result<Self> {
let gather_from_block = config.from_block;
let gather_from_block = config.gather_from_block;
let send_from_block = config.send_from_block;
let interval = interval_from_seconds(config.interval);
let messenger = match MessengerMode::from_config(config, hooker).await {
let messenger = match MessengerMode::from_config(config.clone(), hooker).await {
Ok(m) => Arc::new(m),
Err(_) => {
panic!(
Expand All @@ -67,9 +70,10 @@ impl<EF: ExecutorFactory> MessagingService<EF> {
interval,
messenger,
gather_from_block,
send_from_block: 0,
send_from_block,
msg_gather_fut: None,
msg_send_fut: None,
messaging_config: Arc::new(RwLock::new(config)),
})
}

Expand Down Expand Up @@ -218,12 +222,15 @@ impl<EF: ExecutorFactory> Stream for MessagingService<EF> {
}
}

let mut messaging_config = pin.messaging_config.write().unwrap();
// Poll the gathering future
if let Some(mut gather_fut) = pin.msg_gather_fut.take() {
match gather_fut.poll_unpin(cx) {
Poll::Ready(Ok((last_block, msg_count))) => {
info!(target: LOG_TARGET, "Gathered {} transactions up to block {}", msg_count, last_block);
pin.gather_from_block = last_block + 1;
messaging_config.gather_from_block = pin.gather_from_block;
let _ = messaging_config.save();
return Poll::Ready(Some(MessagingOutcome::Gather {
lastest_block: last_block,
msg_count,
Expand All @@ -243,6 +250,9 @@ impl<EF: ExecutorFactory> Stream for MessagingService<EF> {
Poll::Ready(Ok(Some((block_num, msg_count)))) => {
info!(target: LOG_TARGET, "Sent {} messages from block {}", msg_count, block_num);
pin.send_from_block = block_num + 1;
// update the config with the latest block number sent.
messaging_config.send_from_block = pin.send_from_block;
let _ = messaging_config.save();
return Poll::Ready(Some(MessagingOutcome::Send { block_num, msg_count }));
}
Poll::Ready(Err(e)) => {
Expand Down
90 changes: 87 additions & 3 deletions crates/katana/core/src/service/messaging/starknet.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::HashMap;
use crate::hooker::KatanaHooker;
use anyhow::Result;
use async_trait::async_trait;
Expand Down Expand Up @@ -69,6 +70,52 @@ impl<EF: katana_executor::ExecutorFactory + Send + Sync> StarknetMessaging<EF> {
})
}

/// Fetches events for the given blocks range.
pub async fn fetch_events(
&self,
from_block: BlockId,
to_block: BlockId,
) -> Result<HashMap<u64, Vec<EmittedEvent>>> {
trace!(target: LOG_TARGET, from_block = ?from_block, to_block = ?to_block, "Fetching logs.");

let mut block_to_events: HashMap<u64, Vec<EmittedEvent>> = HashMap::new();

let filter = EventFilter {
from_block: Some(from_block),
to_block: Some(to_block),
address: Some(self.messaging_contract_address),
// TODO: this might come from the configuration actually.
keys: None,
};

// TODO: this chunk_size may also come from configuration?
let chunk_size = 200;
let mut continuation_token: Option<String> = None;

loop {
let event_page =
self.provider.get_events(filter.clone(), continuation_token, chunk_size).await?;

event_page.events.into_iter().for_each(|event| {
// We ignore events without the block number
if let Some(block_number) = event.block_number {
block_to_events
.entry(block_number)
.and_modify(|v| v.push(event.clone()))
.or_insert(vec![event]);
}
});

continuation_token = event_page.continuation_token;

if continuation_token.is_none() {
break;
}
}

Ok(block_to_events)
}

async fn fetch_pending_events(&self, chain_id: ChainId) -> MessengerResult<Vec<L1HandlerTx>> {
let mut l1_handler_txs: Vec<L1HandlerTx> = vec![];
let mut continuation_token: Option<String> = None;
Expand Down Expand Up @@ -245,18 +292,55 @@ impl<EF: katana_executor::ExecutorFactory + Send + Sync> Messenger for StarknetM
}
};

if from_block > chain_latest_block {
// Nothing to fetch, we can skip waiting the next tick.
return Ok((chain_latest_block, vec![]));
}

// +1 as the from_block counts as 1 block fetched.
let to_block = if from_block + max_blocks + 1 < chain_latest_block {
from_block + max_blocks
} else {
chain_latest_block
};

let mut l1_handler_txs: Vec<L1HandlerTx> = vec![];

// fetch events for the given range before fetching pending events
self.fetch_events(BlockId::Number(from_block), BlockId::Number(to_block))
.await
.map_err(|_| Error::SendError)
.unwrap()
.iter()
.for_each(|(block_number, block_events)| {
debug!(
target: LOG_TARGET,
block_number = %block_number,
events_count = %block_events.len(),
"Converting events of block into L1HandlerTx."
);

block_events.iter().for_each(|e| {
if let Ok(tx) = l1_handler_tx_from_event(e, chain_id) {
l1_handler_txs.push(tx)
}
})
});

// Check if the block number has changed
let previous_block = self.latest_block.load(Ordering::Relaxed);
if previous_block != chain_latest_block {
debug!(target: LOG_TARGET, "Block number changed from {} to {}, clearing cache", previous_block, chain_latest_block);
self.event_cache.write().await.clear();
self.latest_block.store(chain_latest_block, Ordering::Relaxed);
}

// Fetch pending events
let pending_txs = self.fetch_pending_events(chain_id).await?;
debug!(target: LOG_TARGET, "Returning {} pending transactions", pending_txs.len());
// Add pending events to the list
l1_handler_txs.extend(pending_txs);

Ok((chain_latest_block, pending_txs))
debug!(target: LOG_TARGET, "Returning {} transactions", l1_handler_txs.len());
Ok((chain_latest_block, l1_handler_txs))
}

async fn send_messages(
Expand Down
Loading