From 5be18b77b63692e296ee07e3933bc9642e3e3f80 Mon Sep 17 00:00:00 2001 From: Dmitry Novikov Date: Wed, 21 Aug 2024 16:35:12 +0400 Subject: [PATCH 01/14] impl eq derives; impl zero fns for program state --- ethexe/runtime/common/src/state.rs | 31 +++++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/ethexe/runtime/common/src/state.rs b/ethexe/runtime/common/src/state.rs index 00d77aed821..cb715232a9d 100644 --- a/ethexe/runtime/common/src/state.rs +++ b/ethexe/runtime/common/src/state.rs @@ -39,7 +39,7 @@ use parity_scale_codec::{Decode, Encode}; pub use gear_core::program::ProgramState as InitStatus; -#[derive(Clone, Debug, Encode, Decode)] +#[derive(Clone, Debug, Encode, Decode, PartialEq, Eq)] pub struct HashAndLen { pub hash: H256, pub len: NonZeroU32, @@ -55,7 +55,7 @@ impl From for HashAndLen { } } -#[derive(Clone, Debug, Encode, Decode)] +#[derive(Clone, Debug, Encode, Decode, PartialEq, Eq)] pub enum MaybeHash { Hash(HashAndLen), Empty, @@ -77,7 +77,7 @@ impl MaybeHash { } } -#[derive(Clone, Debug, Decode, Encode)] +#[derive(Clone, Debug, Decode, Encode, PartialEq, Eq)] pub struct ActiveProgram { /// Hash of wasm memory pages allocations, see [`Allocations`]. pub allocations_hash: MaybeHash, @@ -89,7 +89,7 @@ pub struct ActiveProgram { pub initialized: bool, } -#[derive(Clone, Debug, Decode, Encode)] +#[derive(Clone, Debug, Decode, Encode, PartialEq, Eq)] pub enum Program { Active(ActiveProgram), Exited(ProgramId), @@ -97,7 +97,7 @@ pub enum Program { } /// ethexe program state. -#[derive(Clone, Debug, Decode, Encode)] +#[derive(Clone, Debug, Decode, Encode, PartialEq, Eq)] pub struct ProgramState { /// Active, exited or terminated program state. pub state: Program, @@ -111,6 +111,27 @@ pub struct ProgramState { pub executable_balance: Value, } +impl ProgramState { + pub const fn zero() -> Self { + Self { + state: Program::Active(ActiveProgram { + allocations_hash: MaybeHash::Empty, + pages_hash: MaybeHash::Empty, + memory_infix: MemoryInfix::new(0), + initialized: false, + }), + queue_hash: MaybeHash::Empty, + waitlist_hash: MaybeHash::Empty, + balance: 0, + executable_balance: 0, + } + } + + pub fn is_zero(&self) -> bool { + *self == Self::zero() + } +} + #[derive(Clone, Debug, Encode, Decode)] pub struct Dispatch { /// Message id. From 0ac3059ac06d47e702177954eb66691f482d04c8 Mon Sep 17 00:00:00 2001 From: Dmitry Novikov Date: Wed, 21 Aug 2024 16:37:13 +0400 Subject: [PATCH 02/14] support zero state hashes on storage layer --- ethexe/db/src/database.rs | 17 +++++++++++++---- ethexe/runtime/src/wasm/storage.rs | 8 ++++++++ 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/ethexe/db/src/database.rs b/ethexe/db/src/database.rs index f2848a97b65..577df36c58d 100644 --- a/ethexe/db/src/database.rs +++ b/ethexe/db/src/database.rs @@ -378,14 +378,23 @@ impl Database { // TODO: consider to change decode panics to Results. impl Storage for Database { fn read_state(&self, hash: H256) -> Option { + if hash.is_zero() { + return Some(ProgramState::zero()); + } + let data = self.cas.read(&hash)?; - Some( - ProgramState::decode(&mut &data[..]) - .expect("Failed to decode data into `ProgramState`"), - ) + + let state = ProgramState::decode(&mut &data[..]) + .expect("Failed to decode data into `ProgramState`"); + + Some(state) } fn write_state(&self, state: ProgramState) -> H256 { + if state.is_zero() { + return H256::zero(); + } + self.cas.write(&state.encode()) } diff --git a/ethexe/runtime/src/wasm/storage.rs b/ethexe/runtime/src/wasm/storage.rs index 8180de200f1..0e16b763553 100644 --- a/ethexe/runtime/src/wasm/storage.rs +++ b/ethexe/runtime/src/wasm/storage.rs @@ -53,6 +53,10 @@ impl Storage for RuntimeInterfaceStorage { } fn read_state(&self, hash: H256) -> Option { + if hash.is_zero() { + return Some(ProgramState::zero()); + } + database_ri::read_unwrapping(&hash) } @@ -81,6 +85,10 @@ impl Storage for RuntimeInterfaceStorage { } fn write_state(&self, state: ProgramState) -> H256 { + if state.is_zero() { + return H256::zero(); + } + database_ri::write(state) } From eb25e6f14ec6250e9bae72151347b1d81b3270e5 Mon Sep 17 00:00:00 2001 From: Dmitry Novikov Date: Wed, 21 Aug 2024 16:48:18 +0400 Subject: [PATCH 03/14] zero state hash on program creation --- ethexe/processor/src/lib.rs | 39 +++++++---------------------------- ethexe/processor/src/tests.rs | 26 ++++++++++++++--------- 2 files changed, 23 insertions(+), 42 deletions(-) diff --git a/ethexe/processor/src/lib.rs b/ethexe/processor/src/lib.rs index 2f7ebc20134..b377ebf6222 100644 --- a/ethexe/processor/src/lib.rs +++ b/ethexe/processor/src/lib.rs @@ -26,13 +26,10 @@ use ethexe_common::{ BlockEvent, }; use ethexe_db::{BlockMetaStorage, CodesStorage, Database}; -use ethexe_runtime_common::state::{ - self, ActiveProgram, Dispatch, MaybeHash, ProgramState, Storage, -}; +use ethexe_runtime_common::state::{Dispatch, MaybeHash, Storage}; use gear_core::{ ids::{prelude::CodeIdExt, ActorId, MessageId, ProgramId}, message::{DispatchKind, Payload}, - program::MemoryInfix, }; use gprimitives::{CodeId, H256}; use host::InstanceCreator; @@ -121,8 +118,8 @@ impl Processor { Ok(true) } - // TODO: deal with params on smart contract side. - pub fn handle_new_program(&mut self, program_id: ProgramId, code_id: CodeId) -> Result { + pub fn handle_new_program(&mut self, program_id: ProgramId, code_id: CodeId) -> Result<()> { + // TODO (breathx): impl key_exists(). if self.db.original_code(code_id).is_none() { anyhow::bail!("code existence should be checked on smart contract side"); } @@ -133,26 +130,7 @@ impl Processor { self.db.set_program_code_id(program_id, code_id); - // TODO (breathx): state here is non-zero (?!). - - let active_program = ActiveProgram { - allocations_hash: MaybeHash::Empty, - pages_hash: MaybeHash::Empty, - memory_infix: MemoryInfix::new(0), - initialized: false, - }; - - // TODO: on program creation send message to it. - let program_state = ProgramState { - state: state::Program::Active(active_program), - queue_hash: MaybeHash::Empty, - waitlist_hash: MaybeHash::Empty, - balance: 0, - executable_balance: 10_000_000_000_000, // TODO: remove this minting - }; - - // TODO: not write zero state, but just register it (or support default on get) - Ok(self.db.write_state(program_state)) + Ok(()) } pub fn handle_executable_balance_top_up( @@ -300,10 +278,8 @@ impl Processor { match event { BlockEvent::Router(event) => match event.clone() { RouterEvent::ProgramCreated { actor_id, code_id } => { - // TODO (breathx): set this zero like start of the block data. - let state_hash = self.handle_new_program(actor_id, code_id)?; - - programs.insert(actor_id, state_hash); + self.handle_new_program(actor_id, code_id)?; + programs.insert(actor_id, H256::zero()); } _ => { log::debug!( @@ -326,8 +302,7 @@ impl Processor { payload, value, } => { - // TODO (breathx): replace with state_hash.is_zero(); - let kind = if !initial_program_states.contains_key(address) { + let kind = if state_hash.is_zero() { DispatchKind::Init } else { DispatchKind::Handle diff --git a/ethexe/processor/src/tests.rs b/ethexe/processor/src/tests.rs index ceff98da13b..d8184772037 100644 --- a/ethexe/processor/src/tests.rs +++ b/ethexe/processor/src/tests.rs @@ -212,12 +212,15 @@ fn host_ping_pong() { .expect("failed to call runtime api") .expect("code failed verification or instrumentation"); - let state_hash = processor + processor .handle_new_program(program_id, code_id) .expect("failed to create new program"); let state_hash = processor - .handle_user_message(state_hash, vec![create_message(DispatchKind::Init, "PING")]) + .handle_user_message( + H256::zero(), + vec![create_message(DispatchKind::Init, "PING")], + ) .expect("failed to populate message queue"); let _init = processor.run_on_host(program_id, state_hash).unwrap(); @@ -240,13 +243,13 @@ fn ping_pong() { .expect("failed to call runtime api") .expect("code failed verification or instrumentation"); - let state_hash = processor + processor .handle_new_program(program_id, code_id) .expect("failed to create new program"); let state_hash = processor .handle_user_message( - state_hash, + H256::zero(), vec![ create_message_full(MessageId::from(1), DispatchKind::Init, user_id, "PING"), create_message_full(MessageId::from(2), DispatchKind::Handle, user_id, "PING"), @@ -317,12 +320,13 @@ fn async_and_ping() { .expect("failed to call runtime api") .expect("code failed verification or instrumentation"); - let ping_state_hash = processor + processor .handle_new_program(ping_id, ping_code_id) .expect("failed to create new program"); + let ping_state_hash = processor .handle_user_message( - ping_state_hash, + H256::zero(), vec![UserMessage { id: get_next_message_id(), kind: DispatchKind::Init, @@ -333,12 +337,13 @@ fn async_and_ping() { ) .expect("failed to populate message queue"); - let async_state_hash = processor + processor .handle_new_program(async_id, upload_code_id) .expect("failed to create new program"); + let async_state_hash = processor .handle_user_message( - async_state_hash, + H256::zero(), vec![UserMessage { id: get_next_message_id(), kind: DispatchKind::Init, @@ -431,11 +436,12 @@ fn many_waits() { for i in 0..amount { let program_id = ProgramId::from(i); - let state_hash = processor + processor .handle_new_program(program_id, code_id) .expect("failed to create new program"); + let state_hash = processor - .handle_user_message(state_hash, vec![create_message(DispatchKind::Init, b"")]) + .handle_user_message(H256::zero(), vec![create_message(DispatchKind::Init, b"")]) .expect("failed to populate message queue"); programs.insert(program_id, state_hash); From b0f723c05f91d1afd08600a892541fc4edc66465 Mon Sep 17 00:00:00 2001 From: Dmitry Novikov Date: Wed, 21 Aug 2024 16:50:14 +0400 Subject: [PATCH 04/14] topup exactly executable balance on request --- ethexe/processor/src/lib.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ethexe/processor/src/lib.rs b/ethexe/processor/src/lib.rs index b377ebf6222..f33c90a5881 100644 --- a/ethexe/processor/src/lib.rs +++ b/ethexe/processor/src/lib.rs @@ -143,8 +143,7 @@ impl Processor { .read_state(state_hash) .ok_or_else(|| anyhow::anyhow!("program should exist"))?; - // TODO (breathx): mutate exec balance after #4067. - state.balance += value; + state.executable_balance += value; Ok(self.db.write_state(state)) } From 2b6d2c94df4686b9c329f8f32f722275c430b182 Mon Sep 17 00:00:00 2001 From: Dmitry Novikov Date: Wed, 21 Aug 2024 16:58:05 +0400 Subject: [PATCH 05/14] handle unrecognized mirrors --- ethexe/processor/src/lib.rs | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/ethexe/processor/src/lib.rs b/ethexe/processor/src/lib.rs index f33c90a5881..762f6ed8a8c 100644 --- a/ethexe/processor/src/lib.rs +++ b/ethexe/processor/src/lib.rs @@ -183,7 +183,6 @@ impl Processor { dispatches.push(dispatch); } - // TODO: on zero hash return default avoiding db. let mut program_state = self .db .read_state(program_hash) @@ -278,6 +277,7 @@ impl Processor { BlockEvent::Router(event) => match event.clone() { RouterEvent::ProgramCreated { actor_id, code_id } => { self.handle_new_program(actor_id, code_id)?; + programs.insert(actor_id, H256::zero()); } _ => { @@ -288,12 +288,15 @@ impl Processor { } }, BlockEvent::Mirror { address, event } => { - // TODO (breathx): handle if not (program from another router / incorrect event order ). - let state_hash = *programs.get(address).expect("should exist"); + let Some(&state_hash) = programs.get(address) else { + log::debug!("Received mirror event from unrecognized program ({address}): {event:?}"); + + continue; + }; - let state_hash = match event.clone() { + let new_state_hash = match event.clone() { MirrorEvent::ExecutableBalanceTopUpRequested { value } => { - self.handle_executable_balance_top_up(state_hash, value)? + self.handle_executable_balance_top_up(state_hash, value) } MirrorEvent::MessageQueueingRequested { id, @@ -314,23 +317,24 @@ impl Processor { kind, source, payload, - // TODO (breathx): mutate exec balance after #4067. value, }], - )? + ) } _ => { log::debug!( "Handling for mirror event {event:?} is not yet implemented; noop" ); + continue; } }; - programs.insert(*address, state_hash); + programs.insert(*address, new_state_hash?); } BlockEvent::WVara(event) => { log::debug!("Handling for wvara event {event:?} is not yet implemented; noop"); + continue; } } From 163f3a0fd2c443dd718771eb3e640d59cc905de8 Mon Sep 17 00:00:00 2001 From: Dmitry Novikov Date: Wed, 21 Aug 2024 17:18:15 +0400 Subject: [PATCH 06/14] split block event handling into funcs --- ethexe/processor/src/lib.rs | 157 ++++++++++++++++++++---------------- 1 file changed, 89 insertions(+), 68 deletions(-) diff --git a/ethexe/processor/src/lib.rs b/ethexe/processor/src/lib.rs index 762f6ed8a8c..0b4ebbe24b0 100644 --- a/ethexe/processor/src/lib.rs +++ b/ethexe/processor/src/lib.rs @@ -23,6 +23,7 @@ use core_processor::common::JournalNote; use ethexe_common::{ mirror::Event as MirrorEvent, router::{Event as RouterEvent, StateTransition}, + wvara::Event as WVaraEvent, BlockEvent, }; use ethexe_db::{BlockMetaStorage, CodesStorage, Database}; @@ -263,89 +264,109 @@ impl Processor { ) -> Result> { log::debug!("Processing events for {block_hash:?}: {events:?}"); - let mut outcomes = vec![]; - - let initial_program_states = self + let mut states = self .db .block_start_program_states(block_hash) .unwrap_or_default(); - let mut programs = initial_program_states.clone(); - for event in events { match event { - BlockEvent::Router(event) => match event.clone() { - RouterEvent::ProgramCreated { actor_id, code_id } => { - self.handle_new_program(actor_id, code_id)?; - - programs.insert(actor_id, H256::zero()); - } - _ => { - log::debug!( - "Handling for router event {event:?} is not yet implemented; noop" - ); - continue; - } - }, + BlockEvent::Router(event) => { + self.process_router_event(&mut states, event.clone())?; + } BlockEvent::Mirror { address, event } => { - let Some(&state_hash) = programs.get(address) else { - log::debug!("Received mirror event from unrecognized program ({address}): {event:?}"); - - continue; - }; - - let new_state_hash = match event.clone() { - MirrorEvent::ExecutableBalanceTopUpRequested { value } => { - self.handle_executable_balance_top_up(state_hash, value) - } - MirrorEvent::MessageQueueingRequested { - id, - source, - payload, - value, - } => { - let kind = if state_hash.is_zero() { - DispatchKind::Init - } else { - DispatchKind::Handle - }; - - self.handle_user_message( - state_hash, - vec![UserMessage { - id, - kind, - source, - payload, - value, - }], - ) - } - _ => { - log::debug!( - "Handling for mirror event {event:?} is not yet implemented; noop" - ); - - continue; - } - }; - - programs.insert(*address, new_state_hash?); + self.process_mirror_event(&mut states, *address, event.clone())?; } BlockEvent::WVara(event) => { - log::debug!("Handling for wvara event {event:?} is not yet implemented; noop"); - - continue; + self.process_wvara_event(&mut states, event.clone())?; } } } - let current_outcomes = self.run(block_hash, &mut programs)?; + let outcomes = self.run(block_hash, &mut states)?; - outcomes.extend(current_outcomes); - - self.db.set_block_end_program_states(block_hash, programs); + self.db.set_block_end_program_states(block_hash, states); Ok(outcomes) } + + fn process_router_event( + &mut self, + states: &mut BTreeMap, + event: RouterEvent, + ) -> Result<()> { + match event { + RouterEvent::ProgramCreated { actor_id, code_id } => { + self.handle_new_program(actor_id, code_id)?; + + states.insert(actor_id, H256::zero()); + } + _ => { + log::debug!("Handling for router event {event:?} is not yet implemented; noop"); + } + }; + + Ok(()) + } + + fn process_mirror_event( + &mut self, + states: &mut BTreeMap, + actor_id: ProgramId, + event: MirrorEvent, + ) -> Result<()> { + let Some(&state_hash) = states.get(&actor_id) else { + log::debug!("Received event from unrecognized mirror ({actor_id}): {event:?}"); + + return Ok(()); + }; + + let new_state_hash = match event { + MirrorEvent::ExecutableBalanceTopUpRequested { value } => { + self.handle_executable_balance_top_up(state_hash, value)? + } + MirrorEvent::MessageQueueingRequested { + id, + source, + payload, + value, + } => { + let kind = if state_hash.is_zero() { + DispatchKind::Init + } else { + DispatchKind::Handle + }; + + self.handle_user_message( + state_hash, + vec![UserMessage { + id, + kind, + source, + payload, + value, + }], + )? + } + _ => { + log::debug!("Handler for this event isn't yet implemented: {event:?}"); + + return Ok(()); + } + }; + + states.insert(actor_id, new_state_hash); + + Ok(()) + } + + fn process_wvara_event( + &mut self, + _states: &mut BTreeMap, + event: WVaraEvent, + ) -> Result<()> { + log::debug!("Handler for this event isn't yet implemented: {event:?}"); + + Ok(()) + } } From 82b9a4d147fc3c76714a38169679798911d109c2 Mon Sep 17 00:00:00 2001 From: Dmitry Novikov Date: Wed, 21 Aug 2024 17:49:29 +0400 Subject: [PATCH 07/14] impl fn handle_message_queueing --- ethexe/processor/src/lib.rs | 84 ++++++++++++++++++++++++++---- ethexe/runtime/common/src/state.rs | 7 +++ 2 files changed, 80 insertions(+), 11 deletions(-) diff --git a/ethexe/processor/src/lib.rs b/ethexe/processor/src/lib.rs index 0b4ebbe24b0..7c6065293c9 100644 --- a/ethexe/processor/src/lib.rs +++ b/ethexe/processor/src/lib.rs @@ -27,7 +27,7 @@ use ethexe_common::{ BlockEvent, }; use ethexe_db::{BlockMetaStorage, CodesStorage, Database}; -use ethexe_runtime_common::state::{Dispatch, MaybeHash, Storage}; +use ethexe_runtime_common::state::{Dispatch, HashAndLen, MaybeHash, Storage}; use gear_core::{ ids::{prelude::CodeIdExt, ActorId, MessageId, ProgramId}, message::{DispatchKind, Payload}, @@ -43,6 +43,7 @@ mod run; #[cfg(test)] mod tests; +// TODO (breathx): remove me. pub struct UserMessage { id: MessageId, kind: DispatchKind, @@ -149,6 +150,64 @@ impl Processor { Ok(self.db.write_state(state)) } + pub fn handle_payload(&mut self, payload: Vec) -> Result { + let payload = Payload::try_from(payload) + .map_err(|_| anyhow::anyhow!("payload should be checked on eth side"))?; + + let hash = payload + .inner() + .is_empty() + .then_some(MaybeHash::Empty) + .unwrap_or_else(|| self.db.write_payload(payload).into()); + + Ok(hash) + } + + pub fn handle_message_queueing( + &mut self, + state_hash: H256, + dispatch: Dispatch, + ) -> Result { + self.handle_messages_queueing(state_hash, vec![dispatch]) + } + + pub fn handle_messages_queueing( + &mut self, + state_hash: H256, + dispatches: Vec, + ) -> Result { + if dispatches.is_empty() { + return Ok(state_hash); + } + + let mut state = self + .db + .read_state(state_hash) + .ok_or_else(|| anyhow::anyhow!("program should exist"))?; + + anyhow::ensure!(state.state.is_active(), "program should be active"); + + let queue = if let MaybeHash::Hash(HashAndLen { + hash: queue_hash, .. + }) = state.queue_hash + { + let mut queue = self + .db + .read_queue(queue_hash) + .ok_or_else(|| anyhow::anyhow!("queue should exist if hash present"))?; + + queue.extend(dispatches); + + queue + } else { + VecDeque::from(dispatches) + }; + + state.queue_hash = self.db.write_queue(queue).into(); + + Ok(self.db.write_state(state)) + } + pub fn handle_user_message( &mut self, program_hash: H256, @@ -331,22 +390,25 @@ impl Processor { payload, value, } => { + let payload_hash = self.handle_payload(payload)?; + let kind = if state_hash.is_zero() { DispatchKind::Init } else { DispatchKind::Handle }; - self.handle_user_message( - state_hash, - vec![UserMessage { - id, - kind, - source, - payload, - value, - }], - )? + let dispatch = Dispatch { + id, + kind, + source, + payload_hash, + value, + details: None, + context: None, + }; + + self.handle_message_queueing(state_hash, dispatch)? } _ => { log::debug!("Handler for this event isn't yet implemented: {event:?}"); diff --git a/ethexe/runtime/common/src/state.rs b/ethexe/runtime/common/src/state.rs index cb715232a9d..ff09446a521 100644 --- a/ethexe/runtime/common/src/state.rs +++ b/ethexe/runtime/common/src/state.rs @@ -96,9 +96,16 @@ pub enum Program { Terminated(ProgramId), } +impl Program { + pub fn is_active(&self) -> bool { + matches!(self, Self::Active(_)) + } +} + /// ethexe program state. #[derive(Clone, Debug, Decode, Encode, PartialEq, Eq)] pub struct ProgramState { + // TODO (breathx): rename to program /// Active, exited or terminated program state. pub state: Program, /// Hash of incoming message queue, see [`MessageQueue`]. From 5ef13c24e84a01f8b5318a97659fce557516c80d Mon Sep 17 00:00:00 2001 From: Dmitry Novikov Date: Wed, 21 Aug 2024 18:02:27 +0400 Subject: [PATCH 08/14] extend events handling --- ethexe/processor/src/lib.rs | 39 +++++++++++++++++++++++++++++-------- 1 file changed, 31 insertions(+), 8 deletions(-) diff --git a/ethexe/processor/src/lib.rs b/ethexe/processor/src/lib.rs index 7c6065293c9..22ba5ff3301 100644 --- a/ethexe/processor/src/lib.rs +++ b/ethexe/processor/src/lib.rs @@ -360,8 +360,17 @@ impl Processor { states.insert(actor_id, H256::zero()); } - _ => { - log::debug!("Handling for router event {event:?} is not yet implemented; noop"); + RouterEvent::CodeValidationRequested { .. } + | RouterEvent::BaseWeightChanged { .. } + | RouterEvent::StorageSlotChanged + | RouterEvent::ValidatorsSetChanged + | RouterEvent::ValuePerWeightChanged { .. } => { + log::debug!("Handler not yet implemented: {event:?}"); + return Ok(()); + } + RouterEvent::BlockCommitted { .. } | RouterEvent::CodeGotValidated { .. } => { + log::debug!("Informational events are noop for processing: {event:?}"); + return Ok(()); } }; @@ -410,9 +419,16 @@ impl Processor { self.handle_message_queueing(state_hash, dispatch)? } - _ => { - log::debug!("Handler for this event isn't yet implemented: {event:?}"); - + MirrorEvent::ReplyQueueingRequested { .. } + | MirrorEvent::ValueClaimingRequested { .. } => { + log::debug!("Handler not yet implemented: {event:?}"); + return Ok(()); + } + MirrorEvent::StateChanged { .. } + | MirrorEvent::ValueClaimed { .. } + | MirrorEvent::Message { .. } + | MirrorEvent::Reply { .. } => { + log::debug!("Informational events are noop for processing: {event:?}"); return Ok(()); } }; @@ -427,8 +443,15 @@ impl Processor { _states: &mut BTreeMap, event: WVaraEvent, ) -> Result<()> { - log::debug!("Handler for this event isn't yet implemented: {event:?}"); - - Ok(()) + match event { + WVaraEvent::Transfer { .. } => { + log::debug!("Handler not yet implemented: {event:?}"); + Ok(()) + } + WVaraEvent::Approval { .. } => { + log::debug!("Informational events are noop for processing: {event:?}"); + Ok(()) + } + } } } From 8422efa17c4bc59e2cd8ef6714a163aa0fbc1945 Mon Sep 17 00:00:00 2001 From: Dmitry Novikov Date: Wed, 21 Aug 2024 18:37:20 +0400 Subject: [PATCH 09/14] remove legacy; adjust tests --- ethexe/processor/src/lib.rs | 91 +------------------- ethexe/processor/src/tests.rs | 153 +++++++++++++++++----------------- 2 files changed, 79 insertions(+), 165 deletions(-) diff --git a/ethexe/processor/src/lib.rs b/ethexe/processor/src/lib.rs index 22ba5ff3301..850c8f7a0b6 100644 --- a/ethexe/processor/src/lib.rs +++ b/ethexe/processor/src/lib.rs @@ -19,7 +19,6 @@ //! Program's execution service for eGPU. use anyhow::Result; -use core_processor::common::JournalNote; use ethexe_common::{ mirror::Event as MirrorEvent, router::{Event as RouterEvent, StateTransition}, @@ -29,7 +28,7 @@ use ethexe_common::{ use ethexe_db::{BlockMetaStorage, CodesStorage, Database}; use ethexe_runtime_common::state::{Dispatch, HashAndLen, MaybeHash, Storage}; use gear_core::{ - ids::{prelude::CodeIdExt, ActorId, MessageId, ProgramId}, + ids::{prelude::CodeIdExt, ProgramId}, message::{DispatchKind, Payload}, }; use gprimitives::{CodeId, H256}; @@ -43,15 +42,6 @@ mod run; #[cfg(test)] mod tests; -// TODO (breathx): remove me. -pub struct UserMessage { - id: MessageId, - kind: DispatchKind, - source: ActorId, - payload: Vec, - value: u128, -} - pub struct Processor { db: Database, creator: InstanceCreator, @@ -121,7 +111,6 @@ impl Processor { } pub fn handle_new_program(&mut self, program_id: ProgramId, code_id: CodeId) -> Result<()> { - // TODO (breathx): impl key_exists(). if self.db.original_code(code_id).is_none() { anyhow::bail!("code existence should be checked on smart contract side"); } @@ -208,84 +197,6 @@ impl Processor { Ok(self.db.write_state(state)) } - pub fn handle_user_message( - &mut self, - program_hash: H256, - messages: Vec, - ) -> Result { - if messages.is_empty() { - return Ok(program_hash); - } - - let mut dispatches = Vec::with_capacity(messages.len()); - - for message in messages { - let payload = Payload::try_from(message.payload) - .map_err(|_| anyhow::anyhow!("payload should be checked on eth side"))?; - - let payload_hash = payload - .inner() - .is_empty() - .then_some(MaybeHash::Empty) - .unwrap_or_else(|| self.db.write_payload(payload).into()); - - let dispatch = Dispatch { - id: message.id, - kind: message.kind, - source: message.source, - payload_hash, - value: message.value, - // TODO: handle replies. - details: None, - context: None, - }; - - dispatches.push(dispatch); - } - - let mut program_state = self - .db - .read_state(program_hash) - .ok_or_else(|| anyhow::anyhow!("program should exist"))?; - - let mut queue = if let MaybeHash::Hash(queue_hash_and_len) = program_state.queue_hash { - self.db - .read_queue(queue_hash_and_len.hash) - .ok_or_else(|| anyhow::anyhow!("queue should exist if hash present"))? - } else { - VecDeque::with_capacity(dispatches.len()) - }; - - queue.extend(dispatches); - - let queue_hash = self.db.write_queue(queue); - - program_state.queue_hash = MaybeHash::Hash(queue_hash.into()); - - Ok(self.db.write_state(program_state)) - } - - pub fn run_on_host( - &mut self, - program_id: ProgramId, - program_state: H256, - ) -> Result> { - let original_code_id = self.db.program_code_id(program_id).unwrap(); - - let maybe_instrumented_code = self - .db - .instrumented_code(ethexe_runtime::VERSION, original_code_id); - - let mut executor = self.creator.instantiate()?; - - executor.run( - program_id, - original_code_id, - program_state, - maybe_instrumented_code, - ) - } - // TODO: replace LocalOutcome with Transition struct. pub fn run( &mut self, diff --git a/ethexe/processor/src/tests.rs b/ethexe/processor/src/tests.rs index d8184772037..b4b46e6d93f 100644 --- a/ethexe/processor/src/tests.rs +++ b/ethexe/processor/src/tests.rs @@ -79,7 +79,6 @@ fn process_observer_event() { let create_program_events = vec![ BlockEvent::Router(RouterEvent::ProgramCreated { actor_id, code_id }), - // TODO (breathx): think of constant. BlockEvent::mirror( actor_id, MirrorEvent::ExecutableBalanceTopUpRequested { @@ -196,36 +195,6 @@ fn handle_new_code_invalid() { .is_none()); } -#[test] -fn host_ping_pong() { - init_logger(); - - let db = MemDb::default(); - let mut processor = Processor::new(Database::from_one(&db, Default::default())).unwrap(); - - init_new_block(&mut processor, Default::default()); - - let program_id = 42.into(); - - let code_id = processor - .handle_new_code(demo_ping::WASM_BINARY) - .expect("failed to call runtime api") - .expect("code failed verification or instrumentation"); - - processor - .handle_new_program(program_id, code_id) - .expect("failed to create new program"); - - let state_hash = processor - .handle_user_message( - H256::zero(), - vec![create_message(DispatchKind::Init, "PING")], - ) - .expect("failed to populate message queue"); - - let _init = processor.run_on_host(program_id, state_hash).unwrap(); -} - #[test] fn ping_pong() { init_logger(); @@ -248,13 +217,27 @@ fn ping_pong() { .expect("failed to create new program"); let state_hash = processor - .handle_user_message( - H256::zero(), - vec![ - create_message_full(MessageId::from(1), DispatchKind::Init, user_id, "PING"), - create_message_full(MessageId::from(2), DispatchKind::Handle, user_id, "PING"), - ], - ) + .handle_executable_balance_top_up(H256::zero(), 10_000_000_000) + .expect("failed to top up balance"); + + let messages = vec![ + create_message_full( + &mut processor, + MessageId::from(1), + DispatchKind::Init, + user_id, + "PING", + ), + create_message_full( + &mut processor, + MessageId::from(2), + DispatchKind::Handle, + user_id, + "PING", + ), + ]; + let state_hash = processor + .handle_messages_queueing(state_hash, messages) .expect("failed to populate message queue"); let mut programs = BTreeMap::from_iter([(program_id, state_hash)]); @@ -272,22 +255,38 @@ fn ping_pong() { assert_eq!(message.payload_bytes(), b"PONG"); } -fn create_message(kind: DispatchKind, payload: impl AsRef<[u8]>) -> UserMessage { - create_message_full(H256::random().into(), kind, H256::random().into(), payload) +fn create_message( + processor: &mut Processor, + kind: DispatchKind, + payload: impl AsRef<[u8]>, +) -> Dispatch { + create_message_full( + processor, + H256::random().into(), + kind, + H256::random().into(), + payload, + ) } fn create_message_full( + processor: &mut Processor, id: MessageId, kind: DispatchKind, source: ActorId, payload: impl AsRef<[u8]>, -) -> UserMessage { - UserMessage { +) -> Dispatch { + let payload = payload.as_ref().to_vec(); + let payload_hash = processor.handle_payload(payload).unwrap(); + + Dispatch { id, kind, source, - payload: payload.as_ref().to_vec(), + payload_hash, value: 0, + details: None, + context: None, } } @@ -324,48 +323,46 @@ fn async_and_ping() { .handle_new_program(ping_id, ping_code_id) .expect("failed to create new program"); + let state_hash = processor + .handle_executable_balance_top_up(H256::zero(), 10_000_000_000) + .expect("failed to top up balance"); + + let message = create_message_full( + &mut processor, + get_next_message_id(), + DispatchKind::Init, + user_id, + "PING", + ); let ping_state_hash = processor - .handle_user_message( - H256::zero(), - vec![UserMessage { - id: get_next_message_id(), - kind: DispatchKind::Init, - source: user_id, - payload: b"PING".to_vec(), - value: 0, - }], - ) + .handle_message_queueing(state_hash, message) .expect("failed to populate message queue"); processor .handle_new_program(async_id, upload_code_id) .expect("failed to create new program"); + let message = create_message_full( + &mut processor, + get_next_message_id(), + DispatchKind::Init, + user_id, + ping_id.encode(), + ); let async_state_hash = processor - .handle_user_message( - H256::zero(), - vec![UserMessage { - id: get_next_message_id(), - kind: DispatchKind::Init, - source: user_id, - payload: ping_id.encode(), - value: 0, - }], - ) + .handle_message_queueing(state_hash, message) .expect("failed to populate message queue"); let wait_for_reply_to = get_next_message_id(); + let message = create_message_full( + &mut processor, + wait_for_reply_to, + DispatchKind::Handle, + user_id, + demo_async::Command::Common.encode(), + ); let async_state_hash = processor - .handle_user_message( - async_state_hash, - vec![UserMessage { - id: wait_for_reply_to, - kind: DispatchKind::Handle, - source: user_id, - payload: demo_async::Command::Common.encode(), - value: 0, - }], - ) + .handle_message_queueing(async_state_hash, message) .expect("failed to populate message queue"); let mut programs = @@ -441,7 +438,12 @@ fn many_waits() { .expect("failed to create new program"); let state_hash = processor - .handle_user_message(H256::zero(), vec![create_message(DispatchKind::Init, b"")]) + .handle_executable_balance_top_up(H256::zero(), 10_000_000_000) + .expect("failed to top up balance"); + + let message = create_message(&mut processor, DispatchKind::Init, b""); + let state_hash = processor + .handle_message_queueing(state_hash, message) .expect("failed to populate message queue"); programs.insert(program_id, state_hash); @@ -451,8 +453,9 @@ fn many_waits() { assert_eq!(to_users.len(), amount as usize); for (_pid, state_hash) in programs.iter_mut() { + let message = create_message(&mut processor, DispatchKind::Handle, b""); let new_state_hash = processor - .handle_user_message(*state_hash, vec![create_message(DispatchKind::Handle, b"")]) + .handle_message_queueing(*state_hash, message) .expect("failed to populate message queue"); *state_hash = new_state_hash; } From 4edeb6fa2ebe04585a746e2f0965f6ced07ea9d4 Mon Sep 17 00:00:00 2001 From: Dmitry Novikov Date: Wed, 21 Aug 2024 19:04:57 +0400 Subject: [PATCH 10/14] assert source in tests --- ethexe/cli/src/tests.rs | 26 +++++++++++++++++------- ethexe/contracts/script/Deployment.s.sol | 1 - 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/ethexe/cli/src/tests.rs b/ethexe/cli/src/tests.rs index 09bbbee1353..48858f5cd5a 100644 --- a/ethexe/cli/src/tests.rs +++ b/ethexe/cli/src/tests.rs @@ -37,7 +37,7 @@ use ethexe_signer::Signer; use ethexe_validator::Validator; use futures::StreamExt; use gear_core::ids::prelude::*; -use gprimitives::{ActorId, CodeId, MessageId, H256}; +use gprimitives::{ActorId, CodeId, MessageId, H160, H256}; use std::{sync::Arc, time::Duration}; use tokio::{ sync::{ @@ -148,6 +148,7 @@ struct TestEnv { validator_private_key: ethexe_signer::PrivateKey, validator_public_key: ethexe_signer::PublicKey, router_address: ethexe_signer::Address, + sender_address: ActorId, block_time: Duration, running_service_handle: Option>>, } @@ -226,6 +227,7 @@ impl TestEnv { validator_private_key, validator_public_key, router_address, + sender_address: ActorId::from(H160::from(sender_address.0)), block_time, running_service_handle: None, }; @@ -414,9 +416,12 @@ async fn ping() { if address == program_id { match event { MirrorEvent::MessageQueueingRequested { - id, payload, value, .. + id, + source, + payload, + value, } => { - // TODO (breathx): assert source. + assert_eq!(source, env.sender_address); assert_eq!(payload, b"PING"); assert_eq!(value, 0); init_message_id = id; @@ -583,9 +588,12 @@ async fn ping_reorg() { if address == program_id { match event { MirrorEvent::MessageQueueingRequested { - id, payload, value, .. + id, + source, + payload, + value, } => { - // TODO (breathx): assert source. + assert_eq!(source, env.sender_address); assert_eq!(payload, b"PING"); assert_eq!(value, 0); init_message_id = id; @@ -829,9 +837,13 @@ async fn ping_deep_sync() { if address == program_id { match event { MirrorEvent::MessageQueueingRequested { - id, payload, value, .. + id, + source, + payload, + value, + .. } => { - // TODO (breathx): assert source. + assert_eq!(source, env.sender_address); assert_eq!(payload, b"PING"); assert_eq!(value, 0); init_message_id = id; diff --git a/ethexe/contracts/script/Deployment.s.sol b/ethexe/contracts/script/Deployment.s.sol index 849c34ec5b7..c373dfdb189 100644 --- a/ethexe/contracts/script/Deployment.s.sol +++ b/ethexe/contracts/script/Deployment.s.sol @@ -46,7 +46,6 @@ contract RouterScript is Script { mirror = new Mirror(); mirrorProxy = new MirrorProxy(address(router)); - // TODO (breathx): remove this approve. wrappedVara.approve(address(router), type(uint256).max); vm.stopBroadcast(); From 0c29259508433f84a722ee71ec21f3f78da6bf4d Mon Sep 17 00:00:00 2001 From: Dmitry Novikov Date: Wed, 21 Aug 2024 19:05:58 +0400 Subject: [PATCH 11/14] resolve some todos --- ethexe/contracts/src/IMirror.sol | 1 - ethexe/observer/src/observer.rs | 2 -- 2 files changed, 3 deletions(-) diff --git a/ethexe/contracts/src/IMirror.sol b/ethexe/contracts/src/IMirror.sol index 37c63e34e9d..cb384620d6d 100644 --- a/ethexe/contracts/src/IMirror.sol +++ b/ethexe/contracts/src/IMirror.sol @@ -61,7 +61,6 @@ interface IMirror { event Reply(bytes payload, uint128 value, bytes32 replyTo, bytes4 indexed replyCode); // TODO (breathx): should we deposit it? should we notify about successful reply sending? - // TODO (breathx): `value` could be removed from event. /** * @dev Emitted when a user succeed in claiming value request and receives balance. * diff --git a/ethexe/observer/src/observer.rs b/ethexe/observer/src/observer.rs index 288470fb676..dd24a2b7cef 100644 --- a/ethexe/observer/src/observer.rs +++ b/ethexe/observer/src/observer.rs @@ -185,7 +185,6 @@ pub(crate) async fn read_block_events( provider: &ObserverProvider, router_address: AlloyAddress, ) -> Result> { - // TODO (breathx): discuss should we check validity of wvara address for router on some block. let router_query = RouterQuery::from_provider(router_address, Arc::new(provider.clone())); let wvara_address = router_query.wvara_address().await?; @@ -265,7 +264,6 @@ pub(crate) async fn read_block_events_batch( provider: &ObserverProvider, router_address: AlloyAddress, ) -> Result>> { - // TODO (breathx): discuss should we check validity of wvara address for router on some block. let router_query = RouterQuery::from_provider(router_address, Arc::new(provider.clone())); let wvara_address = router_query.wvara_address().await?; From 5c85f6ab2eabb6d94d759ff94cc4c885bae3c56d Mon Sep 17 00:00:00 2001 From: Dmitry Novikov Date: Wed, 21 Aug 2024 19:10:29 +0400 Subject: [PATCH 12/14] refine local outcome with one option for code --- ethexe/cli/src/service.rs | 11 +++-------- ethexe/processor/src/lib.rs | 19 ++++++++----------- ethexe/processor/src/tests.rs | 8 +++++++- 3 files changed, 18 insertions(+), 20 deletions(-) diff --git a/ethexe/cli/src/service.rs b/ethexe/cli/src/service.rs index e48824c5b2a..21b7b4edfba 100644 --- a/ethexe/cli/src/service.rs +++ b/ethexe/cli/src/service.rs @@ -361,14 +361,9 @@ impl Service { outcomes .into_iter() .map(|outcome| match outcome { - LocalOutcome::CodeApproved(code_id) => Commitment::Code(CodeCommitment { - id: code_id, - valid: true, - }), - LocalOutcome::CodeRejected(code_id) => Commitment::Code(CodeCommitment { - id: code_id, - valid: false, - }), + LocalOutcome::CodeValidated { id, valid } => { + Commitment::Code(CodeCommitment { id, valid }) + } _ => unreachable!("Only code outcomes are expected here"), }) .collect() diff --git a/ethexe/processor/src/lib.rs b/ethexe/processor/src/lib.rs index 850c8f7a0b6..95341913113 100644 --- a/ethexe/processor/src/lib.rs +++ b/ethexe/processor/src/lib.rs @@ -47,15 +47,14 @@ pub struct Processor { creator: InstanceCreator, } -// TODO (breathx): rename outcomes accordingly to events. /// Local changes that can be committed to the network or local signer. #[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)] pub enum LocalOutcome { - /// Produced when code with specific id is recorded and available in database. - CodeApproved(CodeId), - - // TODO: add docs - CodeRejected(CodeId), + /// Produced when code with specific id is recorded and validated. + CodeValidated { + id: CodeId, + valid: bool, + }, Transition(StateTransition), } @@ -219,11 +218,9 @@ impl Processor { ) -> Result> { log::debug!("Processing upload code {code_id:?}"); - if code_id != CodeId::generate(code) || self.handle_new_code(code)?.is_none() { - Ok(vec![LocalOutcome::CodeRejected(code_id)]) - } else { - Ok(vec![LocalOutcome::CodeApproved(code_id)]) - } + let valid = !(code_id != CodeId::generate(code) || self.handle_new_code(code)?.is_none()); + + Ok(vec![LocalOutcome::CodeValidated { id: code_id, valid }]) } pub fn process_block_events( diff --git a/ethexe/processor/src/tests.rs b/ethexe/processor/src/tests.rs index b4b46e6d93f..b4b25d09ee5 100644 --- a/ethexe/processor/src/tests.rs +++ b/ethexe/processor/src/tests.rs @@ -71,7 +71,13 @@ fn process_observer_event() { .process_upload_code(code_id, &code) .expect("failed to upload code"); log::debug!("\n\nUpload code outcomes: {outcomes:?}\n\n"); - assert_eq!(outcomes, vec![LocalOutcome::CodeApproved(code_id)]); + assert_eq!( + outcomes, + vec![LocalOutcome::CodeValidated { + id: code_id, + valid: true + }] + ); let ch1 = init_new_block_from_parent(&mut processor, ch0); From ceefe530a45eb6d77a98791b73c34c7859ba63dd Mon Sep 17 00:00:00 2001 From: Dmitry Novikov Date: Wed, 21 Aug 2024 19:12:12 +0400 Subject: [PATCH 13/14] rename state field of program --- ethexe/processor/src/host/threads.rs | 2 +- ethexe/processor/src/lib.rs | 2 +- ethexe/runtime/common/src/journal.rs | 10 +++++----- ethexe/runtime/common/src/lib.rs | 2 +- ethexe/runtime/common/src/state.rs | 5 ++--- 5 files changed, 10 insertions(+), 11 deletions(-) diff --git a/ethexe/processor/src/host/threads.rs b/ethexe/processor/src/host/threads.rs index c9bf25ff777..ddfb6e3ec95 100644 --- a/ethexe/processor/src/host/threads.rs +++ b/ethexe/processor/src/host/threads.rs @@ -55,7 +55,7 @@ impl ThreadParams { pub fn pages(&mut self) -> &BTreeMap { self.pages.get_or_insert_with(|| { let ProgramState { - state: Program::Active(ActiveProgram { pages_hash, .. }), + program: Program::Active(ActiveProgram { pages_hash, .. }), .. } = self.db.read_state(self.state_hash).expect(UNKNOWN_STATE) else { diff --git a/ethexe/processor/src/lib.rs b/ethexe/processor/src/lib.rs index 95341913113..9d3524d665c 100644 --- a/ethexe/processor/src/lib.rs +++ b/ethexe/processor/src/lib.rs @@ -173,7 +173,7 @@ impl Processor { .read_state(state_hash) .ok_or_else(|| anyhow::anyhow!("program should exist"))?; - anyhow::ensure!(state.state.is_active(), "program should be active"); + anyhow::ensure!(state.program.is_active(), "program should be active"); let queue = if let MaybeHash::Hash(HashAndLen { hash: queue_hash, .. diff --git a/ethexe/runtime/common/src/journal.rs b/ethexe/runtime/common/src/journal.rs index b3342a9509f..56ef8aca17c 100644 --- a/ethexe/runtime/common/src/journal.rs +++ b/ethexe/runtime/common/src/journal.rs @@ -78,7 +78,7 @@ impl JournalHandler for Handler<'_, S> { DispatchOutcome::Exit { .. } => todo!(), DispatchOutcome::InitSuccess { program_id } => { log::trace!("Dispatch {message_id} init success for program {program_id}"); - self.update_program(program_id, |mut state, _| match &mut state.state { + self.update_program(program_id, |mut state, _| match &mut state.program { state::Program::Active(program) => { program.initialized = true; Some(state) @@ -96,7 +96,7 @@ impl JournalHandler for Handler<'_, S> { ); self.update_program(program_id, |state, _| { Some(ProgramState { - state: state::Program::Terminated(origin), + program: state::Program::Terminated(origin), ..state }) }); @@ -122,7 +122,7 @@ impl JournalHandler for Handler<'_, S> { fn exit_dispatch(&mut self, id_exited: ProgramId, value_destination: ProgramId) { self.update_program(id_exited, |state, _| { Some(ProgramState { - state: state::Program::Exited(value_destination), + program: state::Program::Exited(value_destination), ..state }) }); @@ -325,7 +325,7 @@ impl JournalHandler for Handler<'_, S> { } self.update_program(program_id, |state, storage| { - let state::Program::Active(mut active_state) = state.state else { + let state::Program::Active(mut active_state) = state.program else { return None; }; @@ -343,7 +343,7 @@ impl JournalHandler for Handler<'_, S> { }; Some(ProgramState { - state: state::Program::Active(changed_active_state), + program: state::Program::Active(changed_active_state), ..state }) }); diff --git a/ethexe/runtime/common/src/lib.rs b/ethexe/runtime/common/src/lib.rs index 7d38bdd4060..19db601a5af 100644 --- a/ethexe/runtime/common/src/lib.rs +++ b/ethexe/runtime/common/src/lib.rs @@ -192,7 +192,7 @@ pub fn process_next_message>( outgoing_bytes_limit: 64 * 1024 * 1024, }; - let active_state = match program_state.state { + let active_state = match program_state.program { state::Program::Active(state) => state, state::Program::Exited(program_id) | state::Program::Terminated(program_id) => { log::trace!("Program {program_id} is not active"); diff --git a/ethexe/runtime/common/src/state.rs b/ethexe/runtime/common/src/state.rs index ff09446a521..0ec6ae6ca1d 100644 --- a/ethexe/runtime/common/src/state.rs +++ b/ethexe/runtime/common/src/state.rs @@ -105,9 +105,8 @@ impl Program { /// ethexe program state. #[derive(Clone, Debug, Decode, Encode, PartialEq, Eq)] pub struct ProgramState { - // TODO (breathx): rename to program /// Active, exited or terminated program state. - pub state: Program, + pub program: Program, /// Hash of incoming message queue, see [`MessageQueue`]. pub queue_hash: MaybeHash, /// Hash of waiting messages list, see [`Waitlist`]. @@ -121,7 +120,7 @@ pub struct ProgramState { impl ProgramState { pub const fn zero() -> Self { Self { - state: Program::Active(ActiveProgram { + program: Program::Active(ActiveProgram { allocations_hash: MaybeHash::Empty, pages_hash: MaybeHash::Empty, memory_infix: MemoryInfix::new(0), From 65076d5e6d4621ef74571d8ae6024e14f6873fee Mon Sep 17 00:00:00 2001 From: Dmitry Novikov Date: Wed, 21 Aug 2024 20:08:48 +0400 Subject: [PATCH 14/14] fix if statement on init message --- ethexe/cli/src/tests.rs | 1 + ethexe/processor/src/lib.rs | 9 +++++++-- ethexe/runtime/common/src/state.rs | 18 ++++++++++++++++++ 3 files changed, 26 insertions(+), 2 deletions(-) diff --git a/ethexe/cli/src/tests.rs b/ethexe/cli/src/tests.rs index 48858f5cd5a..50f45093a1f 100644 --- a/ethexe/cli/src/tests.rs +++ b/ethexe/cli/src/tests.rs @@ -329,6 +329,7 @@ impl Drop for TestEnv { } } } + #[tokio::test(flavor = "multi_thread")] #[ntest::timeout(60_000)] async fn ping() { diff --git a/ethexe/processor/src/lib.rs b/ethexe/processor/src/lib.rs index 9d3524d665c..141555e7993 100644 --- a/ethexe/processor/src/lib.rs +++ b/ethexe/processor/src/lib.rs @@ -229,7 +229,7 @@ impl Processor { // TODO (breathx): accept not ref? events: &[BlockEvent], ) -> Result> { - log::debug!("Processing events for {block_hash:?}: {events:?}"); + log::debug!("Processing events for {block_hash:?}: {events:#?}"); let mut states = self .db @@ -309,7 +309,12 @@ impl Processor { } => { let payload_hash = self.handle_payload(payload)?; - let kind = if state_hash.is_zero() { + let state = self + .db + .read_state(state_hash) + .ok_or_else(|| anyhow::anyhow!("program should exist"))?; + + let kind = if state.requires_init_message() { DispatchKind::Init } else { DispatchKind::Handle diff --git a/ethexe/runtime/common/src/state.rs b/ethexe/runtime/common/src/state.rs index 0ec6ae6ca1d..6a88151b283 100644 --- a/ethexe/runtime/common/src/state.rs +++ b/ethexe/runtime/common/src/state.rs @@ -69,6 +69,10 @@ impl From for MaybeHash { } impl MaybeHash { + pub fn is_empty(&self) -> bool { + matches!(self, MaybeHash::Empty) + } + pub fn with_hash_or_default(&self, f: impl FnOnce(H256) -> T) -> T { match &self { Self::Hash(HashAndLen { hash, .. }) => f(*hash), @@ -136,6 +140,20 @@ impl ProgramState { pub fn is_zero(&self) -> bool { *self == Self::zero() } + + pub fn requires_init_message(&self) -> bool { + if !matches!( + self.program, + Program::Active(ActiveProgram { + initialized: false, + .. + }) + ) { + return false; + } + + self.queue_hash.is_empty() && self.waitlist_hash.is_empty() + } } #[derive(Clone, Debug, Encode, Decode)]