From 2b0468764bfe220545f8dee1fc26c4e97a3c8745 Mon Sep 17 00:00:00 2001 From: Dmitrii Novikov Date: Thu, 12 Dec 2024 19:31:35 +0400 Subject: [PATCH] fixit --- ethexe/ethereum/src/router/events.rs | 13 ++- ethexe/observer/src/observer.rs | 55 +++++++++- ethexe/observer/src/query.rs | 149 ++++++++++++++++++--------- 3 files changed, 163 insertions(+), 54 deletions(-) diff --git a/ethexe/ethereum/src/router/events.rs b/ethexe/ethereum/src/router/events.rs index ce67e48d491..b8c29781673 100644 --- a/ethexe/ethereum/src/router/events.rs +++ b/ethexe/ethereum/src/router/events.rs @@ -16,10 +16,11 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use crate::{decode_log, IRouter}; +use crate::{abi::utils::bytes32_to_h256, decode_log, IRouter}; use alloy::{primitives::B256, rpc::types::eth::Log, sol_types::SolEvent}; use anyhow::{anyhow, Result}; use ethexe_common::events::{RouterEvent, RouterRequestEvent}; +use gprimitives::H256; use signatures::*; pub mod signatures { @@ -89,3 +90,13 @@ pub fn try_extract_request_event(log: &Log) -> Result Ok(Some(request_event)) } + +pub fn try_extract_committed_block_hash(log: &Log) -> Result> { + if log.topic0() != Some(&BLOCK_COMMITTED) { + return Ok(None); + } + + decode_log::(log) + .map(|e| Some(bytes32_to_h256(e.hash))) + .map_err(Into::into) +} diff --git a/ethexe/observer/src/observer.rs b/ethexe/observer/src/observer.rs index 910d068fcd0..c08c221ff63 100644 --- a/ethexe/observer/src/observer.rs +++ b/ethexe/observer/src/observer.rs @@ -24,7 +24,7 @@ use std::{collections::HashMap, sync::Arc}; use tokio::sync::watch; /// Max number of blocks to query in alloy. -pub(crate) const MAX_QUERY_BLOCK_RANGE: u64 = 100_000; +pub(crate) const MAX_QUERY_BLOCK_RANGE: usize = 1024; pub(crate) type ObserverProvider = RootProvider; @@ -309,7 +309,7 @@ pub(crate) async fn read_block_events_batch( let to_block = to_block as u64; while start_block <= to_block { - let end_block = to_block.min(start_block + MAX_QUERY_BLOCK_RANGE - 1); + let end_block = to_block.min(start_block + MAX_QUERY_BLOCK_RANGE as u64 - 1); let filter = Filter::new().from_block(start_block).to_block(end_block); @@ -421,8 +421,9 @@ pub(crate) async fn read_block_request_events_batch( let mut start_block = from_block as u64; let to_block = to_block as u64; + // TODO (breathx): FIX WITHIN PR. to iters. while start_block <= to_block { - let end_block = to_block.min(start_block + MAX_QUERY_BLOCK_RANGE - 1); + let end_block = to_block.min(start_block + MAX_QUERY_BLOCK_RANGE as u64 - 1); let filter = Filter::new().from_block(start_block).to_block(end_block); @@ -512,6 +513,54 @@ async fn read_request_events_impl( Ok(res) } +pub(crate) async fn read_committed_blocks_batch( + from_block: u32, + to_block: u32, + provider: &ObserverProvider, + router_address: AlloyAddress, +) -> Result> { + let mut start_block = from_block as u64; + let to_block = to_block as u64; + + let mut res = Vec::new(); + + while start_block <= to_block { + let end_block = to_block.min(start_block + MAX_QUERY_BLOCK_RANGE as u64 - 1); + + let filter = Filter::new().from_block(start_block).to_block(end_block); + + let iter_res = read_committed_blocks_impl(router_address, provider, filter).await?; + + res.extend(iter_res); + + start_block = end_block + 1; + } + + Ok(res) +} + +async fn read_committed_blocks_impl( + router_address: AlloyAddress, + provider: &ObserverProvider, + filter: Filter, +) -> Result> { + let filter = filter + .address(router_address) + .event_signature(Topic::from(router::events::signatures::BLOCK_COMMITTED)); + + let logs = provider.get_logs(&filter).await?; + + let mut res = Vec::with_capacity(logs.len()); + + for log in logs { + if let Some(hash) = router::events::try_extract_committed_block_hash(&log)? { + res.push(hash); + } + } + + Ok(res) +} + #[cfg(test)] mod tests { use std::time::Duration; diff --git a/ethexe/observer/src/query.rs b/ethexe/observer/src/query.rs index 0f7221bacd8..60fd4bc1c7b 100644 --- a/ethexe/observer/src/query.rs +++ b/ethexe/observer/src/query.rs @@ -1,20 +1,16 @@ -use std::{ - collections::{BTreeSet, HashMap, VecDeque}, - sync::Arc, -}; - use crate::{ observer::{ read_block_events, read_block_request_events, read_block_request_events_batch, - read_code_from_tx_hash, ObserverProvider, + read_code_from_tx_hash, read_committed_blocks_batch, ObserverProvider, + MAX_QUERY_BLOCK_RANGE, }, BlobReader, }; use alloy::{ - eips::BlockNumberOrTag, + network::{Ethereum, Network}, primitives::Address as AlloyAddress, providers::{Provider, ProviderBuilder}, - rpc::types::eth::BlockTransactionsKind, + rpc::{client::BatchRequest, types::eth::BlockTransactionsKind}, }; use anyhow::{anyhow, Result}; use ethexe_common::{ @@ -22,7 +18,12 @@ use ethexe_common::{ events::{BlockEvent, BlockRequestEvent, RouterEvent}, }; use ethexe_signer::Address; +use futures::future; use gprimitives::{CodeId, H256}; +use std::{ + collections::{BTreeSet, HashMap, VecDeque}, + sync::Arc, +}; /// Height difference to start fast sync. const DEEP_SYNC: u32 = 10; @@ -95,6 +96,55 @@ impl Query { .collect()) } + async fn batch_get_block_headers( + provider: ObserverProvider, + database: Arc, + from_block: u64, + to_block: u64, + ) -> Result> { + log::debug!("Querying blocks from {from_block} to {to_block}"); + + let mut batch = BatchRequest::new(provider.client()); + + let handles: Vec<_> = (from_block..=to_block) + .map(|bn| { + batch + .add_call::<_, Option<::BlockResponse>>( + "eth_getBlockByNumber", + &(format!("0x{bn:x}"), false), + ) + .expect("infallible") + }) + .collect(); + + batch.send().await?; + + let blocks: Vec<_> = future::join_all(handles).await; + + let mut res = Vec::with_capacity(blocks.len()); + + for block in blocks { + let block = block?.ok_or_else(|| anyhow!("Block not found"))?; + let block_hash = H256(block.header.hash.0); + + let height = block.header.number as u32; + let timestamp = block.header.timestamp; + let parent_hash = H256(block.header.parent_hash.0); + + let header = BlockHeader { + height, + timestamp, + parent_hash, + }; + + database.set_block_header(block_hash, header.clone()); + + res.push((block_hash, header)) + } + + Ok(res) + } + /// Populate database with blocks using rpc provider. async fn load_chain_batch( &mut self, @@ -102,59 +152,52 @@ impl Query { to_block: u32, ) -> Result> { let total_blocks = to_block.saturating_sub(from_block) + 1; - log::info!("Starting to load {total_blocks} blocks from {from_block} to {to_block}"); - let fetches = (from_block..=to_block).map(|block_number| { - let provider = self.provider.clone(); - let database = Arc::clone(&self.database); - tokio::spawn(async move { - let block = provider - .get_block_by_number(BlockNumberOrTag::Number(block_number as u64), false) - .await?; - let block = block - .ok_or_else(|| anyhow!("Block not found for block number {block_number}"))?; - - let height = u32::try_from(block.header.number) - .map_err(|err| anyhow!("Ethereum block number not fit in u32: {err}"))?; - let timestamp = block.header.timestamp; - let block_hash = H256(block.header.hash.0); - let parent_hash = H256(block.header.parent_hash.0); + log::info!("Starting to load {total_blocks} blocks from {from_block} to {to_block}"); - let header = BlockHeader { - height, - timestamp, - parent_hash, - }; + let headers_handles: Vec<_> = (from_block..=to_block) + .step_by(MAX_QUERY_BLOCK_RANGE) + .map(|start| { + let end = (start + MAX_QUERY_BLOCK_RANGE as u32 - 1).min(to_block); - database.set_block_header(block_hash, header.clone()); + let provider = self.provider.clone(); + let database = self.database.clone(); - Ok::<(H256, BlockHeader), anyhow::Error>((block_hash, header)) + tokio::spawn(async move { + Self::batch_get_block_headers(provider, database, start as u64, end as u64) + .await + }) }) - }); + .collect(); + + let headers_fut = future::join_all(headers_handles); - // Fetch events in block range. - let mut blocks_events = read_block_request_events_batch( + let events_fut = read_block_request_events_batch( from_block, to_block, &self.provider, self.router_address, - ) - .await?; + ); + + let (headers_batches, maybe_events) = future::join(headers_fut, events_fut).await; + let mut events = maybe_events?; + + let mut res = HashMap::with_capacity(total_blocks as usize); - // Collect results - let mut block_headers = HashMap::new(); - for result in futures::future::join_all(fetches).await { - let (block_hash, header) = result??; - // Set block events, empty vec if no events. - self.database.set_block_events( - block_hash, - blocks_events.remove(&block_hash).unwrap_or_default(), - ); - block_headers.insert(block_hash, header); + for batch in headers_batches { + let batch = batch??; + + for (hash, header) in batch { + self.database + .set_block_events(hash, events.remove(&hash).unwrap_or_default()); + + res.insert(hash, header); + } } - log::trace!("{} blocks loaded", block_headers.len()); - Ok(block_headers) + log::trace!("{} blocks loaded", res.len()); + + Ok(res) } pub async fn get_last_committed_chain(&mut self, block_hash: H256) -> Result> { @@ -186,9 +229,16 @@ impl Query { }; let mut chain = Vec::new(); - let mut committed_blocks = Vec::new(); let mut headers_map = HashMap::new(); + let committed_blocks = read_committed_blocks_batch( + latest_valid_block_height + 1, + current_block.height, + &self.provider, + self.router_address, + ) + .await?; + if is_deep_sync { // Load blocks in batch from provider by numbers. headers_map = self @@ -218,7 +268,6 @@ impl Query { } log::trace!("Include block {hash} in chain for processing"); - committed_blocks.extend(self.get_committed_blocks(hash).await?); chain.push(hash); // Fetch parent hash from headers_map or database