Skip to content

Commit

Permalink
feat(ethexe): Deep sync, faster chain load (#4063)
Browse files Browse the repository at this point in the history
  • Loading branch information
ukint-vs authored Aug 5, 2024
1 parent 99c60a6 commit 4a78c74
Show file tree
Hide file tree
Showing 6 changed files with 376 additions and 49 deletions.
3 changes: 3 additions & 0 deletions ethexe/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,9 @@ impl Service {
// Set block as valid - means state db has all states for the end of the block
db.set_block_end_state_is_valid(block_hash, true);

let header = db.block_header(block_hash).expect("must be set; qed");
db.set_latest_valid_block_height(header.height);

Ok(transition_outcomes)
}

Expand Down
143 changes: 143 additions & 0 deletions ethexe/cli/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -668,3 +668,146 @@ async fn ping_reorg() {

log::info!("📗 Done");
}

// Mine 150 blocks - send message - mine 150 blocks.
// Deep sync must load chain in batch.
#[tokio::test(flavor = "multi_thread")]
#[ntest::timeout(60_000)]
async fn ping_deep_sync() {
gear_utils::init_default_logger();

let anvil = TestEnv::start_anvil();

let mut env = TestEnv::new(TestEnvConfig::default().rpc_url(anvil.ws_endpoint()))
.await
.unwrap();
let mut listener = env.new_listener().await;

env.start_service().await.unwrap();

let provider = env.observer.provider().clone();

let (_, code_id) = env.upload_code(demo_ping::WASM_BINARY).await.unwrap();

log::info!("📗 Waiting for code loaded");
listener
.apply_until(|event| {
if let Event::CodeLoaded(loaded) = event {
assert_eq!(loaded.code_id, code_id);
assert_eq!(loaded.code.as_slice(), demo_ping::WASM_BINARY);
Ok(Some(()))
} else {
Ok(None)
}
})
.await
.unwrap();

log::info!("📗 Waiting for code approval");
listener
.apply_until_block_event(|event| {
if let BlockEvent::CodeApproved(approved) = event {
assert_eq!(approved.code_id, code_id);
Ok(Some(()))
} else {
Ok(None)
}
})
.await
.unwrap();

let code = env
.db
.original_code(code_id)
.expect("After approval, the code is guaranteed to be in the database");
let _ = env
.db
.instrumented_code(1, code_id)
.expect("After approval, instrumented code is guaranteed to be in the database");
assert_eq!(code, demo_ping::WASM_BINARY);

let _ = env
.ethereum
.router()
.create_program(code_id, H256::random(), b"PING", 10_000_000_000, 0)
.await
.unwrap();

log::info!("📗 Waiting for program create, PONG reply and program update");
let mut reply_sent = false;
let mut program_id = ActorId::default();
let mut block_committed = None;
listener
.apply_until_block_event(|event| {
match event {
BlockEvent::CreateProgram(create) => {
assert_eq!(create.code_id, code_id);
assert_eq!(create.init_payload, b"PING");
assert_eq!(create.gas_limit, 10_000_000_000);
assert_eq!(create.value, 0);
program_id = create.actor_id;
}
BlockEvent::UserReplySent(reply) => {
assert_eq!(reply.value, 0);
assert_eq!(reply.payload, b"PONG");
reply_sent = true;
}
BlockEvent::UpdatedProgram(updated) => {
assert_eq!(updated.actor_id, program_id);
assert_eq!(updated.old_state_hash, H256::zero());
assert!(reply_sent);
}
BlockEvent::BlockCommitted(committed) => {
block_committed = Some(committed.block_hash);
return Ok(Some(()));
}
_ => {}
}
Ok(None)
})
.await
.unwrap();

let block_committed_on_router = env.router_query.last_commitment_block_hash().await.unwrap();
assert_eq!(block_committed, Some(block_committed_on_router));

// Mine some blocks to check deep sync.
provider
.evm_mine(Some(MineOptions::Options {
timestamp: None,
blocks: Some(150),
}))
.await
.unwrap();

// Send mesage in between.
let program_address = ethexe_signer::Address::try_from(program_id).unwrap();
let ping_program = env.ethereum.program(program_address);
let _tx = ping_program
.send_message(b"PING", 10_000_000_000, 0)
.await
.unwrap();

// Mine some blocks to check deep sync.
provider
.evm_mine(Some(MineOptions::Options {
timestamp: None,
blocks: Some(150),
}))
.await
.unwrap();

log::info!("📗 Waiting for PONG reply");
listener
.apply_until_block_event(|event| {
if let BlockEvent::UserReplySent(reply) = event {
assert_eq!(reply.value, 0);
assert_eq!(reply.payload, b"PONG");
Ok(Some(()))
} else {
Ok(None)
}
})
.await
.unwrap();
}
3 changes: 3 additions & 0 deletions ethexe/common/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ pub trait BlockMetaStorage: Send + Sync {

fn block_outcome(&self, block_hash: H256) -> Option<Vec<StateTransition>>;
fn set_block_outcome(&self, block_hash: H256, outcome: Vec<StateTransition>);

fn latest_valid_block_height(&self) -> Option<u32>;
fn set_latest_valid_block_height(&self, block_height: u32);
}

pub trait CodesStorage: Send + Sync {
Expand Down
16 changes: 16 additions & 0 deletions ethexe/db/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ enum KeyPrefix {
BlockOutcome = 5,
BlockSmallMeta = 6,
CodeUpload = 7,
LatestValidBlock = 8,
}

impl KeyPrefix {
Expand Down Expand Up @@ -233,6 +234,21 @@ impl BlockMetaStorage for Database {
self.kv
.put(&KeyPrefix::BlockOutcome.one(block_hash), outcome.encode());
}

fn latest_valid_block_height(&self) -> Option<u32> {
self.kv
.get(&KeyPrefix::LatestValidBlock.one([]))
.map(|block_height| {
u32::from_le_bytes(block_height.try_into().expect("must be correct; qed"))
})
}

fn set_latest_valid_block_height(&self, block_height: u32) {
self.kv.put(
&KeyPrefix::LatestValidBlock.one([]),
block_height.to_le_bytes().to_vec(),
);
}
}

impl CodesStorage for Database {
Expand Down
46 changes: 45 additions & 1 deletion ethexe/observer/src/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@ use ethexe_signer::Address;
use futures::{stream::FuturesUnordered, Stream, StreamExt};
use gear_core::ids::prelude::*;
use gprimitives::{ActorId, CodeId, H256};
use std::sync::Arc;
use std::{collections::BTreeMap, sync::Arc};
use tokio::sync::watch;

/// Max number of blocks to query in alloy.
pub(crate) const MAX_QUERY_BLOCK_RANGE: u32 = 100_000;

pub(crate) type ObserverProvider = RootProvider<BoxTransport>;

#[derive(Clone)]
Expand Down Expand Up @@ -208,6 +211,47 @@ pub(crate) async fn read_block_events(
Ok(events)
}

pub(crate) async fn read_block_events_batch(
from_block: u32,
to_block: u32,
provider: &ObserverProvider,
router_address: AlloyAddress,
) -> Result<BTreeMap<H256, Vec<BlockEvent>>> {
let mut events_map: BTreeMap<H256, Vec<BlockEvent>> = BTreeMap::new();
let mut start_block = from_block;

while start_block <= to_block {
let end_block = std::cmp::min(start_block + MAX_QUERY_BLOCK_RANGE - 1, to_block);
let router_events_filter = Filter::new()
.from_block(start_block as u64)
.to_block(end_block as u64)
.address(router_address)
.event_signature(Topic::from_iter(
signature_hash::ROUTER_EVENTS
.iter()
.map(|hash| B256::new(*hash)),
));

let logs = provider.get_logs(&router_events_filter).await?;

for log in logs.iter() {
let block_hash = H256(log.block_hash.ok_or(anyhow!("Block hash is missing"))?.0);

let Some(event) = match_log(log)? else {
continue;
};

events_map
.entry(block_hash)
.and_modify(|events| events.push(event.clone()))
.or_insert(vec![event]);
}
start_block = end_block + 1;
}

Ok(events_map)
}

#[cfg(test)]
mod tests {
use std::time::Duration;
Expand Down
Loading

0 comments on commit 4a78c74

Please sign in to comment.