diff --git a/ethexe/ethereum/src/router/events.rs b/ethexe/ethereum/src/router/events.rs
index ce67e48d491..b8c29781673 100644
--- a/ethexe/ethereum/src/router/events.rs
+++ b/ethexe/ethereum/src/router/events.rs
@@ -16,10 +16,11 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
-use crate::{decode_log, IRouter};
+use crate::{abi::utils::bytes32_to_h256, decode_log, IRouter};
use alloy::{primitives::B256, rpc::types::eth::Log, sol_types::SolEvent};
use anyhow::{anyhow, Result};
use ethexe_common::events::{RouterEvent, RouterRequestEvent};
+use gprimitives::H256;
use signatures::*;
pub mod signatures {
@@ -89,3 +90,13 @@ pub fn try_extract_request_event(log: &Log) -> Result<Option<RouterRequestEvent>>
Ok(Some(request_event))
}
+
+pub fn try_extract_committed_block_hash(log: &Log) -> Result<Option<H256>> {
+ if log.topic0() != Some(&BLOCK_COMMITTED) {
+ return Ok(None);
+ }
+
+ decode_log::<IRouter::BlockCommitted>(log)
+ .map(|e| Some(bytes32_to_h256(e.hash)))
+ .map_err(Into::into)
+}
diff --git a/ethexe/observer/src/observer.rs b/ethexe/observer/src/observer.rs
index 3fa8d2dbbed..fdca71af5df 100644
--- a/ethexe/observer/src/observer.rs
+++ b/ethexe/observer/src/observer.rs
@@ -25,7 +25,7 @@ use std::{collections::HashMap, sync::Arc};
use tokio::sync::watch;
/// Max number of blocks to query in alloy.
-pub(crate) const MAX_QUERY_BLOCK_RANGE: u64 = 100_000;
+pub(crate) const MAX_QUERY_BLOCK_RANGE: usize = 1024;
pub(crate) type ObserverProvider = RootProvider<BoxTransport>;
@@ -273,7 +273,7 @@ pub(crate) async fn read_block_events_batch(
let to_block = to_block as u64;
while start_block <= to_block {
- let end_block = to_block.min(start_block + MAX_QUERY_BLOCK_RANGE - 1);
+ let end_block = to_block.min(start_block + MAX_QUERY_BLOCK_RANGE as u64 - 1);
let filter = Filter::new().from_block(start_block).to_block(end_block);
@@ -385,8 +385,9 @@ pub(crate) async fn read_block_request_events_batch(
let mut start_block = from_block as u64;
let to_block = to_block as u64;
+ // TODO (breathx): FIX WITHIN PR. to iters.
while start_block <= to_block {
- let end_block = to_block.min(start_block + MAX_QUERY_BLOCK_RANGE - 1);
+ let end_block = to_block.min(start_block + MAX_QUERY_BLOCK_RANGE as u64 - 1);
let filter = Filter::new().from_block(start_block).to_block(end_block);
@@ -476,6 +477,54 @@ async fn read_request_events_impl(
Ok(res)
}
+pub(crate) async fn read_committed_blocks_batch(
+ from_block: u32,
+ to_block: u32,
+ provider: &ObserverProvider,
+ router_address: AlloyAddress,
+) -> Result<Vec<H256>> {
+ let mut start_block = from_block as u64;
+ let to_block = to_block as u64;
+
+ let mut res = Vec::new();
+
+ while start_block <= to_block {
+ let end_block = to_block.min(start_block + MAX_QUERY_BLOCK_RANGE as u64 - 1);
+
+ let filter = Filter::new().from_block(start_block).to_block(end_block);
+
+ let iter_res = read_committed_blocks_impl(router_address, provider, filter).await?;
+
+ res.extend(iter_res);
+
+ start_block = end_block + 1;
+ }
+
+ Ok(res)
+}
+
+async fn read_committed_blocks_impl(
+ router_address: AlloyAddress,
+ provider: &ObserverProvider,
+ filter: Filter,
+) -> Result<Vec<H256>> {
+ let filter = filter
+ .address(router_address)
+ .event_signature(Topic::from(router::events::signatures::BLOCK_COMMITTED));
+
+ let logs = provider.get_logs(&filter).await?;
+
+ let mut res = Vec::with_capacity(logs.len());
+
+ for log in logs {
+ if let Some(hash) = router::events::try_extract_committed_block_hash(&log)? {
+ res.push(hash);
+ }
+ }
+
+ Ok(res)
+}
+
#[cfg(test)]
mod tests {
use std::time::Duration;
diff --git a/ethexe/observer/src/query.rs b/ethexe/observer/src/query.rs
index 0f7221bacd8..60fd4bc1c7b 100644
--- a/ethexe/observer/src/query.rs
+++ b/ethexe/observer/src/query.rs
@@ -1,20 +1,16 @@
-use std::{
- collections::{BTreeSet, HashMap, VecDeque},
- sync::Arc,
-};
-
use crate::{
observer::{
read_block_events, read_block_request_events, read_block_request_events_batch,
- read_code_from_tx_hash, ObserverProvider,
+ read_code_from_tx_hash, read_committed_blocks_batch, ObserverProvider,
+ MAX_QUERY_BLOCK_RANGE,
},
BlobReader,
};
use alloy::{
- eips::BlockNumberOrTag,
+ network::{Ethereum, Network},
primitives::Address as AlloyAddress,
providers::{Provider, ProviderBuilder},
- rpc::types::eth::BlockTransactionsKind,
+ rpc::{client::BatchRequest, types::eth::BlockTransactionsKind},
};
use anyhow::{anyhow, Result};
use ethexe_common::{
@@ -22,7 +18,12 @@ use ethexe_common::{
events::{BlockEvent, BlockRequestEvent, RouterEvent},
};
use ethexe_signer::Address;
+use futures::future;
use gprimitives::{CodeId, H256};
+use std::{
+ collections::{BTreeSet, HashMap, VecDeque},
+ sync::Arc,
+};
/// Height difference to start fast sync.
const DEEP_SYNC: u32 = 10;
@@ -95,6 +96,55 @@ impl Query {
.collect())
}
+ async fn batch_get_block_headers(
+ provider: ObserverProvider,
+ database: Arc<dyn BlockMetaStorage>,
+ from_block: u64,
+ to_block: u64,
+ ) -> Result<Vec<(H256, BlockHeader)>> {
+ log::debug!("Querying blocks from {from_block} to {to_block}");
+
+ let mut batch = BatchRequest::new(provider.client());
+
+ let handles: Vec<_> = (from_block..=to_block)
+ .map(|bn| {
+ batch
+ .add_call::<_, Option<<Ethereum as Network>::BlockResponse>>(
+ "eth_getBlockByNumber",
+ &(format!("0x{bn:x}"), false),
+ )
+ .expect("infallible")
+ })
+ .collect();
+
+ batch.send().await?;
+
+ let blocks: Vec<_> = future::join_all(handles).await;
+
+ let mut res = Vec::with_capacity(blocks.len());
+
+ for block in blocks {
+ let block = block?.ok_or_else(|| anyhow!("Block not found"))?;
+ let block_hash = H256(block.header.hash.0);
+
+ let height = block.header.number as u32;
+ let timestamp = block.header.timestamp;
+ let parent_hash = H256(block.header.parent_hash.0);
+
+ let header = BlockHeader {
+ height,
+ timestamp,
+ parent_hash,
+ };
+
+ database.set_block_header(block_hash, header.clone());
+
+ res.push((block_hash, header))
+ }
+
+ Ok(res)
+ }
+
/// Populate database with blocks using rpc provider.
async fn load_chain_batch(
&mut self,
@@ -102,59 +152,52 @@ impl Query {
to_block: u32,
) -> Result<HashMap<H256, BlockHeader>> {
let total_blocks = to_block.saturating_sub(from_block) + 1;
- log::info!("Starting to load {total_blocks} blocks from {from_block} to {to_block}");
- let fetches = (from_block..=to_block).map(|block_number| {
- let provider = self.provider.clone();
- let database = Arc::clone(&self.database);
- tokio::spawn(async move {
- let block = provider
- .get_block_by_number(BlockNumberOrTag::Number(block_number as u64), false)
- .await?;
- let block = block
- .ok_or_else(|| anyhow!("Block not found for block number {block_number}"))?;
-
- let height = u32::try_from(block.header.number)
- .map_err(|err| anyhow!("Ethereum block number not fit in u32: {err}"))?;
- let timestamp = block.header.timestamp;
- let block_hash = H256(block.header.hash.0);
- let parent_hash = H256(block.header.parent_hash.0);
+ log::info!("Starting to load {total_blocks} blocks from {from_block} to {to_block}");
- let header = BlockHeader {
- height,
- timestamp,
- parent_hash,
- };
+ let headers_handles: Vec<_> = (from_block..=to_block)
+ .step_by(MAX_QUERY_BLOCK_RANGE)
+ .map(|start| {
+ let end = (start + MAX_QUERY_BLOCK_RANGE as u32 - 1).min(to_block);
- database.set_block_header(block_hash, header.clone());
+ let provider = self.provider.clone();
+ let database = self.database.clone();
- Ok::<(H256, BlockHeader), anyhow::Error>((block_hash, header))
+ tokio::spawn(async move {
+ Self::batch_get_block_headers(provider, database, start as u64, end as u64)
+ .await
+ })
})
- });
+ .collect();
+
+ let headers_fut = future::join_all(headers_handles);
- // Fetch events in block range.
- let mut blocks_events = read_block_request_events_batch(
+ let events_fut = read_block_request_events_batch(
from_block,
to_block,
&self.provider,
self.router_address,
- )
- .await?;
+ );
+
+ let (headers_batches, maybe_events) = future::join(headers_fut, events_fut).await;
+ let mut events = maybe_events?;
+
+ let mut res = HashMap::with_capacity(total_blocks as usize);
- // Collect results
- let mut block_headers = HashMap::new();
- for result in futures::future::join_all(fetches).await {
- let (block_hash, header) = result??;
- // Set block events, empty vec if no events.
- self.database.set_block_events(
- block_hash,
- blocks_events.remove(&block_hash).unwrap_or_default(),
- );
- block_headers.insert(block_hash, header);
+ for batch in headers_batches {
+ let batch = batch??;
+
+ for (hash, header) in batch {
+ self.database
+ .set_block_events(hash, events.remove(&hash).unwrap_or_default());
+
+ res.insert(hash, header);
+ }
}
- log::trace!("{} blocks loaded", block_headers.len());
- Ok(block_headers)
+ log::trace!("{} blocks loaded", res.len());
+
+ Ok(res)
}
pub async fn get_last_committed_chain(&mut self, block_hash: H256) -> Result<Vec<H256>> {
@@ -186,9 +229,16 @@ impl Query {
};
let mut chain = Vec::new();
- let mut committed_blocks = Vec::new();
let mut headers_map = HashMap::new();
+ let committed_blocks = read_committed_blocks_batch(
+ latest_valid_block_height + 1,
+ current_block.height,
+ &self.provider,
+ self.router_address,
+ )
+ .await?;
+
if is_deep_sync {
// Load blocks in batch from provider by numbers.
headers_map = self
@@ -218,7 +268,6 @@ impl Query {
}
log::trace!("Include block {hash} in chain for processing");
- committed_blocks.extend(self.get_committed_blocks(hash).await?);
chain.push(hash);
// Fetch parent hash from headers_map or database