Skip to content

Commit

Permalink
fixit
Browse files Browse the repository at this point in the history
  • Loading branch information
breathx committed Dec 12, 2024
1 parent a967e29 commit 2b04687
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 54 deletions.
13 changes: 12 additions & 1 deletion ethexe/ethereum/src/router/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use crate::{decode_log, IRouter};
use crate::{abi::utils::bytes32_to_h256, decode_log, IRouter};
use alloy::{primitives::B256, rpc::types::eth::Log, sol_types::SolEvent};
use anyhow::{anyhow, Result};
use ethexe_common::events::{RouterEvent, RouterRequestEvent};
use gprimitives::H256;
use signatures::*;

pub mod signatures {
Expand Down Expand Up @@ -89,3 +90,13 @@ pub fn try_extract_request_event(log: &Log) -> Result<Option<RouterRequestEvent>

Ok(Some(request_event))
}

pub fn try_extract_committed_block_hash(log: &Log) -> Result<Option<H256>> {
if log.topic0() != Some(&BLOCK_COMMITTED) {
return Ok(None);
}

decode_log::<IRouter::BlockCommitted>(log)
.map(|e| Some(bytes32_to_h256(e.hash)))
.map_err(Into::into)
}
55 changes: 52 additions & 3 deletions ethexe/observer/src/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::{collections::HashMap, sync::Arc};
use tokio::sync::watch;

/// Max number of blocks to query in alloy.
pub(crate) const MAX_QUERY_BLOCK_RANGE: u64 = 100_000;
pub(crate) const MAX_QUERY_BLOCK_RANGE: usize = 1024;

pub(crate) type ObserverProvider = RootProvider<BoxTransport>;

