Skip to content

Commit

Permalink
throw events for handling everywhere only
Browse files Browse the repository at this point in the history
  • Loading branch information
breathx committed Aug 26, 2024
1 parent 412727c commit 3f3eec7
Show file tree
Hide file tree
Showing 11 changed files with 208 additions and 85 deletions.
43 changes: 26 additions & 17 deletions ethexe/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@ use crate::{
};
use anyhow::{anyhow, Ok, Result};
use ethexe_common::{
router::{BlockCommitment, CodeCommitment, Event as RouterEvent, StateTransition},
BlockEvent,
router::{
BlockCommitment, CodeCommitment, EventForHandling as RouterEventForHandling,
StateTransition,
},
BlockEventForHandling,
};
use ethexe_db::{BlockHeader, BlockMetaStorage, CodesStorage, Database};
use ethexe_ethereum::router::RouterQuery;
use ethexe_observer::{BlockData, Event as ObserverEvent};
use ethexe_observer::{BlockDataForHandling, EventForHandling};
use ethexe_processor::LocalOutcome;
use ethexe_signer::{PublicKey, Signer};
use ethexe_validator::Commitment;
Expand Down Expand Up @@ -206,17 +209,22 @@ impl Service {
processor: &mut ethexe_processor::Processor,
block_hash: H256,
) -> Result<()> {
let events = query.get_block_events(block_hash).await?;
let events = query.get_block_events_for_handling(block_hash).await?;

for event in events {
match event {
BlockEvent::Router(RouterEvent::CodeValidationRequested {
code_id,
blob_tx_hash,
}) => {
BlockEventForHandling::Router(
RouterEventForHandling::CodeValidationRequested {
code_id,
blob_tx_hash,
},
) => {
db.set_code_blob_tx(code_id, blob_tx_hash);
}
BlockEvent::Router(RouterEvent::ProgramCreated { code_id, .. }) => {
BlockEventForHandling::Router(RouterEventForHandling::ProgramCreated {
code_id,
..
}) => {
if db.original_code(code_id).is_some() {
continue;
}
Expand Down Expand Up @@ -252,9 +260,10 @@ impl Service {

Self::process_upload_codes(db, query, processor, block_hash).await?;

let block_events = query.get_block_events(block_hash).await?;
let block_events_for_handling = query.get_block_events_for_handling(block_hash).await?;

let block_outcomes = processor.process_block_events(block_hash, &block_events)?;
let block_outcomes =
processor.process_block_events(block_hash, block_events_for_handling)?;

let transition_outcomes: Vec<_> = block_outcomes
.into_iter()
Expand Down Expand Up @@ -293,9 +302,9 @@ impl Service {
db: &Database,
query: &mut ethexe_observer::Query,
processor: &mut ethexe_processor::Processor,
block_data: BlockData,
block_data: BlockDataForHandling,
) -> Result<Vec<BlockCommitment>> {
db.set_block_events(block_data.block_hash, block_data.events.clone());
db.set_block_events(block_data.block_hash, block_data.events);
db.set_block_header(
block_data.block_hash,
BlockHeader {
Expand Down Expand Up @@ -335,14 +344,14 @@ impl Service {
query: &mut ethexe_observer::Query,
processor: &mut ethexe_processor::Processor,
maybe_sequencer: &mut Option<ethexe_sequencer::Sequencer>,
observer_event: ethexe_observer::Event,
observer_event: EventForHandling,
) -> Result<Vec<Commitment>> {
if let Some(sequencer) = maybe_sequencer {
sequencer.process_observer_event(&observer_event)?;
}

let commitments = match observer_event {
ObserverEvent::Block(block_data) => {
EventForHandling::Block(block_data) => {
log::info!(
"📦 receive a new block {}, hash {}, parent hash {}",
block_data.block_number,
Expand All @@ -355,7 +364,7 @@ impl Service {

commitments.into_iter().map(Commitment::Block).collect()
}
ethexe_observer::Event::CodeLoaded { code_id, code } => {
EventForHandling::CodeLoaded { code_id, code } => {
let outcomes = processor.process_upload_code(code_id, code.as_slice())?;

outcomes
Expand Down Expand Up @@ -395,7 +404,7 @@ impl Service {
));
}

let observer_events = observer.events();
let observer_events = observer.events_for_handling();
futures::pin_mut!(observer_events);

let (mut network_sender, mut gossipsub_stream, mut network_handle) =
Expand Down
2 changes: 1 addition & 1 deletion ethexe/cli/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl Listener {

let (send_subscription_created, receive_subscription_created) = oneshot::channel::<()>();
let _handle = task::spawn(async move {
let observer_events = observer.events();
let observer_events = observer.events_all();
futures::pin_mut!(observer_events);

send_subscription_created.send(()).unwrap();
Expand Down
6 changes: 3 additions & 3 deletions ethexe/common/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

//! ethexe common db types and traits.
use crate::{router::StateTransition, BlockEvent};
use crate::{router::StateTransition, BlockEventForHandling};
use alloc::{
collections::{BTreeMap, VecDeque},
vec::Vec,
Expand Down Expand Up @@ -65,8 +65,8 @@ pub trait BlockMetaStorage: Send + Sync {
fn block_end_program_states(&self, block_hash: H256) -> Option<BTreeMap<ActorId, H256>>;
fn set_block_end_program_states(&self, block_hash: H256, map: BTreeMap<ActorId, H256>);

fn block_events(&self, block_hash: H256) -> Option<Vec<BlockEvent>>;
fn set_block_events(&self, block_hash: H256, events: Vec<BlockEvent>);
fn block_events(&self, block_hash: H256) -> Option<Vec<BlockEventForHandling>>;
fn set_block_events(&self, block_hash: H256, events: Vec<BlockEventForHandling>);

fn block_outcome(&self, block_hash: H256) -> Option<Vec<StateTransition>>;
fn set_block_outcome(&self, block_hash: H256, outcome: Vec<StateTransition>);
Expand Down
8 changes: 4 additions & 4 deletions ethexe/db/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::{CASDatabase, KVDatabase};
use ethexe_common::{
db::{BlockHeader, BlockMetaStorage, CodesStorage},
router::StateTransition,
BlockEvent,
BlockEventForHandling,
};
use ethexe_runtime_common::state::{
Allocations, MemoryPages, MessageQueue, ProgramState, Storage, Waitlist,
Expand Down Expand Up @@ -216,16 +216,16 @@ impl BlockMetaStorage for Database {
);
}

fn block_events(&self, block_hash: H256) -> Option<Vec<BlockEvent>> {
fn block_events(&self, block_hash: H256) -> Option<Vec<BlockEventForHandling>> {
self.kv
.get(&KeyPrefix::BlockEvents.two(self.router_address, block_hash))
.map(|data| {
Vec::<BlockEvent>::decode(&mut data.as_slice())
Vec::<BlockEventForHandling>::decode(&mut data.as_slice())
.expect("Failed to decode data into `Vec<BlockEvent>`")
})
}

fn set_block_events(&self, block_hash: H256, events: Vec<BlockEvent>) {
fn set_block_events(&self, block_hash: H256, events: Vec<BlockEventForHandling>) {
self.kv.put(
&KeyPrefix::BlockEvents.two(self.router_address, block_hash),
events.encode(),
Expand Down
17 changes: 16 additions & 1 deletion ethexe/observer/src/event.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,28 @@
use ethexe_common::BlockEvent;
use ethexe_common::{BlockEvent, BlockEventForHandling};
use gprimitives::{CodeId, H256};
use parity_scale_codec::{Decode, Encode};

#[derive(Debug, Clone, Encode, Decode)]
pub enum EventForHandling {
Block(BlockDataForHandling),
CodeLoaded { code_id: CodeId, code: Vec<u8> },
}

#[derive(Debug, Clone, Encode, Decode)]
pub enum Event {
Block(BlockData),
CodeLoaded { code_id: CodeId, code: Vec<u8> },
}

#[derive(Debug, Clone, Encode, Decode)]
pub struct BlockDataForHandling {
pub parent_hash: H256,
pub block_hash: H256,
pub block_number: u64,
pub block_timestamp: u64,
pub events: Vec<BlockEventForHandling>,
}

#[derive(Debug, Clone, Encode, Decode)]
pub struct BlockData {
pub parent_hash: H256,
Expand Down
2 changes: 1 addition & 1 deletion ethexe/observer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@ mod observer;
mod query;

pub use blobs::{BlobReader, ConsensusLayerBlobReader, MockBlobReader};
pub use event::{BlockData, Event};
pub use event::{BlockData, BlockDataForHandling, Event, EventForHandling};
pub use observer::{Observer, ObserverStatus};
pub use query::Query;
105 changes: 101 additions & 4 deletions ethexe/observer/src/observer.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
use crate::{BlobReader, BlockData, Event};
use crate::{
event::{BlockData, BlockDataForHandling, Event, EventForHandling},
BlobReader,
};
use alloy::{
primitives::Address as AlloyAddress,
providers::{Provider, ProviderBuilder, RootProvider},
rpc::types::eth::{Filter, Topic},
transports::BoxTransport,
};
use anyhow::{anyhow, Result};
use ethexe_common::{router::Event as RouterEvent, BlockEvent, BlockEventForHandling};
use ethexe_common::{
router::{Event as RouterEvent, EventForHandling as RouterEventForHandling},
BlockEvent, BlockEventForHandling,
};
use ethexe_ethereum::{
mirror,
router::{self, RouterQuery},
Expand Down Expand Up @@ -71,7 +77,7 @@ impl Observer {
&self.provider
}

pub fn events(&mut self) -> impl Stream<Item = Event> + '_ {
pub fn events_all(&mut self) -> impl Stream<Item = Event> + '_ {
async_stream::stream! {
let block_subscription = self
.provider
Expand Down Expand Up @@ -158,6 +164,95 @@ impl Observer {
}
}
}

pub fn events_for_handling(&mut self) -> impl Stream<Item = EventForHandling> + '_ {
async_stream::stream! {
let block_subscription = self
.provider
.subscribe_blocks()
.await
.expect("failed to subscribe to blocks");
let mut block_stream = block_subscription.into_stream();
let mut futures = FuturesUnordered::new();

loop {
tokio::select! {
block = block_stream.next() => {
let Some(block) = block else {
log::info!("Block stream ended");
break;
};

log::trace!("Received block: {:?}", block.header.hash);

let block_hash = (*block.header.hash.expect("failed to get block hash")).into();
let parent_hash = (*block.header.parent_hash).into();
let block_number = block.header.number.expect("failed to get block number");
let block_timestamp = block.header.timestamp;

let events = match read_block_events_for_handling(block_hash, &self.provider, self.router_address).await {
Ok(events) => events,
Err(err) => {
log::error!("failed to read events: {err}");
continue;
}
};

let mut codes_len = 0;

// Create futures to load codes
// TODO (breathx): remove me from here mb
for event in events.iter() {
if let BlockEventForHandling::Router(RouterEventForHandling::CodeValidationRequested { code_id, blob_tx_hash }) = event {
codes_len += 1;

let blob_reader = self.blob_reader.clone();

let code_id = *code_id;
let blob_tx_hash = *blob_tx_hash;

futures.push(async move {
let attempts = Some(3);

read_code_from_tx_hash(
blob_reader,
code_id,
blob_tx_hash,
attempts,
).await
});
}
}

self.update_status(|status| {
status.eth_block_number = block_number;
if codes_len > 0 {
status.last_router_state = block_number;
}
status.pending_upload_code = codes_len as u64;
});

let block_data = BlockDataForHandling {
block_hash,
parent_hash,
block_number,
block_timestamp,
events,
};

yield EventForHandling::Block(block_data);
},
future = futures.next(), if !futures.is_empty() => {
match future {
Some(Ok((code_id, code))) => yield EventForHandling::CodeLoaded { code_id, code },
Some(Err(err)) => log::error!("failed to handle upload code event: {err}"),
None => continue,
}
}
};
}
}
}
}

pub(crate) async fn read_code_from_tx_hash(
Expand All @@ -180,6 +275,7 @@ pub(crate) async fn read_code_from_tx_hash(

// TODO (breathx): only read events that require some activity.
// TODO (breathx): don't store not our events.
#[allow(unused)] // TODO (breathx).
pub(crate) async fn read_block_events(
block_hash: H256,
provider: &ObserverProvider,
Expand All @@ -195,6 +291,7 @@ pub(crate) async fn read_block_events(
.map(|v| v.into_values().next().unwrap_or_default())
}

#[allow(unused)] // TODO (breathx)
pub(crate) async fn read_block_events_batch(
from_block: u32,
to_block: u32,
Expand Down Expand Up @@ -470,7 +567,7 @@ mod tests {
.await
.expect("failed to create observer");

let observer_events = observer.events();
let observer_events = observer.events_all();
futures::pin_mut!(observer_events);

send_subscription_created.send(()).unwrap();
Expand Down
Loading

0 comments on commit 3f3eec7

Please sign in to comment.