Skip to content

Commit

Permalink
fix: katana message gathering
Browse files Browse the repository at this point in the history
  • Loading branch information
kwiss committed Oct 22, 2024
1 parent a4b4276 commit cd725f9
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 86 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,7 @@ crates/benches/bench_results.txt
.vscode
bindings
./bin/solis/.env

messaging.local.json
addresses.json
existing-katana-db
14 changes: 12 additions & 2 deletions crates/katana/core/src/service/messaging/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,12 @@ impl<EF: ExecutorFactory> MessagingService<EF> {
// 200 avoids any possible rejection from RPC with possibly lots of messages.
// TODO: May this be configurable?
let max_block = 200;

info!(
target: LOG_TARGET,
"Starting gather_messages from block {} with max_block {}",
from_block,
max_block
);
match messenger.as_ref() {
MessengerMode::Ethereum(inner) => {
let (block_num, txs) =
Expand All @@ -113,7 +118,12 @@ impl<EF: ExecutorFactory> MessagingService<EF> {
trace_l1_handler_tx_exec(hash, &tx);
pool.add_transaction(ExecutableTxWithHash { hash, transaction: tx.into() })
});

info!(
target: LOG_TARGET,
"gather_messages finished. Last block: {}, tx count: {}",
block_num,
txs_count
);
Ok((block_num, txs_count))
}
}
Expand Down
191 changes: 116 additions & 75 deletions crates/katana/core/src/service/messaging/starknet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,17 +119,17 @@ impl<EF: katana_executor::ExecutorFactory + Send + Sync> StarknetMessaging<EF> {
async fn fetch_pending_events(&self, chain_id: ChainId) -> MessengerResult<Vec<L1HandlerTx>> {
let mut l1_handler_txs: Vec<L1HandlerTx> = vec![];
let mut continuation_token: Option<String> = None;

loop {
debug!(target: LOG_TARGET, "Fetching pending events with continuation token: {:?}", continuation_token);

let filter = EventFilter {
from_block: Some(BlockId::Tag(BlockTag::Pending)),
to_block: Some(BlockId::Tag(BlockTag::Pending)),
address: Some(self.messaging_contract_address),
keys: None,
};

let event_page = self
.provider
.get_events(filter.clone(), continuation_token.clone(), 200)
Expand All @@ -138,55 +138,47 @@ impl<EF: katana_executor::ExecutorFactory + Send + Sync> StarknetMessaging<EF> {
error!(target: LOG_TARGET, "Error fetching pending events: {:?}", e);
Error::SendError
})?;

debug!(target: LOG_TARGET, "Fetched {} events", event_page.events.len());

debug!(target: LOG_TARGET, "Fetched {} pending events", event_page.events.len());
for event in event_page.events {
let event_id = event.transaction_hash.to_string();
debug!(target: LOG_TARGET, "Processing event with ID: {}", event_id);

{
let cache = self.event_cache.read().await;
if cache.contains(&event_id) {
debug!(target: LOG_TARGET, "Event ID: {} already processed, skipping", event_id);
continue;
}

// Check if we've already processed this event
let cache = self.event_cache.read().await;
if cache.contains(&event_id) {
debug!(target: LOG_TARGET, "Pending event {} already processed, skipping", event_id);
continue;
}

drop(cache);

if let Ok(tx) = l1_handler_tx_from_event(&event, chain_id) {
if let Ok((from, to, selector)) = info_from_event(&event) {
let hooker = Arc::clone(&self.hooker);
let is_message_accepted = hooker
.read()
.await
.verify_message_to_appchain(from, to, selector)
.await;
let hooker = Arc::clone(&self.hooker);
let is_message_accepted = hooker
.read()
.await
.verify_message_to_appchain(from, to, selector)
.await;
if is_message_accepted {
debug!(target: LOG_TARGET, "Event ID: {} accepted, adding to transactions", event_id);
debug!(target: LOG_TARGET, "Pending event {} accepted", event_id);
l1_handler_txs.push(tx);

// Add to cache after processing
let mut cache = self.event_cache.write().await;
cache.insert(event_id);
} else {
debug!(
target: LOG_TARGET,
"Event ID: {} not accepted by hooker, check the contract addresses defined in the hooker: executor address: {:?}, orderbook address: {:?}",
event_id,
from,
to
);
}
}
}
}

continuation_token = event_page.continuation_token;

if continuation_token.is_none() {
break;
}
}

debug!(target: LOG_TARGET, "Total transactions gathered: {}", l1_handler_txs.len());
debug!(target: LOG_TARGET, "Total pending transactions gathered: {}", l1_handler_txs.len());
Ok(l1_handler_txs)
}

Expand Down Expand Up @@ -276,8 +268,9 @@ impl<EF: katana_executor::ExecutorFactory + Send + Sync> Messenger for StarknetM
max_blocks: u64,
chain_id: ChainId,
) -> MessengerResult<(u64, Vec<L1HandlerTx>)> {
debug!(target: LOG_TARGET, "Gathering messages");

debug!(target: LOG_TARGET, "Starting gather_messages with from_block: {}, max_blocks: {}", from_block, max_blocks);

// First get the latest block number
let chain_latest_block: u64 = match self.provider.block_number().await {
Ok(n) => {
debug!(target: LOG_TARGET, "Latest block number on chain: {}", n);
Expand All @@ -291,56 +284,104 @@ impl<EF: katana_executor::ExecutorFactory + Send + Sync> Messenger for StarknetM
return Err(Error::SendError);
}
};


let mut l1_handler_txs = Vec::new();

// First gather pending events
debug!(target: LOG_TARGET, "Fetching pending events first");
let pending_txs = self.fetch_pending_events(chain_id).await?;
l1_handler_txs.extend(pending_txs);
debug!(target: LOG_TARGET, "Found {} pending transactions", l1_handler_txs.len());

// Then check if we need to process blocks
if from_block > chain_latest_block {
// Nothing to fetch, we can skip waiting the next tick.
return Ok((chain_latest_block, vec![]));
debug!(target: LOG_TARGET, "from_block ({}) > chain_latest_block ({}), returning with only pending events", from_block, chain_latest_block);
return Ok((chain_latest_block, l1_handler_txs));
}

// +1 as the from_block counts as 1 block fetched.
let to_block = if from_block + max_blocks + 1 < chain_latest_block {
// Calculate the block range to process
let to_block = if from_block + max_blocks < chain_latest_block {
from_block + max_blocks
} else {
chain_latest_block
};

let mut l1_handler_txs: Vec<L1HandlerTx> = vec![];

// fetch events for the given range before fetching pending events
self.fetch_events(BlockId::Number(from_block), BlockId::Number(to_block))
.await
.map_err(|_| Error::SendError)
.unwrap()
.iter()
.for_each(|(block_number, block_events)| {
debug!(
target: LOG_TARGET,
block_number = %block_number,
events_count = %block_events.len(),
"Converting events of block into L1HandlerTx."
);

block_events.iter().for_each(|e| {
if let Ok(tx) = l1_handler_tx_from_event(e, chain_id) {
l1_handler_txs.push(tx)
debug!(target: LOG_TARGET, "Fetching confirmed events from block {} to {}", from_block, to_block);

// Now fetch events from confirmed blocks
match self.fetch_events(BlockId::Number(from_block), BlockId::Number(to_block)).await {
Ok(events_by_block) => {
for (block_number, block_events) in events_by_block.iter() {
debug!(
target: LOG_TARGET,
block_number = %block_number,
events_count = %block_events.len(),
"Processing confirmed events for block"
);

for event in block_events {
let event_id = event.transaction_hash.to_string();

// Check if we've already processed this event
let cache = self.event_cache.read().await;
if cache.contains(&event_id) {
debug!(target: LOG_TARGET, "Event {} already processed, skipping", event_id);
continue;
}
drop(cache); // Release the read lock

if let Ok(tx) = l1_handler_tx_from_event(event, chain_id) {
if let Ok((from, to, selector)) = info_from_event(event) {
let hooker = Arc::clone(&self.hooker);
let is_message_accepted = hooker
.read()
.await
.verify_message_to_appchain(from, to, selector)
.await;

if is_message_accepted {
debug!(
target: LOG_TARGET,
"Message from block {} accepted: from {:?} to {:?} with selector {:?}",
block_number, from, to, selector
);
l1_handler_txs.push(tx);

// Add to cache after processing
let mut cache = self.event_cache.write().await;
cache.insert(event_id);
} else {
debug!(
target: LOG_TARGET,
"Message from block {} not accepted: from {:?} to {:?} with selector {:?}",
block_number, from, to, selector
);
}
}
}
}
})
});

// Check if the block number has changed
}
}
Err(e) => {
error!(target: LOG_TARGET, "Error fetching confirmed events: {:?}", e);
return Err(Error::SendError);
}
}

// We only clear the cache if we've moved to a new latest block to avoid reprocessing events
let previous_block = self.latest_block.load(Ordering::Relaxed);
if previous_block != chain_latest_block {
debug!(target: LOG_TARGET, "Block number changed from {} to {}, clearing cache", previous_block, chain_latest_block);
if previous_block < chain_latest_block {
debug!(target: LOG_TARGET, "Moving to new latest block {} from {}, clearing cache", chain_latest_block, previous_block);
self.event_cache.write().await.clear();
self.latest_block.store(chain_latest_block, Ordering::Relaxed);
}
// Fetch pending events
let pending_txs = self.fetch_pending_events(chain_id).await?;
// Add pending events to the list
l1_handler_txs.extend(pending_txs);

debug!(target: LOG_TARGET, "Returning {} transactions", l1_handler_txs.len());
Ok((chain_latest_block, l1_handler_txs))

info!(
target: LOG_TARGET,
"Total messages gathered: {} (including pending)",
l1_handler_txs.len(),
);

Ok((to_block, l1_handler_txs))
}

async fn send_messages(
Expand Down
9 changes: 0 additions & 9 deletions messaging.local.json

This file was deleted.

0 comments on commit cd725f9

Please sign in to comment.