Skip to content

Commit

Permalink
fix(katana): ensure messages are ordered (#2326)
Browse files Browse the repository at this point in the history
* [fix] converted hash map to ordered vec in fetch events func

* [fix] Orthographic error in comment

* [fix] run fmt

* [fix] returning a single Vec<EmittedEvent>

* [fix] returning a Vec<Log> for ethereum.rs  fetch_logs

* [fix] debugger

* [fix] using compiler inference for Vecs

* Update crates/katana/core/src/service/messaging/starknet.rs

Co-authored-by: glihm <[email protected]>

---------

Co-authored-by: glihm <[email protected]>
  • Loading branch information
adrianvrj and glihm authored Aug 25, 2024
1 parent f11b3a6 commit 1944389
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 49 deletions.
46 changes: 16 additions & 30 deletions crates/katana/core/src/service/messaging/ethereum.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#![allow(dead_code)]

use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;

Expand Down Expand Up @@ -72,14 +71,10 @@ impl EthereumMessaging {
///
/// * `from_block` - The first block of which logs must be fetched.
/// * `to_block` - The last block of which logs must be fetched.
pub async fn fetch_logs(
&self,
from_block: u64,
to_block: u64,
) -> MessengerResult<HashMap<u64, Vec<Log>>> {
pub async fn fetch_logs(&self, from_block: u64, to_block: u64) -> MessengerResult<Vec<Log>> {
trace!(target: LOG_TARGET, from_block = ?from_block, to_block = ?to_block, "Fetching logs.");

let mut block_to_logs: HashMap<u64, Vec<Log>> = HashMap::new();
let mut logs = vec![];

let filters = Filter {
block_option: FilterBlockOption::Range {
Expand Down Expand Up @@ -107,15 +102,11 @@ impl EthereumMessaging {
.await?
.into_iter()
.filter(|log| log.block_number.is_some())
.map(|log| (log.block_number.unwrap(), log))
.for_each(|(block_num, log)| {
block_to_logs
.entry(block_num)
.and_modify(|v| v.push(log.clone()))
.or_insert(vec![log]);
.for_each(|log| {
logs.push(log);
});

Ok(block_to_logs)
Ok(logs)
}
}

Expand Down Expand Up @@ -143,22 +134,17 @@ impl Messenger for EthereumMessaging {
let mut l1_handler_txs = vec![];

trace!(target: LOG_TARGET, from_block, to_block, "Fetching logs from {from_block} to {to_block}.");
self.fetch_logs(from_block, to_block).await?.into_iter().for_each(
|(block_number, block_logs)| {
debug!(
target: LOG_TARGET,
block_number = %block_number,
logs_found = %block_logs.len(),
"Converting logs into L1HandlerTx.",
);

block_logs.into_iter().for_each(|log| {
if let Ok(tx) = l1_handler_tx_from_log(log, chain_id) {
l1_handler_txs.push(tx)
}
})
},
);
self.fetch_logs(from_block, to_block).await?.iter().for_each(|l| {
debug!(
target: LOG_TARGET,
log = ?l,
"Converting log into L1HandlerTx.",
);

if let Ok(tx) = l1_handler_tx_from_log(l.clone(), chain_id) {
l1_handler_txs.push(tx)
}
});

Ok((to_block, l1_handler_txs))
}
Expand Down
32 changes: 13 additions & 19 deletions crates/katana/core/src/service/messaging/starknet.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::collections::HashMap;
use std::sync::Arc;

use anyhow::Result;
Expand Down Expand Up @@ -61,15 +60,14 @@ impl StarknetMessaging {
})
}

/// Fetches events for the given blocks range.
pub async fn fetch_events(
&self,
from_block: BlockId,
to_block: BlockId,
) -> Result<HashMap<u64, Vec<EmittedEvent>>> {
) -> Result<Vec<EmittedEvent>> {
trace!(target: LOG_TARGET, from_block = ?from_block, to_block = ?to_block, "Fetching logs.");

let mut block_to_events: HashMap<u64, Vec<EmittedEvent>> = HashMap::new();
let mut events = vec![];

let filter = EventFilter {
from_block: Some(from_block),
Expand All @@ -89,11 +87,10 @@ impl StarknetMessaging {

event_page.events.into_iter().for_each(|event| {
// We ignore events without the block number
if let Some(block_number) = event.block_number {
block_to_events
.entry(block_number)
.and_modify(|v| v.push(event.clone()))
.or_insert(vec![event]);
if event.block_number.is_some() {
// Blocks are processed in order as retrieved by `get_events`.
// This way we keep the order and ensure the messages are executed in order.
events.push(event);
}
});

Expand All @@ -104,7 +101,7 @@ impl StarknetMessaging {
}
}

Ok(block_to_events)
Ok(events)
}

/// Sends an invoke TX on starknet.
Expand Down Expand Up @@ -201,19 +198,16 @@ impl Messenger for StarknetMessaging {
.map_err(|_| Error::SendError)
.unwrap()
.iter()
.for_each(|(block_number, block_events)| {
.for_each(|e| {
debug!(
target: LOG_TARGET,
block_number = %block_number,
events_count = %block_events.len(),
"Converting events of block into L1HandlerTx."
event = ?e,
"Converting event into L1HandlerTx."
);

block_events.iter().for_each(|e| {
if let Ok(tx) = l1_handler_tx_from_event(e, chain_id) {
l1_handler_txs.push(tx)
}
})
if let Ok(tx) = l1_handler_tx_from_event(e, chain_id) {
l1_handler_txs.push(tx)
}
});

Ok((to_block, l1_handler_txs))
Expand Down

0 comments on commit 1944389

Please sign in to comment.