From cd725f94e40d7d8913b138289bcb4e3de082e61c Mon Sep 17 00:00:00 2001 From: kwiss Date: Tue, 22 Oct 2024 17:56:44 +0200 Subject: [PATCH] fix: katana message gathering --- .gitignore | 4 + .../core/src/service/messaging/service.rs | 14 +- .../core/src/service/messaging/starknet.rs | 191 +++++++++++------- messaging.local.json | 9 - 4 files changed, 132 insertions(+), 86 deletions(-) delete mode 100644 messaging.local.json diff --git a/.gitignore b/.gitignore index f65de7119d..4632fee075 100644 --- a/.gitignore +++ b/.gitignore @@ -18,3 +18,7 @@ crates/benches/bench_results.txt .vscode bindings ./bin/solis/.env + +messaging.local.json +addresses.json +existing-katana-db \ No newline at end of file diff --git a/crates/katana/core/src/service/messaging/service.rs b/crates/katana/core/src/service/messaging/service.rs index 86333cac11..b61f8001ab 100644 --- a/crates/katana/core/src/service/messaging/service.rs +++ b/crates/katana/core/src/service/messaging/service.rs @@ -86,7 +86,12 @@ impl MessagingService { // 200 avoids any possible rejection from RPC with possibly lots of messages. // TODO: May this be configurable? let max_block = 200; - + info!( + target: LOG_TARGET, + "Starting gather_messages from block {} with max_block {}", + from_block, + max_block + ); match messenger.as_ref() { MessengerMode::Ethereum(inner) => { let (block_num, txs) = @@ -113,7 +118,12 @@ impl MessagingService { trace_l1_handler_tx_exec(hash, &tx); pool.add_transaction(ExecutableTxWithHash { hash, transaction: tx.into() }) }); - + info!( + target: LOG_TARGET, + "gather_messages finished. Last block: {}, tx count: {}", + block_num, + txs_count + ); Ok((block_num, txs_count)) } } diff --git a/crates/katana/core/src/service/messaging/starknet.rs b/crates/katana/core/src/service/messaging/starknet.rs index 4854722728..8e96155756 100644 --- a/crates/katana/core/src/service/messaging/starknet.rs +++ b/crates/katana/core/src/service/messaging/starknet.rs @@ -119,17 +119,17 @@ impl StarknetMessaging { async fn fetch_pending_events(&self, chain_id: ChainId) -> MessengerResult> { let mut l1_handler_txs: Vec = vec![]; let mut continuation_token: Option = None; - + loop { debug!(target: LOG_TARGET, "Fetching pending events with continuation token: {:?}", continuation_token); - + let filter = EventFilter { from_block: Some(BlockId::Tag(BlockTag::Pending)), to_block: Some(BlockId::Tag(BlockTag::Pending)), address: Some(self.messaging_contract_address), keys: None, }; - + let event_page = self .provider .get_events(filter.clone(), continuation_token.clone(), 200) @@ -138,55 +138,47 @@ impl StarknetMessaging { error!(target: LOG_TARGET, "Error fetching pending events: {:?}", e); Error::SendError })?; - - debug!(target: LOG_TARGET, "Fetched {} events", event_page.events.len()); - + + debug!(target: LOG_TARGET, "Fetched {} pending events", event_page.events.len()); + for event in event_page.events { let event_id = event.transaction_hash.to_string(); - debug!(target: LOG_TARGET, "Processing event with ID: {}", event_id); - - { - let cache = self.event_cache.read().await; - if cache.contains(&event_id) { - debug!(target: LOG_TARGET, "Event ID: {} already processed, skipping", event_id); - continue; - } + + // Check if we've already processed this event + let cache = self.event_cache.read().await; + if cache.contains(&event_id) { + debug!(target: LOG_TARGET, "Pending event {} already processed, skipping", event_id); + continue; } - + drop(cache); + if let Ok(tx) = l1_handler_tx_from_event(&event, chain_id) { if let Ok((from, to, selector)) = info_from_event(&event) { - let hooker = Arc::clone(&self.hooker); - let is_message_accepted = hooker - .read() - .await - .verify_message_to_appchain(from, to, selector) - .await; + let hooker = Arc::clone(&self.hooker); + let is_message_accepted = hooker + .read() + .await + .verify_message_to_appchain(from, to, selector) + .await; if is_message_accepted { - debug!(target: LOG_TARGET, "Event ID: {} accepted, adding to transactions", event_id); + debug!(target: LOG_TARGET, "Pending event {} accepted", event_id); l1_handler_txs.push(tx); + + // Add to cache after processing let mut cache = self.event_cache.write().await; cache.insert(event_id); - } else { - debug!( - target: LOG_TARGET, - "Event ID: {} not accepted by hooker, check the contract addresses defined in the hooker: executor address: {:?}, orderbook address: {:?}", - event_id, - from, - to - ); } } } } - + continuation_token = event_page.continuation_token; - if continuation_token.is_none() { break; } } - - debug!(target: LOG_TARGET, "Total transactions gathered: {}", l1_handler_txs.len()); + + debug!(target: LOG_TARGET, "Total pending transactions gathered: {}", l1_handler_txs.len()); Ok(l1_handler_txs) } @@ -276,8 +268,9 @@ impl Messenger for StarknetM max_blocks: u64, chain_id: ChainId, ) -> MessengerResult<(u64, Vec)> { - debug!(target: LOG_TARGET, "Gathering messages"); - + debug!(target: LOG_TARGET, "Starting gather_messages with from_block: {}, max_blocks: {}", from_block, max_blocks); + + // First get the latest block number let chain_latest_block: u64 = match self.provider.block_number().await { Ok(n) => { debug!(target: LOG_TARGET, "Latest block number on chain: {}", n); @@ -291,56 +284,104 @@ impl Messenger for StarknetM return Err(Error::SendError); } }; - + + let mut l1_handler_txs = Vec::new(); + + // First gather pending events + debug!(target: LOG_TARGET, "Fetching pending events first"); + let pending_txs = self.fetch_pending_events(chain_id).await?; + l1_handler_txs.extend(pending_txs); + debug!(target: LOG_TARGET, "Found {} pending transactions", l1_handler_txs.len()); + + // Then check if we need to process blocks if from_block > chain_latest_block { - // Nothing to fetch, we can skip waiting the next tick. - return Ok((chain_latest_block, vec![])); + debug!(target: LOG_TARGET, "from_block ({}) > chain_latest_block ({}), returning with only pending events", from_block, chain_latest_block); + return Ok((chain_latest_block, l1_handler_txs)); } - - // +1 as the from_block counts as 1 block fetched. - let to_block = if from_block + max_blocks + 1 < chain_latest_block { + + // Calculate the block range to process + let to_block = if from_block + max_blocks < chain_latest_block { from_block + max_blocks } else { chain_latest_block }; - - let mut l1_handler_txs: Vec = vec![]; - - // fetch events for the given range before fetching pending events - self.fetch_events(BlockId::Number(from_block), BlockId::Number(to_block)) - .await - .map_err(|_| Error::SendError) - .unwrap() - .iter() - .for_each(|(block_number, block_events)| { - debug!( - target: LOG_TARGET, - block_number = %block_number, - events_count = %block_events.len(), - "Converting events of block into L1HandlerTx." - ); - - block_events.iter().for_each(|e| { - if let Ok(tx) = l1_handler_tx_from_event(e, chain_id) { - l1_handler_txs.push(tx) + debug!(target: LOG_TARGET, "Fetching confirmed events from block {} to {}", from_block, to_block); + + // Now fetch events from confirmed blocks + match self.fetch_events(BlockId::Number(from_block), BlockId::Number(to_block)).await { + Ok(events_by_block) => { + for (block_number, block_events) in events_by_block.iter() { + debug!( + target: LOG_TARGET, + block_number = %block_number, + events_count = %block_events.len(), + "Processing confirmed events for block" + ); + + for event in block_events { + let event_id = event.transaction_hash.to_string(); + + // Check if we've already processed this event + let cache = self.event_cache.read().await; + if cache.contains(&event_id) { + debug!(target: LOG_TARGET, "Event {} already processed, skipping", event_id); + continue; + } + drop(cache); // Release the read lock + + if let Ok(tx) = l1_handler_tx_from_event(event, chain_id) { + if let Ok((from, to, selector)) = info_from_event(event) { + let hooker = Arc::clone(&self.hooker); + let is_message_accepted = hooker + .read() + .await + .verify_message_to_appchain(from, to, selector) + .await; + + if is_message_accepted { + debug!( + target: LOG_TARGET, + "Message from block {} accepted: from {:?} to {:?} with selector {:?}", + block_number, from, to, selector + ); + l1_handler_txs.push(tx); + + // Add to cache after processing + let mut cache = self.event_cache.write().await; + cache.insert(event_id); + } else { + debug!( + target: LOG_TARGET, + "Message from block {} not accepted: from {:?} to {:?} with selector {:?}", + block_number, from, to, selector + ); + } + } + } } - }) - }); - - // Check if the block number has changed + } + } + Err(e) => { + error!(target: LOG_TARGET, "Error fetching confirmed events: {:?}", e); + return Err(Error::SendError); + } + } + + // We only clear the cache if we've moved to a new latest block to avoid reprocessing events let previous_block = self.latest_block.load(Ordering::Relaxed); - if previous_block != chain_latest_block { - debug!(target: LOG_TARGET, "Block number changed from {} to {}, clearing cache", previous_block, chain_latest_block); + if previous_block < chain_latest_block { + debug!(target: LOG_TARGET, "Moving to new latest block {} from {}, clearing cache", chain_latest_block, previous_block); self.event_cache.write().await.clear(); self.latest_block.store(chain_latest_block, Ordering::Relaxed); } - // Fetch pending events - let pending_txs = self.fetch_pending_events(chain_id).await?; - // Add pending events to the list - l1_handler_txs.extend(pending_txs); - - debug!(target: LOG_TARGET, "Returning {} transactions", l1_handler_txs.len()); - Ok((chain_latest_block, l1_handler_txs)) + + info!( + target: LOG_TARGET, + "Total messages gathered: {} (including pending)", + l1_handler_txs.len(), + ); + + Ok((to_block, l1_handler_txs)) } async fn send_messages( diff --git a/messaging.local.json b/messaging.local.json deleted file mode 100644 index 1707e867d9..0000000000 --- a/messaging.local.json +++ /dev/null @@ -1,9 +0,0 @@ -{ - "chain": "starknet", - "rpc_url": "http://0.0.0.0:5050", - "contract_address": "0x3757e03517ea83d7ae5714b0bfea853114aeb4d5186ec8bf7ec73ac09033f18", - "sender_address": "0x2d71e9c974539bb3ffb4b115e66a23d0f62a641ea66c4016e903454c8753bbc", - "private_key": "0x33003003001800009900180300d206308b0070db00121318d17b5e6262150b", - "interval": 2, - "from_block": 0 -} \ No newline at end of file