Expand Down Expand Up @@ -309,7 +309,7 @@ pub(crate) async fn read_block_events_batch(
let to_block = to_block as u64;

while start_block <= to_block {
let end_block = to_block.min(start_block + MAX_QUERY_BLOCK_RANGE - 1);
let end_block = to_block.min(start_block + MAX_QUERY_BLOCK_RANGE as u64 - 1);

let filter = Filter::new().from_block(start_block).to_block(end_block);

Expand Down Expand Up @@ -421,8 +421,9 @@ pub(crate) async fn read_block_request_events_batch(
let mut start_block = from_block as u64;
let to_block = to_block as u64;

// TODO (breathx): FIX WITHIN PR. to iters.
while start_block <= to_block {
let end_block = to_block.min(start_block + MAX_QUERY_BLOCK_RANGE - 1);
let end_block = to_block.min(start_block + MAX_QUERY_BLOCK_RANGE as u64 - 1);

let filter = Filter::new().from_block(start_block).to_block(end_block);

Expand Down Expand Up @@ -512,6 +513,54 @@ async fn read_request_events_impl(
Ok(res)
}

pub(crate) async fn read_committed_blocks_batch(
from_block: u32,
to_block: u32,
provider: &ObserverProvider,
router_address: AlloyAddress,
) -> Result<Vec<H256>> {
let mut start_block = from_block as u64;
let to_block = to_block as u64;

let mut res = Vec::new();

while start_block <= to_block {
let end_block = to_block.min(start_block + MAX_QUERY_BLOCK_RANGE as u64 - 1);

let filter = Filter::new().from_block(start_block).to_block(end_block);

let iter_res = read_committed_blocks_impl(router_address, provider, filter).await?;

res.extend(iter_res);

start_block = end_block + 1;
}

Ok(res)
}

async fn read_committed_blocks_impl(
router_address: AlloyAddress,
provider: &ObserverProvider,
filter: Filter,
) -> Result<Vec<H256>> {
let filter = filter
.address(router_address)
.event_signature(Topic::from(router::events::signatures::BLOCK_COMMITTED));

let logs = provider.get_logs(&filter).await?;

let mut res = Vec::with_capacity(logs.len());

for log in logs {
if let Some(hash) = router::events::try_extract_committed_block_hash(&log)? {
res.push(hash);
}
}

Ok(res)
}

#[cfg(test)]
mod tests {
use std::time::Duration;
Expand Down
149 changes: 99 additions & 50 deletions ethexe/observer/src/query.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,29 @@
use std::{
collections::{BTreeSet, HashMap, VecDeque},
sync::Arc,
};

use crate::{
observer::{
read_block_events, read_block_request_events, read_block_request_events_batch,
read_code_from_tx_hash, ObserverProvider,
read_code_from_tx_hash, read_committed_blocks_batch, ObserverProvider,
MAX_QUERY_BLOCK_RANGE,
},
BlobReader,
};
use alloy::{
eips::BlockNumberOrTag,
network::{Ethereum, Network},
primitives::Address as AlloyAddress,
providers::{Provider, ProviderBuilder},
rpc::types::eth::BlockTransactionsKind,
rpc::{client::BatchRequest, types::eth::BlockTransactionsKind},
};
use anyhow::{anyhow, Result};
use ethexe_common::{
db::{BlockHeader, BlockMetaStorage},
events::{BlockEvent, BlockRequestEvent, RouterEvent},
};
use ethexe_signer::Address;
use futures::future;
use gprimitives::{CodeId, H256};
use std::{
collections::{BTreeSet, HashMap, VecDeque},
sync::Arc,
};

/// Height difference to start fast sync.
const DEEP_SYNC: u32 = 10;
Expand Down Expand Up @@ -95,66 +96,108 @@ impl Query {
.collect())
}

async fn batch_get_block_headers(
provider: ObserverProvider,
database: Arc<dyn BlockMetaStorage>,
from_block: u64,
to_block: u64,
) -> Result<Vec<(H256, BlockHeader)>> {
log::debug!("Querying blocks from {from_block} to {to_block}");

let mut batch = BatchRequest::new(provider.client());

let handles: Vec<_> = (from_block..=to_block)
.map(|bn| {
batch
.add_call::<_, Option<<Ethereum as Network>::BlockResponse>>(
"eth_getBlockByNumber",
&(format!("0x{bn:x}"), false),
)
.expect("infallible")
})
.collect();

batch.send().await?;

let blocks: Vec<_> = future::join_all(handles).await;

let mut res = Vec::with_capacity(blocks.len());

for block in blocks {
let block = block?.ok_or_else(|| anyhow!("Block not found"))?;
let block_hash = H256(block.header.hash.0);

let height = block.header.number as u32;
let timestamp = block.header.timestamp;
let parent_hash = H256(block.header.parent_hash.0);

let header = BlockHeader {
height,
timestamp,
parent_hash,
};

database.set_block_header(block_hash, header.clone());

res.push((block_hash, header))
}

Ok(res)
}

/// Populate database with blocks using rpc provider.
async fn load_chain_batch(
&mut self,
from_block: u32,
to_block: u32,
) -> Result<HashMap<H256, BlockHeader>> {
let total_blocks = to_block.saturating_sub(from_block) + 1;
log::info!("Starting to load {total_blocks} blocks from {from_block} to {to_block}");

let fetches = (from_block..=to_block).map(|block_number| {
let provider = self.provider.clone();
let database = Arc::clone(&self.database);
tokio::spawn(async move {
let block = provider
.get_block_by_number(BlockNumberOrTag::Number(block_number as u64), false)
.await?;
let block = block
.ok_or_else(|| anyhow!("Block not found for block number {block_number}"))?;

let height = u32::try_from(block.header.number)
.map_err(|err| anyhow!("Ethereum block number not fit in u32: {err}"))?;
let timestamp = block.header.timestamp;
let block_hash = H256(block.header.hash.0);
let parent_hash = H256(block.header.parent_hash.0);
log::info!("Starting to load {total_blocks} blocks from {from_block} to {to_block}");

let header = BlockHeader {
height,
timestamp,
parent_hash,
};
let headers_handles: Vec<_> = (from_block..=to_block)
.step_by(MAX_QUERY_BLOCK_RANGE)
.map(|start| {
let end = (start + MAX_QUERY_BLOCK_RANGE as u32 - 1).min(to_block);

database.set_block_header(block_hash, header.clone());
let provider = self.provider.clone();
let database = self.database.clone();

Ok::<(H256, BlockHeader), anyhow::Error>((block_hash, header))
tokio::spawn(async move {
Self::batch_get_block_headers(provider, database, start as u64, end as u64)
.await
})
})
});
.collect();

let headers_fut = future::join_all(headers_handles);

// Fetch events in block range.
let mut blocks_events = read_block_request_events_batch(
let events_fut = read_block_request_events_batch(
from_block,
to_block,
&self.provider,
self.router_address,
)
.await?;
);

let (headers_batches, maybe_events) = future::join(headers_fut, events_fut).await;
let mut events = maybe_events?;

let mut res = HashMap::with_capacity(total_blocks as usize);

// Collect results
let mut block_headers = HashMap::new();
for result in futures::future::join_all(fetches).await {
let (block_hash, header) = result??;
// Set block events, empty vec if no events.
self.database.set_block_events(
block_hash,
blocks_events.remove(&block_hash).unwrap_or_default(),
);
block_headers.insert(block_hash, header);
for batch in headers_batches {
let batch = batch??;

for (hash, header) in batch {
self.database
.set_block_events(hash, events.remove(&hash).unwrap_or_default());

res.insert(hash, header);
}
}
log::trace!("{} blocks loaded", block_headers.len());

Ok(block_headers)
log::trace!("{} blocks loaded", res.len());

Ok(res)
}

pub async fn get_last_committed_chain(&mut self, block_hash: H256) -> Result<Vec<H256>> {
Expand Down Expand Up @@ -186,9 +229,16 @@ impl Query {
};

let mut chain = Vec::new();
let mut committed_blocks = Vec::new();
let mut headers_map = HashMap::new();

let committed_blocks = read_committed_blocks_batch(
latest_valid_block_height + 1,
current_block.height,
&self.provider,
self.router_address,
)
.await?;

if is_deep_sync {
// Load blocks in batch from provider by numbers.
headers_map = self
Expand Down Expand Up @@ -218,7 +268,6 @@ impl Query {
}

log::trace!("Include block {hash} in chain for processing");
committed_blocks.extend(self.get_committed_blocks(hash).await?);
chain.push(hash);

// Fetch parent hash from headers_map or database
Expand Down

0 comments on commit 2b04687

Please sign in to comment.