From 194438922a028dac6bf40533ce139dd742809902 Mon Sep 17 00:00:00 2001 From: ADR!AN <111903096+adrianvrj@users.noreply.github.com> Date: Sat, 24 Aug 2024 20:33:24 -0600 Subject: [PATCH] fix(katana): ensure messages are ordered (#2326) * [fix] converted hash map to ordered vec in fetch events func * [fix] Orthographic error in comment * [fix] run fmt * [fix] returning a single Vec * [fix] returning a Vec for ethereum.rs fetch_logs * [fix] debugger * [fix] using compiler inference for Vecs * Update crates/katana/core/src/service/messaging/starknet.rs Co-authored-by: glihm --------- Co-authored-by: glihm --- .../core/src/service/messaging/ethereum.rs | 46 +++++++------------ .../core/src/service/messaging/starknet.rs | 32 ++++++------- 2 files changed, 29 insertions(+), 49 deletions(-) diff --git a/crates/katana/core/src/service/messaging/ethereum.rs b/crates/katana/core/src/service/messaging/ethereum.rs index 6b4c5556eb..9ab91757e4 100644 --- a/crates/katana/core/src/service/messaging/ethereum.rs +++ b/crates/katana/core/src/service/messaging/ethereum.rs @@ -1,6 +1,5 @@ #![allow(dead_code)] -use std::collections::HashMap; use std::str::FromStr; use std::sync::Arc; @@ -72,14 +71,10 @@ impl EthereumMessaging { /// /// * `from_block` - The first block of which logs must be fetched. /// * `to_block` - The last block of which logs must be fetched. - pub async fn fetch_logs( - &self, - from_block: u64, - to_block: u64, - ) -> MessengerResult>> { + pub async fn fetch_logs(&self, from_block: u64, to_block: u64) -> MessengerResult> { trace!(target: LOG_TARGET, from_block = ?from_block, to_block = ?to_block, "Fetching logs."); - let mut block_to_logs: HashMap> = HashMap::new(); + let mut logs = vec![]; let filters = Filter { block_option: FilterBlockOption::Range { @@ -107,15 +102,11 @@ impl EthereumMessaging { .await? .into_iter() .filter(|log| log.block_number.is_some()) - .map(|log| (log.block_number.unwrap(), log)) - .for_each(|(block_num, log)| { - block_to_logs - .entry(block_num) - .and_modify(|v| v.push(log.clone())) - .or_insert(vec![log]); + .for_each(|log| { + logs.push(log); }); - Ok(block_to_logs) + Ok(logs) } } @@ -143,22 +134,17 @@ impl Messenger for EthereumMessaging { let mut l1_handler_txs = vec![]; trace!(target: LOG_TARGET, from_block, to_block, "Fetching logs from {from_block} to {to_block}."); - self.fetch_logs(from_block, to_block).await?.into_iter().for_each( - |(block_number, block_logs)| { - debug!( - target: LOG_TARGET, - block_number = %block_number, - logs_found = %block_logs.len(), - "Converting logs into L1HandlerTx.", - ); - - block_logs.into_iter().for_each(|log| { - if let Ok(tx) = l1_handler_tx_from_log(log, chain_id) { - l1_handler_txs.push(tx) - } - }) - }, - ); + self.fetch_logs(from_block, to_block).await?.iter().for_each(|l| { + debug!( + target: LOG_TARGET, + log = ?l, + "Converting log into L1HandlerTx.", + ); + + if let Ok(tx) = l1_handler_tx_from_log(l.clone(), chain_id) { + l1_handler_txs.push(tx) + } + }); Ok((to_block, l1_handler_txs)) } diff --git a/crates/katana/core/src/service/messaging/starknet.rs b/crates/katana/core/src/service/messaging/starknet.rs index d99406058b..fe87fce0b6 100644 --- a/crates/katana/core/src/service/messaging/starknet.rs +++ b/crates/katana/core/src/service/messaging/starknet.rs @@ -1,4 +1,3 @@ -use std::collections::HashMap; use std::sync::Arc; use anyhow::Result; @@ -61,15 +60,14 @@ impl StarknetMessaging { }) } - /// Fetches events for the given blocks range. pub async fn fetch_events( &self, from_block: BlockId, to_block: BlockId, - ) -> Result>> { + ) -> Result> { trace!(target: LOG_TARGET, from_block = ?from_block, to_block = ?to_block, "Fetching logs."); - let mut block_to_events: HashMap> = HashMap::new(); + let mut events = vec![]; let filter = EventFilter { from_block: Some(from_block), @@ -89,11 +87,10 @@ impl StarknetMessaging { event_page.events.into_iter().for_each(|event| { // We ignore events without the block number - if let Some(block_number) = event.block_number { - block_to_events - .entry(block_number) - .and_modify(|v| v.push(event.clone())) - .or_insert(vec![event]); + if event.block_number.is_some() { + // Blocks are processed in order as retrieved by `get_events`. + // This way we keep the order and ensure the messages are executed in order. + events.push(event); } }); @@ -104,7 +101,7 @@ impl StarknetMessaging { } } - Ok(block_to_events) + Ok(events) } /// Sends an invoke TX on starknet. @@ -201,19 +198,16 @@ impl Messenger for StarknetMessaging { .map_err(|_| Error::SendError) .unwrap() .iter() - .for_each(|(block_number, block_events)| { + .for_each(|e| { debug!( target: LOG_TARGET, - block_number = %block_number, - events_count = %block_events.len(), - "Converting events of block into L1HandlerTx." + event = ?e, + "Converting event into L1HandlerTx." ); - block_events.iter().for_each(|e| { - if let Ok(tx) = l1_handler_tx_from_event(e, chain_id) { - l1_handler_txs.push(tx) - } - }) + if let Ok(tx) = l1_handler_tx_from_event(e, chain_id) { + l1_handler_txs.push(tx) + } }); Ok((to_block, l1_handler_txs))