From 52c8c410db793c3d9ee8e29a3295030816e06057 Mon Sep 17 00:00:00 2001 From: Dmitrii Novikov Date: Thu, 26 Sep 2024 15:03:08 +0400 Subject: [PATCH] feat(ethexe): impl Mailbox interactions (#4257) --- Cargo.lock | 1 + Makefile | 5 +- ethexe/cli/Cargo.toml | 1 + ethexe/cli/src/tests.rs | 159 ++++++++++++++++++++++++++- ethexe/db/src/database.rs | 12 +- ethexe/processor/src/lib.rs | 149 +++++++++++++++++++++++-- ethexe/runtime/common/src/journal.rs | 22 ++++ ethexe/runtime/common/src/state.rs | 14 ++- ethexe/runtime/src/wasm/storage.rs | 10 +- 9 files changed, 359 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 05759e08a9a..a2194f64b55 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5112,6 +5112,7 @@ dependencies = [ "ethexe-processor", "ethexe-prometheus-endpoint", "ethexe-rpc", + "ethexe-runtime-common", "ethexe-sequencer", "ethexe-signer", "ethexe-utils", diff --git a/Makefile b/Makefile index d7626ada924..d887b79f8a5 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,9 @@ # Ethexe section .PHONY: ethexe-pre-commit -ethexe-pre-commit: ethexe-contracts-pre-commit +ethexe-pre-commit: ethexe-contracts-pre-commit ethexe-pre-commit-no-contracts + +.PHONY: ethexe-pre-commit-no-contracts +ethexe-pre-commit-no-contracts: @ echo " > Formatting ethexe" && cargo +nightly fmt --all -- --config imports_granularity=Crate,edition=2021 @ echo " >> Clippy checking ethexe" && cargo clippy -p "ethexe-*" --all-targets --all-features -- --no-deps -D warnings diff --git a/ethexe/cli/Cargo.toml b/ethexe/cli/Cargo.toml index a6894ac1bc9..dee68571558 100644 --- a/ethexe/cli/Cargo.toml +++ b/ethexe/cli/Cargo.toml @@ -23,6 +23,7 @@ ethexe-sequencer.workspace = true ethexe-ethereum.workspace = true ethexe-validator.workspace = true ethexe-common.workspace = true +ethexe-runtime-common.workspace = true ethexe-prometheus-endpoint.workspace = true ethexe-rpc.workspace = true ethexe-utils.workspace = true diff --git a/ethexe/cli/src/tests.rs b/ethexe/cli/src/tests.rs index 0c64f3723bf..56fe8262256 100644 --- a/ethexe/cli/src/tests.rs +++ b/ethexe/cli/src/tests.rs @@ -32,6 +32,7 @@ use ethexe_db::{Database, MemDb}; use ethexe_ethereum::{router::RouterQuery, Ethereum}; use ethexe_observer::{Event, MockBlobReader, Observer, Query}; use ethexe_processor::Processor; +use ethexe_runtime_common::state::Storage; use ethexe_sequencer::Sequencer; use ethexe_signer::Signer; use ethexe_validator::Validator; @@ -41,7 +42,7 @@ use gear_core::{ }; use gprimitives::{ActorId, CodeId, MessageId, H160, H256}; use parity_scale_codec::Encode; -use std::{sync::Arc, time::Duration}; +use std::{collections::BTreeMap, sync::Arc, time::Duration}; use tokio::{ sync::oneshot, task::{self, JoinHandle}, @@ -136,6 +137,162 @@ async fn ping() { assert_eq!(res.reply_value, 0); } +#[tokio::test(flavor = "multi_thread")] +#[ntest::timeout(60_000)] +async fn mailbox() { + gear_utils::init_default_logger(); + + let mut env = TestEnv::new(Default::default()).await.unwrap(); + + let sequencer_public_key = env.wallets.next(); + let mut node = env.new_node( + NodeConfig::default() + .sequencer(sequencer_public_key) + .validator(env.validators[0]), + ); + node.start_service().await; + + let res = env + .upload_code(demo_async::WASM_BINARY) + .await + .unwrap() + .wait_for() + .await + .unwrap(); + + assert!(res.valid); + + let code_id = res.code_id; + + let res = env + .create_program(code_id, &env.sender_id.encode(), 0) + .await + .unwrap() + .wait_for() + .await + .unwrap(); + + assert_eq!(res.reply_code, ReplyCode::Success(SuccessReplyReason::Auto)); + + let pid = res.program_id; + + env.approve_wvara(pid).await; + + let res = env + .send_message(pid, &demo_async::Command::Mutex.encode(), 0) + .await + .unwrap(); + + let mid_expected_message = MessageId::generate_outgoing(res.message_id, 0); + let ping_expected_message = MessageId::generate_outgoing(res.message_id, 1); + + let mut listener = env.events_publisher().subscribe().await; + listener + .apply_until_block_event(|event| match event { + BlockEvent::Mirror { address, event } if address == pid => { + if let MirrorEvent::Message { + id, + destination, + payload, + .. + } = event + { + assert_eq!(destination, env.sender_id); + + if id == mid_expected_message { + assert_eq!(payload, res.message_id.encode()); + Ok(None) + } else if id == ping_expected_message { + assert_eq!(payload, b"PING"); + Ok(Some(())) + } else { + unreachable!() + } + } else { + Ok(None) + } + } + _ => Ok(None), + }) + .await + .unwrap(); + + let expected_mailbox = BTreeMap::from_iter([( + env.sender_id, + BTreeMap::from_iter([ + (mid_expected_message, (0, 0)), + (ping_expected_message, (0, 0)), + ]), + )]); + let mirror = env.ethereum.mirror(pid.try_into().unwrap()); + let state_hash = mirror.query().state_hash().await.unwrap(); + + let state = node.db.read_state(state_hash).unwrap(); + assert!(!state.mailbox_hash.is_empty()); + let mailbox = state + .mailbox_hash + .with_hash_or_default(|hash| node.db.read_mailbox(hash).unwrap()); + + assert_eq!(mailbox, expected_mailbox); + + mirror + .send_reply(ping_expected_message, "PONG", 0) + .await + .unwrap(); + + let initial_message = res.message_id; + let reply_info = res.wait_for().await.unwrap(); + assert_eq!( + reply_info.reply_code, + ReplyCode::Success(SuccessReplyReason::Manual) + ); + assert_eq!(reply_info.reply_payload, initial_message.encode()); + + let state_hash = mirror.query().state_hash().await.unwrap(); + + let state = node.db.read_state(state_hash).unwrap(); + assert!(!state.mailbox_hash.is_empty()); + let mailbox = state + .mailbox_hash + .with_hash_or_default(|hash| node.db.read_mailbox(hash).unwrap()); + + let expected_mailbox = BTreeMap::from_iter([( + env.sender_id, + BTreeMap::from_iter([(mid_expected_message, (0, 0))]), + )]); + + assert_eq!(mailbox, expected_mailbox); + + mirror.claim_value(mid_expected_message).await.unwrap(); + + listener + .apply_until_block_event(|event| match event { + BlockEvent::Mirror { address, event } if address == pid => match event { + MirrorEvent::ValueClaimed { claimed_id, .. } + if claimed_id == mid_expected_message => + { + Ok(Some(())) + } + _ => Ok(None), + }, + _ => Ok(None), + }) + .await + .unwrap(); + + let state_hash = mirror.query().state_hash().await.unwrap(); + + let state = node.db.read_state(state_hash).unwrap(); + assert!(!state.mailbox_hash.is_empty()); // could be empty + let mailbox = state + .mailbox_hash + .with_hash_or_default(|hash| node.db.read_mailbox(hash).unwrap()); + + let expected_mailbox = BTreeMap::from_iter([(env.sender_id, BTreeMap::new())]); + + assert_eq!(mailbox, expected_mailbox); +} + #[tokio::test(flavor = "multi_thread")] #[ntest::timeout(120_000)] async fn ping_reorg() { diff --git a/ethexe/db/src/database.rs b/ethexe/db/src/database.rs index 7c5ff42e78a..48a463a7e5d 100644 --- a/ethexe/db/src/database.rs +++ b/ethexe/db/src/database.rs @@ -30,7 +30,7 @@ use ethexe_common::{ BlockRequestEvent, }; use ethexe_runtime_common::state::{ - Allocations, MemoryPages, MessageQueue, ProgramState, Storage, Waitlist, + Allocations, Mailbox, MemoryPages, MessageQueue, ProgramState, Storage, Waitlist, }; use gear_core::{ code::InstrumentedCode, @@ -469,6 +469,16 @@ impl Storage for Database { self.cas.write(&waitlist.encode()) } + fn read_mailbox(&self, hash: H256) -> Option { + self.cas.read(&hash).map(|data| { + Mailbox::decode(&mut data.as_slice()).expect("Failed to decode data into `Mailbox`") + }) + } + + fn write_mailbox(&self, mailbox: Mailbox) -> H256 { + self.cas.write(&mailbox.encode()) + } + fn read_pages(&self, hash: H256) -> Option { let data = self.cas.read(&hash)?; Some(MemoryPages::decode(&mut &data[..]).expect("Failed to decode data into `MemoryPages`")) diff --git a/ethexe/processor/src/lib.rs b/ethexe/processor/src/lib.rs index 73672b3cec8..51c307b1f13 100644 --- a/ethexe/processor/src/lib.rs +++ b/ethexe/processor/src/lib.rs @@ -21,15 +21,18 @@ use anyhow::Result; use ethexe_common::{ mirror::RequestEvent as MirrorEvent, - router::{RequestEvent as RouterEvent, StateTransition}, + router::{RequestEvent as RouterEvent, StateTransition, ValueClaim}, wvara::RequestEvent as WVaraEvent, BlockRequestEvent, }; use ethexe_db::{BlockMetaStorage, CodesStorage, Database}; use ethexe_runtime_common::state::{Dispatch, HashAndLen, MaybeHash, Storage}; use gear_core::{ - ids::{prelude::CodeIdExt, ProgramId}, - message::{DispatchKind, Payload, ReplyInfo}, + ids::{ + prelude::{CodeIdExt, MessageIdExt}, + ProgramId, + }, + message::{DispatchKind, Payload, ReplyDetails, ReplyInfo, SuccessReplyReason}, }; use gprimitives::{ActorId, CodeId, MessageId, H256}; use host::InstanceCreator; @@ -69,6 +72,8 @@ impl OverlaidProcessor { .block_start_program_states(block_hash) .unwrap_or_default(); + let mut value_claims = Default::default(); + let Some(&state_hash) = states.get(&program_id) else { return Err(anyhow::anyhow!("unknown program at specified block hash")); }; @@ -85,6 +90,7 @@ impl OverlaidProcessor { self.0.handle_mirror_event( &mut states, + &mut value_claims, program_id, MirrorEvent::MessageQueueingRequested { id: MessageId::zero(), @@ -272,6 +278,95 @@ impl Processor { Ok(self.db.write_state(state)) } + fn handle_reply_queueing( + &mut self, + state_hash: H256, + mailboxed_id: MessageId, + user_id: ActorId, + payload: Vec, + value: u128, + ) -> Result> { + self.handle_mailboxed_message_impl( + state_hash, + mailboxed_id, + user_id, + payload, + value, + SuccessReplyReason::Manual, + ) + } + + fn handle_value_claiming( + &mut self, + state_hash: H256, + mailboxed_id: MessageId, + user_id: ActorId, + ) -> Result> { + self.handle_mailboxed_message_impl( + state_hash, + mailboxed_id, + user_id, + vec![], + 0, + SuccessReplyReason::Auto, + ) + } + + fn handle_mailboxed_message_impl( + &mut self, + state_hash: H256, + mailboxed_id: MessageId, + user_id: ActorId, + payload: Vec, + value: u128, + reply_reason: SuccessReplyReason, + ) -> Result> { + let mut state = self + .db + .read_state(state_hash) + .ok_or_else(|| anyhow::anyhow!("program should exist"))?; + + let mut mailbox = state.mailbox_hash.with_hash_or_default(|hash| { + self.db.read_mailbox(hash).expect("Failed to read mailbox") + }); + + let entry = mailbox.entry(user_id).or_default(); + + let Some((claimed_value, _expiration)) = entry.remove(&mailboxed_id) else { + return Ok(None); + }; + + state.mailbox_hash = self.db.write_mailbox(mailbox).into(); + + let claim = ValueClaim { + message_id: mailboxed_id, + destination: user_id, + value: claimed_value, + }; + + let mut queue = state + .queue_hash + .with_hash_or_default(|hash| self.db.read_queue(hash).expect("Failed to read queue")); + + let payload_hash = self.handle_payload(payload)?; + + let dispatch = Dispatch { + id: MessageId::generate_reply(mailboxed_id), + kind: DispatchKind::Reply, + source: user_id, + payload_hash, + value, + details: Some(ReplyDetails::new(mailboxed_id, reply_reason.into()).into()), + context: None, + }; + + queue.push_back(dispatch); + + state.queue_hash = self.db.write_queue(queue).into(); + + Ok(Some((claim, self.db.write_state(state)))) + } + // TODO: replace LocalOutcome with Transition struct. pub fn run( &mut self, @@ -312,13 +407,15 @@ impl Processor { .block_start_program_states(block_hash) .unwrap_or_default(); + let mut all_value_claims = Default::default(); + for event in events { match event { BlockRequestEvent::Router(event) => { self.handle_router_event(&mut states, event)?; } BlockRequestEvent::Mirror { address, event } => { - self.handle_mirror_event(&mut states, address, event)?; + self.handle_mirror_event(&mut states, &mut all_value_claims, address, event)?; } BlockRequestEvent::WVara(event) => { self.handle_wvara_event(&mut states, event)?; @@ -326,7 +423,18 @@ impl Processor { } } - let outcomes = self.run(block_hash, &mut states)?; + let mut outcomes = self.run(block_hash, &mut states)?; + + for outcome in &mut outcomes { + if let LocalOutcome::Transition(StateTransition { + actor_id, + value_claims, + .. + }) = outcome + { + value_claims.extend(all_value_claims.remove(actor_id).unwrap_or_default()); + } + } self.db.set_block_end_program_states(block_hash, states); @@ -360,6 +468,7 @@ impl Processor { fn handle_mirror_event( &mut self, states: &mut BTreeMap, + value_claims: &mut BTreeMap>, actor_id: ProgramId, event: MirrorEvent, ) -> Result<()> { @@ -404,10 +513,32 @@ impl Processor { self.handle_message_queueing(state_hash, dispatch)? } - MirrorEvent::ReplyQueueingRequested { .. } - | MirrorEvent::ValueClaimingRequested { .. } => { - log::debug!("Handler not yet implemented: {event:?}"); - return Ok(()); + MirrorEvent::ReplyQueueingRequested { + replied_to, + source, + payload, + value, + } => { + let Some((value_claim, state_hash)) = + self.handle_reply_queueing(state_hash, replied_to, source, payload, value)? + else { + return Ok(()); + }; + + value_claims.entry(actor_id).or_default().push(value_claim); + + state_hash + } + MirrorEvent::ValueClaimingRequested { claimed_id, source } => { + let Some((value_claim, state_hash)) = + self.handle_value_claiming(state_hash, claimed_id, source)? + else { + return Ok(()); + }; + + value_claims.entry(actor_id).or_default().push(value_claim); + + state_hash } }; diff --git a/ethexe/runtime/common/src/journal.rs b/ethexe/runtime/common/src/journal.rs index 56ef8aca17c..afb241d3d89 100644 --- a/ethexe/runtime/common/src/journal.rs +++ b/ethexe/runtime/common/src/journal.rs @@ -162,6 +162,28 @@ impl JournalHandler for Handler<'_, S> { } if !self.program_states.contains_key(&dispatch.destination()) { + if !dispatch.is_reply() { + self.update_program(dispatch.source(), |state, storage| { + let mut mailbox = state.mailbox_hash.with_hash_or_default(|hash| { + storage.read_mailbox(hash).expect("Failed to read mailbox") + }); + + let entry = mailbox.entry(dispatch.destination()).or_default(); + + // TODO (breathx): put some expiration instead of 0. + let r = entry.insert(dispatch.id(), (dispatch.value(), 0)); + debug_assert!(r.is_none()); + let _ = r; + + let mailbox_hash = storage.write_mailbox(mailbox).into(); + + Some(ProgramState { + mailbox_hash, + ..state + }) + }); + } + self.to_users_messages.push(dispatch.into_parts().1); return; } diff --git a/ethexe/runtime/common/src/state.rs b/ethexe/runtime/common/src/state.rs index 6a88151b283..a315696c25b 100644 --- a/ethexe/runtime/common/src/state.rs +++ b/ethexe/runtime/common/src/state.rs @@ -33,7 +33,7 @@ use gear_core::{ program::MemoryInfix, reservation::GasReservationMap, }; -use gprimitives::{CodeId, MessageId, H256}; +use gprimitives::{ActorId, CodeId, MessageId, H256}; use gsys::BlockNumber; use parity_scale_codec::{Decode, Encode}; @@ -115,6 +115,8 @@ pub struct ProgramState { pub queue_hash: MaybeHash, /// Hash of waiting messages list, see [`Waitlist`]. pub waitlist_hash: MaybeHash, + /// Hash of mailboxed messages, see [`Mailbox`]. + pub mailbox_hash: MaybeHash, /// Reducible balance. pub balance: Value, /// Executable balance. @@ -132,6 +134,7 @@ impl ProgramState { }), queue_hash: MaybeHash::Empty, waitlist_hash: MaybeHash::Empty, + mailbox_hash: MaybeHash::Empty, balance: 0, executable_balance: 0, } @@ -178,6 +181,9 @@ pub type MessageQueue = VecDeque; pub type Waitlist = BTreeMap>; +// TODO (breathx): consider here LocalMailbox for each user. +pub type Mailbox = BTreeMap>; + pub type MemoryPages = BTreeMap; pub type Allocations = IntervalsTree; @@ -201,6 +207,12 @@ pub trait Storage { /// Writes waitlist and returns its hash. fn write_waitlist(&self, waitlist: Waitlist) -> H256; + /// Reads mailbox by mailbox hash. + fn read_mailbox(&self, hash: H256) -> Option; + + /// Writes mailbox and returns its hash. + fn write_mailbox(&self, mailbox: Mailbox) -> H256; + /// Reads memory pages by pages hash. fn read_pages(&self, hash: H256) -> Option; diff --git a/ethexe/runtime/src/wasm/storage.rs b/ethexe/runtime/src/wasm/storage.rs index 0e16b763553..c7d8566112e 100644 --- a/ethexe/runtime/src/wasm/storage.rs +++ b/ethexe/runtime/src/wasm/storage.rs @@ -20,7 +20,7 @@ use super::interface::database_ri; use alloc::{collections::BTreeMap, vec::Vec}; use core_processor::configs::BlockInfo; use ethexe_runtime_common::{ - state::{Allocations, MemoryPages, MessageQueue, ProgramState, Storage, Waitlist}, + state::{Allocations, Mailbox, MemoryPages, MessageQueue, ProgramState, Storage, Waitlist}, RuntimeInterface, }; use gear_core::{memory::PageBuf, message::Payload, pages::GearPage}; @@ -64,6 +64,10 @@ impl Storage for RuntimeInterfaceStorage { database_ri::read_unwrapping(&hash) } + fn read_mailbox(&self, hash: H256) -> Option { + database_ri::read_unwrapping(&hash) + } + fn write_allocations(&self, allocations: Allocations) -> H256 { database_ri::write(allocations) } @@ -95,6 +99,10 @@ impl Storage for RuntimeInterfaceStorage { fn write_waitlist(&self, waitlist: Waitlist) -> H256 { database_ri::write(waitlist) } + + fn write_mailbox(&self, mailbox: Mailbox) -> H256 { + database_ri::write(mailbox) + } } #[derive(Debug, Clone)]