From e250089b25d68f0261b17b364b3054dbde5d3934 Mon Sep 17 00:00:00 2001 From: Dmitrii Novikov Date: Tue, 1 Oct 2024 17:41:33 +0400 Subject: [PATCH] feat(ethexe): impl InBlockTransition (temp changes within the block) (#4272) --- Cargo.lock | 3 +- _typos.toml | 1 + common/src/auxiliary/task_pool.rs | 4 +- .../src/{scheduler/scope.rs => scheduler.rs} | 100 +++++++++++- common/src/scheduler/mod.rs | 120 -------------- core/src/lib.rs | 4 +- .../scheduler/task.rs => core/src/tasks.rs | 23 +-- ethexe/common/Cargo.toml | 1 - ethexe/common/src/db.rs | 2 +- ethexe/common/src/router.rs | 15 +- ethexe/processor/Cargo.toml | 1 - ethexe/processor/src/common.rs | 18 +++ ethexe/processor/src/handling/events.rs | 71 +++++---- ethexe/processor/src/handling/run.rs | 118 ++------------ ethexe/processor/src/handling/tasks.rs | 40 +++-- ethexe/processor/src/lib.rs | 104 ++++++------- ethexe/processor/src/tests.rs | 57 +++---- ethexe/runtime/common/Cargo.toml | 2 + ethexe/runtime/common/src/journal.rs | 58 +++++-- ethexe/runtime/common/src/lib.rs | 15 +- ethexe/runtime/common/src/transitions.rs | 146 ++++++++++++++++++ gsdk/src/metadata/generated.rs | 81 +++++----- gtest/src/manager.rs | 3 +- gtest/src/manager/journal.rs | 3 +- gtest/src/manager/reservations.rs | 8 +- gtest/src/manager/task.rs | 6 +- gtest/src/state/task_pool.rs | 6 +- pallets/gear-program/src/lib.rs | 1 + pallets/gear-scheduler/src/lib.rs | 1 + pallets/gear-scheduler/src/tests.rs | 2 +- pallets/gear/src/benchmarking/mod.rs | 2 +- pallets/gear/src/internal.rs | 1 + pallets/gear/src/lib.rs | 1 + pallets/gear/src/manager/journal.rs | 3 +- pallets/gear/src/manager/mod.rs | 3 +- pallets/gear/src/manager/task.rs | 1 + pallets/gear/src/tests.rs | 1 + 37 files changed, 557 insertions(+), 469 deletions(-) rename common/src/{scheduler/scope.rs => scheduler.rs} (60%) delete mode 100644 common/src/scheduler/mod.rs rename common/src/scheduler/task.rs => core/src/tasks.rs (91%) create mode 100644 ethexe/runtime/common/src/transitions.rs diff --git a/Cargo.lock b/Cargo.lock index ae706dc3419..ca1df6deefd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5140,7 +5140,6 @@ version = "1.6.1" dependencies = [ "anyhow", "derive_more 0.99.18", - "gear-common", "gear-core", "gprimitives", "hex", @@ -5244,7 +5243,6 @@ dependencies = [ "ethexe-db", "ethexe-runtime", "ethexe-runtime-common", - "gear-common", "gear-core", "gear-core-processor", "gear-lazy-pages", @@ -5313,6 +5311,7 @@ name = "ethexe-runtime-common" version = "1.6.1" dependencies = [ "anyhow", + "ethexe-common", "gear-core", "gear-core-errors", "gear-core-processor", diff --git a/_typos.toml b/_typos.toml index 29ff75070e3..d595665da3d 100644 --- a/_typos.toml +++ b/_typos.toml @@ -15,3 +15,4 @@ extend-exclude = [ overlayed = "overlayed" # typo in sp-state-machine, won't fix. ws = "ws" dur = "dur" +clonable = "clonable" diff --git a/common/src/auxiliary/task_pool.rs b/common/src/auxiliary/task_pool.rs index 622109c455b..372d3956f08 100644 --- a/common/src/auxiliary/task_pool.rs +++ b/common/src/auxiliary/task_pool.rs @@ -19,8 +19,8 @@ //! Auxiliary implementation of the task pool. use super::{AuxiliaryDoubleStorageWrap, BlockNumber, DoubleBTreeMap}; -use crate::scheduler::{ScheduledTask, TaskPoolImpl}; -use gear_core::ids::ProgramId; +use crate::scheduler::TaskPoolImpl; +use gear_core::{ids::ProgramId, tasks::ScheduledTask}; use std::cell::RefCell; /// Task pool implementation that can be used in a native, non-wasm runtimes. diff --git a/common/src/scheduler/scope.rs b/common/src/scheduler.rs similarity index 60% rename from common/src/scheduler/scope.rs rename to common/src/scheduler.rs index fc605c13a85..442db2ede20 100644 --- a/common/src/scheduler/scope.rs +++ b/common/src/scheduler.rs @@ -16,8 +16,104 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use crate::storage::{CountedByKey, DoubleMapStorage, EmptyCallback, KeyIterableByKeyMap}; -use core::marker::PhantomData; +//! Module for scheduler implementation. +//! +//! Scheduler provides API for all available regular or time-dependent actions. + +use crate::storage::{ + CountedByKey, DoubleMapStorage, EmptyCallback, KeyIterableByKeyMap, ValueStorage, +}; +use core::{fmt::Debug, marker::PhantomData}; + +/// Represents scheduler's logic of centralized delayed tasks management logic. +pub trait Scheduler { + /// Block number type of the messenger. + type BlockNumber; + /// Task type. + type Task; + /// Cost type. + type Cost; + /// Inner error type generated by gear's storage types. + type Error: TaskPoolError; + /// Output error of each storage algorithm. + /// + /// Implements `From` to be able to return + /// any required error type. + type OutputError: From + Debug; + + /// Storing costs per block. + type CostsPerBlock: SchedulingCostsPerBlock; + + /// The first block of incomplete tasks, which have already passed, + /// but still contain tasks to deal with. + /// + /// Used for checking if scheduler is able to process + /// current block aimed tasks, or there are some + /// incomplete job from previous blocks. + type FirstIncompleteTasksBlock: ValueStorage; + + /// Gear task pool. + /// + /// Task pool contains tasks with block number when they should be done. + type TaskPool: TaskPool< + BlockNumber = Self::BlockNumber, + Task = Self::Task, + Error = Self::Error, + OutputError = Self::OutputError, + > + CountedByKey + + KeyIterableByKeyMap; + + /// Resets all related to messenger storages. + /// + /// It's a temporary production solution to avoid DB migrations + /// and would be available for test purposes only in the future. + fn reset() { + Self::FirstIncompleteTasksBlock::kill(); + Self::TaskPool::clear(); + } +} + +/// Storing costs getter trait. +pub trait SchedulingCostsPerBlock { + /// Block number type. + type BlockNumber; + /// Cost type. + type Cost; + + /// Extra reserve for being able to pay for blocks with incomplete tasks. + fn reserve_for() -> Self::BlockNumber; + + /// Cost for storing code per block. + fn code() -> Self::Cost; + /// Cost for storing message in mailbox per block. + fn mailbox() -> Self::Cost; + /// Cost for storing program per block. + fn program() -> Self::Cost; + /// Cost for storing message in waitlist per block. + fn waitlist() -> Self::Cost; + /// Cost for reservation holding. + fn reservation() -> Self::Cost; + /// Cost for storing message in dispatch stash. + /// Everything sent delayed goes into dispatch stash. + fn dispatch_stash() -> Self::Cost; + + /// Derives the cost per block based on the lock identifier + fn by_storage_type(storage: StorageType) -> Self::Cost; +} + +/// The type whose variants correspond to various storages used in Gear, +/// including waitlist, mailbox, delayed messages stash etc. +/// Used as a parameter in functions performing some common actions on storages +/// like, for instance, holding cost calculation, to signal a concrete storage kind. +#[derive(Debug, Clone, Copy)] +pub enum StorageType { + Code, + Mailbox, + Program, + Waitlist, + Reservation, + DispatchStash, +} /// Represents tasks managing logic. pub trait TaskPool { diff --git a/common/src/scheduler/mod.rs b/common/src/scheduler/mod.rs deleted file mode 100644 index 39e26a9167d..00000000000 --- a/common/src/scheduler/mod.rs +++ /dev/null @@ -1,120 +0,0 @@ -// This file is part of Gear. - -// Copyright (C) 2022-2024 Gear Technologies Inc. -// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . - -//! Module for scheduler implementation. -//! -//! Scheduler provides API for all available regular or time-dependent actions. - -mod scope; -mod task; - -pub use scope::*; -pub use task::*; - -use crate::storage::{CountedByKey, KeyIterableByKeyMap, ValueStorage}; -use core::fmt::Debug; - -/// Represents scheduler's logic of centralized delayed tasks management logic. -pub trait Scheduler { - /// Block number type of the messenger. - type BlockNumber; - /// Task type. - type Task; - /// Cost type. - type Cost; - /// Inner error type generated by gear's storage types. - type Error: TaskPoolError; - /// Output error of each storage algorithm. - /// - /// Implements `From` to be able to return - /// any required error type. - type OutputError: From + Debug; - - /// Storing costs per block. - type CostsPerBlock: SchedulingCostsPerBlock; - - /// The first block of incomplete tasks, which have already passed, - /// but still contain tasks to deal with. - /// - /// Used for checking if scheduler is able to process - /// current block aimed tasks, or there are some - /// incomplete job from previous blocks. - type FirstIncompleteTasksBlock: ValueStorage; - - /// Gear task pool. - /// - /// Task pool contains tasks with block number when they should be done. - type TaskPool: TaskPool< - BlockNumber = Self::BlockNumber, - Task = Self::Task, - Error = Self::Error, - OutputError = Self::OutputError, - > + CountedByKey - + KeyIterableByKeyMap; - - /// Resets all related to messenger storages. - /// - /// It's a temporary production solution to avoid DB migrations - /// and would be available for test purposes only in the future. - fn reset() { - Self::FirstIncompleteTasksBlock::kill(); - Self::TaskPool::clear(); - } -} - -/// Storing costs getter trait. -pub trait SchedulingCostsPerBlock { - /// Block number type. - type BlockNumber; - /// Cost type. - type Cost; - - /// Extra reserve for being able to pay for blocks with incomplete tasks. - fn reserve_for() -> Self::BlockNumber; - - /// Cost for storing code per block. - fn code() -> Self::Cost; - /// Cost for storing message in mailbox per block. - fn mailbox() -> Self::Cost; - /// Cost for storing program per block. - fn program() -> Self::Cost; - /// Cost for storing message in waitlist per block. - fn waitlist() -> Self::Cost; - /// Cost for reservation holding. - fn reservation() -> Self::Cost; - /// Cost for storing message in dispatch stash. - /// Everything sent delayed goes into dispatch stash. - fn dispatch_stash() -> Self::Cost; - - /// Derives the cost per block based on the lock identifier - fn by_storage_type(storage: StorageType) -> Self::Cost; -} - -/// The type whose variants correspond to various storages used in Gear, -/// including waitlist, mailbox, delayed messages stash etc. -/// Used as a parameter in functions performing some common actions on storages -/// like, for instance, holding cost calculation, to signal a concrete storage kind. -#[derive(Debug, Clone, Copy)] -pub enum StorageType { - Code, - Mailbox, - Program, - Waitlist, - Reservation, - DispatchStash, -} diff --git a/core/src/lib.rs b/core/src/lib.rs index 9f3aec6497d..9da9dab1bad 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -27,6 +27,7 @@ extern crate alloc; +pub mod buffer; pub mod code; pub mod costs; pub mod env; @@ -40,9 +41,8 @@ pub mod pages; pub mod percent; pub mod program; pub mod reservation; - -pub mod buffer; pub mod str; +pub mod tasks; // This allows all casts from u32 into usize be safe. const _: () = assert!(size_of::() <= size_of::()); diff --git a/common/src/scheduler/task.rs b/core/src/tasks.rs similarity index 91% rename from common/src/scheduler/task.rs rename to core/src/tasks.rs index 199d3b0a2eb..a1a89732360 100644 --- a/common/src/scheduler/task.rs +++ b/core/src/tasks.rs @@ -1,6 +1,6 @@ // This file is part of Gear. -// Copyright (C) 2022-2024 Gear Technologies Inc. +// Copyright (C) 2021-2024 Gear Technologies Inc. // SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 // This program is free software: you can redistribute it and/or modify @@ -16,12 +16,12 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use crate::Gas; -use gear_core::ids::{CodeId, MessageId, ProgramId, ReservationId}; -use sp_runtime::{ - codec::{self, Decode, Encode, MaxEncodedLen}, - scale_info::{self, TypeInfo}, -}; +//! The module provides primitives for all available regular or time-dependent tasks. + +use crate::ids::{CodeId, MessageId, ProgramId, ReservationId}; +use gsys::Gas; +use parity_scale_codec::{Decode, Encode, MaxEncodedLen}; +use scale_info::TypeInfo; /// Scheduled task sense and required data for processing action. /// @@ -29,8 +29,6 @@ use sp_runtime::{ /// To avoid redundant migrations only append new variant(s) to the enum /// with an explicit corresponding scale codec index. #[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Ord, Encode, Decode, TypeInfo, MaxEncodedLen)] -#[codec(crate = codec)] -#[scale_info(crate = scale_info)] pub enum ScheduledTask { // Rent charging section. // ----- @@ -71,7 +69,9 @@ pub enum ScheduledTask { /// The message itself stored in DispatchStash. #[codec(index = 7)] SendUserMessage { + /// What message to send. message_id: MessageId, + /// Should it be inserted into users mailbox. to_mailbox: bool, }, @@ -86,6 +86,7 @@ pub enum ScheduledTask { } impl ScheduledTask { + /// Processing function of current task with given handler. pub fn process_with(self, handler: &mut impl TaskHandler) -> Gas { use ScheduledTask::*; @@ -134,10 +135,10 @@ pub trait TaskHandler { /// Wake message action. fn wake_message(&mut self, program_id: ProgramId, message_id: MessageId) -> Gas; - // Send delayed message to program action. + /// Send delayed message to program action. fn send_dispatch(&mut self, stashed_message_id: MessageId) -> Gas; - // Send delayed message to user action. + /// Send delayed message to user action. fn send_user_message(&mut self, stashed_message_id: MessageId, to_mailbox: bool) -> Gas; /// Remove gas reservation action. diff --git a/ethexe/common/Cargo.toml b/ethexe/common/Cargo.toml index 44ac27ccade..9e0f167594c 100644 --- a/ethexe/common/Cargo.toml +++ b/ethexe/common/Cargo.toml @@ -12,7 +12,6 @@ repository.workspace = true [dependencies] gear-core.workspace = true gprimitives = { workspace = true, features = ["serde"] } -common.workspace = true parity-scale-codec.workspace = true derive_more.workspace = true diff --git a/ethexe/common/src/db.rs b/ethexe/common/src/db.rs index 7ec182d6dd9..60b276f0eca 100644 --- a/ethexe/common/src/db.rs +++ b/ethexe/common/src/db.rs @@ -30,7 +30,7 @@ use gear_core::{ use gprimitives::H256; use parity_scale_codec::{Decode, Encode}; -pub type ScheduledTask = common::scheduler::ScheduledTask; +pub type ScheduledTask = gear_core::tasks::ScheduledTask; #[derive(Debug, Clone, Default, Encode, Decode, serde::Serialize)] pub struct BlockHeader { diff --git a/ethexe/common/src/router.rs b/ethexe/common/src/router.rs index 640efaa1d74..2074484976f 100644 --- a/ethexe/common/src/router.rs +++ b/ethexe/common/src/router.rs @@ -17,7 +17,7 @@ // along with this program. If not, see . use alloc::vec::Vec; -use gear_core::message::ReplyDetails; +use gear_core::message::{Message, ReplyDetails}; use gprimitives::{ActorId, CodeId, MessageId, H256}; use parity_scale_codec::{Decode, Encode}; @@ -72,6 +72,19 @@ pub struct OutgoingMessage { pub reply_details: Option, } +impl From for OutgoingMessage { + fn from(value: Message) -> Self { + let (id, _source, destination, payload, _gas_limit, value, details) = value.into_parts(); + Self { + id, + destination, + payload: payload.into_vec(), + value, + reply_details: details.and_then(|v| v.to_reply_details()), + } + } +} + /* Events section */ #[derive(Clone, Debug, Encode, Decode, PartialEq, Eq)] diff --git a/ethexe/processor/Cargo.toml b/ethexe/processor/Cargo.toml index fedfd34a3e4..6833e86a4df 100644 --- a/ethexe/processor/Cargo.toml +++ b/ethexe/processor/Cargo.toml @@ -18,7 +18,6 @@ gprimitives.workspace = true gear-runtime-interface = { workspace = true, features = ["std"] } gear-lazy-pages.workspace = true core-processor.workspace = true -common = { workspace = true, features = ["std"] } anyhow = { workspace = true, features = ["std"] } wasmtime.workspace = true diff --git a/ethexe/processor/src/common.rs b/ethexe/processor/src/common.rs index c588e9e128e..1d1463edb91 100644 --- a/ethexe/processor/src/common.rs +++ b/ethexe/processor/src/common.rs @@ -1,3 +1,21 @@ +// This file is part of Gear. +// +// Copyright (C) 2024 Gear Technologies Inc. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + use ethexe_common::router::StateTransition; use gprimitives::CodeId; use parity_scale_codec::{Decode, Encode}; diff --git a/ethexe/processor/src/handling/events.rs b/ethexe/processor/src/handling/events.rs index 4754f738fb9..ceffd2afbf0 100644 --- a/ethexe/processor/src/handling/events.rs +++ b/ethexe/processor/src/handling/events.rs @@ -16,6 +16,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . +use crate::Processor; use anyhow::Result; use ethexe_common::{ mirror::RequestEvent as MirrorEvent, @@ -23,27 +24,26 @@ use ethexe_common::{ wvara::RequestEvent as WVaraEvent, }; use ethexe_db::CodesStorage; -use ethexe_runtime_common::state::{ComplexStorage as _, Dispatch, Storage}; +use ethexe_runtime_common::{ + state::{ComplexStorage as _, Dispatch, Storage}, + InBlockTransitions, +}; use gear_core::{ ids::ProgramId, message::{DispatchKind, SuccessReplyReason}, }; use gprimitives::{ActorId, CodeId, MessageId, H256}; -use std::collections::BTreeMap; - -use crate::Processor; impl Processor { pub(crate) fn handle_router_event( &mut self, - states: &mut BTreeMap, + in_block_transitions: &mut InBlockTransitions, event: RouterEvent, ) -> Result<()> { match event { RouterEvent::ProgramCreated { actor_id, code_id } => { self.handle_new_program(actor_id, code_id)?; - - states.insert(actor_id, H256::zero()); + in_block_transitions.register_new(actor_id); } RouterEvent::CodeValidationRequested { .. } | RouterEvent::BaseWeightChanged { .. } @@ -60,20 +60,24 @@ impl Processor { pub(crate) fn handle_mirror_event( &mut self, - states: &mut BTreeMap, - value_claims: &mut BTreeMap>, + in_block_transitions: &mut InBlockTransitions, actor_id: ProgramId, event: MirrorEvent, ) -> Result<()> { - let Some(&state_hash) = states.get(&actor_id) else { + let Some(state_hash) = in_block_transitions.state_of(&actor_id) else { log::debug!("Received event from unrecognized mirror ({actor_id}): {event:?}"); return Ok(()); }; - let new_state_hash = match event { + match event { MirrorEvent::ExecutableBalanceTopUpRequested { value } => { - self.handle_executable_balance_top_up(state_hash, value)? + let new_state_hash = self.handle_executable_balance_top_up(state_hash, value)?; + in_block_transitions + .modify_state(actor_id, new_state_hash) + .ok_or_else(|| { + anyhow::anyhow!("failed to modify state of recognized program") + })?; } MirrorEvent::MessageQueueingRequested { id, @@ -104,7 +108,12 @@ impl Processor { context: None, }; - self.handle_message_queueing(state_hash, dispatch)? + let new_state_hash = self.handle_message_queueing(state_hash, dispatch)?; + in_block_transitions + .modify_state(actor_id, new_state_hash) + .ok_or_else(|| { + anyhow::anyhow!("failed to modify state of recognized program") + })?; } MirrorEvent::ReplyQueueingRequested { replied_to, @@ -112,37 +121,35 @@ impl Processor { payload, value, } => { - let Some((value_claim, state_hash)) = + if let Some((value_claim, new_state_hash)) = self.handle_reply_queueing(state_hash, replied_to, source, payload, value)? - else { - return Ok(()); - }; - - value_claims.entry(actor_id).or_default().push(value_claim); - - state_hash + { + in_block_transitions + .modify_state_with(actor_id, new_state_hash, 0, vec![value_claim], vec![]) + .ok_or_else(|| { + anyhow::anyhow!("failed to modify state of recognized program") + })?; + } } MirrorEvent::ValueClaimingRequested { claimed_id, source } => { - let Some((value_claim, state_hash)) = + if let Some((value_claim, new_state_hash)) = self.handle_value_claiming(state_hash, claimed_id, source)? - else { - return Ok(()); - }; - - value_claims.entry(actor_id).or_default().push(value_claim); - - state_hash + { + in_block_transitions + .modify_state_with(actor_id, new_state_hash, 0, vec![value_claim], vec![]) + .ok_or_else(|| { + anyhow::anyhow!("failed to modify state of recognized program") + })?; + } } }; - states.insert(actor_id, new_state_hash); - Ok(()) } pub(crate) fn handle_wvara_event( &mut self, - _states: &mut BTreeMap, + _in_block_transitions: &mut InBlockTransitions, event: WVaraEvent, ) -> Result<()> { match event { diff --git a/ethexe/processor/src/handling/run.rs b/ethexe/processor/src/handling/run.rs index 27b93e489cb..8323fd7b972 100644 --- a/ethexe/processor/src/handling/run.rs +++ b/ethexe/processor/src/handling/run.rs @@ -16,16 +16,12 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use crate::{ - common::LocalOutcome, - host::{InstanceCreator, InstanceWrapper}, -}; +use crate::host::{InstanceCreator, InstanceWrapper}; use core_processor::common::JournalNote; -use ethexe_common::router::{OutgoingMessage, StateTransition}; use ethexe_db::{CodesStorage, Database}; -use ethexe_runtime_common::Handler; -use gear_core::{ids::ProgramId, message::Message}; -use gprimitives::{ActorId, H256}; +use ethexe_runtime_common::{Handler, InBlockTransitions}; +use gear_core::ids::ProgramId; +use gprimitives::H256; use std::collections::BTreeMap; use tokio::sync::{mpsc, oneshot}; @@ -47,8 +43,8 @@ pub fn run( threads_amount: usize, db: Database, instance_creator: InstanceCreator, - programs: &mut BTreeMap, -) -> (Vec, Vec) { + in_block_transitions: &mut InBlockTransitions, +) { tokio::task::block_in_place(|| { let rt = tokio::runtime::Builder::new_multi_thread() .worker_threads(threads_amount) @@ -56,7 +52,7 @@ pub fn run( .build() .unwrap(); - rt.block_on(async { run_in_async(db, instance_creator, programs).await }) + rt.block_on(async { run_in_async(db, instance_creator, in_block_transitions).await }) }) } @@ -65,11 +61,8 @@ pub fn run( async fn run_in_async( db: Database, instance_creator: InstanceCreator, - programs: &mut BTreeMap, -) -> (Vec, Vec) { - let mut to_users_messages = vec![]; - let mut results = BTreeMap::new(); - + in_block_transitions: &mut InBlockTransitions, +) { let num_workers = 4; let mut task_senders = vec![]; @@ -95,8 +88,8 @@ async fn run_in_async( // Send tasks to process programs in workers, until all queues are empty. let mut no_more_to_do = true; - for index in (0..programs.len()).step_by(num_workers) { - let result_receivers = one_batch(index, &task_senders, programs).await; + for index in (0..in_block_transitions.states_amount()).step_by(num_workers) { + let result_receivers = one_batch(index, &task_senders, in_block_transitions).await; let mut super_journal = vec![]; for (program_id, receiver) in result_receivers.into_iter() { @@ -110,24 +103,11 @@ async fn run_in_async( for (program_id, journal) in super_journal { let mut handler = Handler { program_id, - program_states: programs, + in_block_transitions, storage: &db, block_info: Default::default(), - results: Default::default(), - to_users_messages: Default::default(), }; core_processor::handle_journal(journal, &mut handler); - - for (id, new_hash) in handler.results { - results.insert(id, (new_hash, vec![])); - } - - for message in &handler.to_users_messages { - let entry = results.get_mut(&message.source()).expect("should be"); - entry.1.push(message.clone()); - } - - to_users_messages.append(&mut handler.to_users_messages); } } @@ -139,46 +119,6 @@ async fn run_in_async( for handle in handles { handle.abort(); } - - let outcomes = results - .into_iter() - .map(|(id, (new_state_hash, outgoing_messages))| { - LocalOutcome::Transition(StateTransition { - actor_id: id, - new_state_hash, - value_to_receive: 0, // TODO (breathx): propose this - value_claims: vec![], // TODO (breathx): propose this - messages: - outgoing_messages - .into_iter() - .map(|message| { - let ( - id, - _source, - destination, - payload, - _gas_limit, - value, - message_details, - ) = message.into_parts(); - - let reply_details = - message_details.and_then(|details| details.to_reply_details()); - - OutgoingMessage { - id, - destination, - payload: payload.into_vec(), - value, - reply_details, - } - }) - .collect(), - }) - }) - .collect(); - - (to_users_messages, outcomes) } async fn run_task(db: Database, executor: &mut InstanceWrapper, task: Task) { @@ -231,12 +171,13 @@ async fn worker( async fn one_batch( from_index: usize, task_senders: &[mpsc::Sender], - programs: &mut BTreeMap, + in_block_transitions: &mut InBlockTransitions, ) -> BTreeMap>> { let mut result_receivers = BTreeMap::new(); - for (sender, (program_id, state_hash)) in - task_senders.iter().zip(programs.iter().skip(from_index)) + for (sender, (program_id, state_hash)) in task_senders + .iter() + .zip(in_block_transitions.states_iter().skip(from_index)) { let (result_sender, result_receiver) = oneshot::channel(); @@ -253,30 +194,3 @@ async fn one_batch( result_receivers } - -#[allow(unused)] // TODO (breathx) -async fn wake_messages( - task_senders: &[mpsc::Sender], - programs: &mut BTreeMap, -) { - let mut result_receivers = vec![]; - for (task_sender, (&program_id, &state_hash)) in - task_senders.iter().cycle().zip(programs.iter()) - { - let (result_sender, result_receiver) = oneshot::channel(); - task_sender - .send(Task::WakeMessages { - program_id, - state_hash, - result_sender, - }) - .await - .unwrap(); - result_receivers.push((program_id, result_receiver)); - } - - for (program_id, result_receiver) in result_receivers { - let new_state_hash = result_receiver.await; - programs.insert(program_id, new_state_hash.unwrap()); - } -} diff --git a/ethexe/processor/src/handling/tasks.rs b/ethexe/processor/src/handling/tasks.rs index 72321213186..a1291c74270 100644 --- a/ethexe/processor/src/handling/tasks.rs +++ b/ethexe/processor/src/handling/tasks.rs @@ -18,12 +18,12 @@ use crate::{LocalOutcome, Processor}; use anyhow::Result; -use common::{ - scheduler::{ScheduledTask, TaskHandler}, - CodeId, Gas, MessageId, ProgramId, ReservationId, -}; use ethexe_db::BlockMetaStorage; -use gear_core::message::Message; +use ethexe_runtime_common::InBlockTransitions; +use gear_core::{ + ids::{CodeId, MessageId, ProgramId, ReservationId}, + tasks::{ScheduledTask, TaskHandler}, +}; use gprimitives::{ActorId, H256}; use std::collections::BTreeMap; @@ -31,13 +31,11 @@ impl Processor { pub fn run_tasks( &mut self, block_hash: H256, - states: &mut BTreeMap, + in_block_transitions: &mut InBlockTransitions, tasks: &mut BTreeMap>>, ) -> Result> { let mut handler = TasksHandler { - states, - results: Default::default(), - to_users_messages: Default::default(), + in_block_transitions, }; let block_meta = self @@ -57,43 +55,41 @@ impl Processor { #[allow(unused)] pub struct TasksHandler<'a> { - pub states: &'a mut BTreeMap, - pub results: BTreeMap, - pub to_users_messages: Vec, + pub in_block_transitions: &'a mut InBlockTransitions, } impl<'a> TaskHandler for TasksHandler<'a> { - fn remove_from_mailbox(&mut self, _user_id: ActorId, _message_id: MessageId) -> Gas { + fn remove_from_mailbox(&mut self, _user_id: ActorId, _message_id: MessageId) -> u64 { unimplemented!("TODO (breathx)") } - fn remove_from_waitlist(&mut self, _program_id: ProgramId, _message_id: MessageId) -> Gas { + fn remove_from_waitlist(&mut self, _program_id: ProgramId, _message_id: MessageId) -> u64 { unimplemented!("TODO (breathx)") } - fn send_dispatch(&mut self, _stashed_message_id: MessageId) -> Gas { + fn send_dispatch(&mut self, _stashed_message_id: MessageId) -> u64 { unimplemented!("TODO (breathx)") } - fn send_user_message(&mut self, _stashed_message_id: MessageId, _to_mailbox: bool) -> Gas { + fn send_user_message(&mut self, _stashed_message_id: MessageId, _to_mailbox: bool) -> u64 { unimplemented!("TODO (breathx)") } - fn wake_message(&mut self, _program_id: ProgramId, _message_id: MessageId) -> Gas { + fn wake_message(&mut self, _program_id: ProgramId, _message_id: MessageId) -> u64 { // TODO (breathx): consider deprecation of delayed wakes + non-concrete waits. unimplemented!("TODO (breathx)") } /* Deprecated APIs */ - fn pause_program(&mut self, _: ProgramId) -> Gas { + fn pause_program(&mut self, _: ProgramId) -> u64 { unreachable!("deprecated") } - fn remove_code(&mut self, _: CodeId) -> Gas { + fn remove_code(&mut self, _: CodeId) -> u64 { unreachable!("deprecated") } - fn remove_gas_reservation(&mut self, _: ProgramId, _: ReservationId) -> Gas { + fn remove_gas_reservation(&mut self, _: ProgramId, _: ReservationId) -> u64 { unreachable!("deprecated") } - fn remove_paused_program(&mut self, _: ProgramId) -> Gas { + fn remove_paused_program(&mut self, _: ProgramId) -> u64 { unreachable!("deprecated") } - fn remove_resume_session(&mut self, _: u32) -> Gas { + fn remove_resume_session(&mut self, _: u32) -> u64 { unreachable!("deprecated") } } diff --git a/ethexe/processor/src/lib.rs b/ethexe/processor/src/lib.rs index 6872432b7c7..921c0703263 100644 --- a/ethexe/processor/src/lib.rs +++ b/ethexe/processor/src/lib.rs @@ -19,19 +19,13 @@ //! Program's execution service for eGPU. use anyhow::Result; -use ethexe_common::{ - mirror::RequestEvent as MirrorEvent, router::StateTransition, BlockRequestEvent, -}; +use ethexe_common::{mirror::RequestEvent as MirrorEvent, BlockRequestEvent}; use ethexe_db::{BlockMetaStorage, CodesStorage, Database}; -use ethexe_runtime_common::state::Storage; -use gear_core::{ - ids::{prelude::CodeIdExt, ProgramId}, - message::ReplyInfo, -}; +use ethexe_runtime_common::{state::Storage, InBlockTransitions}; +use gear_core::{ids::prelude::CodeIdExt, message::ReplyInfo}; use gprimitives::{ActorId, CodeId, MessageId, H256}; use handling::run; use host::InstanceCreator; -use std::collections::BTreeMap; pub use common::LocalOutcome; @@ -83,64 +77,55 @@ impl Processor { ) -> Result> { log::debug!("Processing events for {block_hash:?}: {events:#?}"); - let mut states = self + let states = self .db .block_start_program_states(block_hash) .unwrap_or_default(); // TODO (breathx): shouldn't it be a panic? - let mut schedule = self.db.block_start_schedule(block_hash).unwrap_or_default(); // TODO (breathx): shouldn't it be a panic? + let mut in_block_transitions = InBlockTransitions::new(states); - let mut all_value_claims = Default::default(); + let mut schedule = self.db.block_start_schedule(block_hash).unwrap_or_default(); // TODO (breathx): shouldn't it be a panic? + // TODO (breathx): handle resulting addresses that were changed (e.g. top up balance wont be dumped as outcome). for event in events { match event { BlockRequestEvent::Router(event) => { - self.handle_router_event(&mut states, event)?; + self.handle_router_event(&mut in_block_transitions, event)?; } BlockRequestEvent::Mirror { address, event } => { - self.handle_mirror_event(&mut states, &mut all_value_claims, address, event)?; + self.handle_mirror_event(&mut in_block_transitions, address, event)?; } BlockRequestEvent::WVara(event) => { - self.handle_wvara_event(&mut states, event)?; + self.handle_wvara_event(&mut in_block_transitions, event)?; } } } - // TODO (breathx): handle outcomes. - let mut _outcomes = self.run_tasks(block_hash, &mut states, &mut schedule)?; + self.run_tasks(block_hash, &mut in_block_transitions, &mut schedule)?; + self.run(block_hash, &mut in_block_transitions); - let mut outcomes = self.run(block_hash, &mut states)?; - - for outcome in &mut outcomes { - if let LocalOutcome::Transition(StateTransition { - actor_id, - value_claims, - .. - }) = outcome - { - value_claims.extend(all_value_claims.remove(actor_id).unwrap_or_default()); - } - } + let (transitions, states) = in_block_transitions.finalize(); self.db.set_block_end_program_states(block_hash, states); self.db.set_block_end_schedule(block_hash, schedule); + let outcomes = transitions + .into_iter() + .map(LocalOutcome::Transition) + .collect(); + Ok(outcomes) } - // TODO: replace LocalOutcome with Transition struct. - pub fn run( - &mut self, - chain_head: H256, - programs: &mut BTreeMap, - ) -> Result> { + pub fn run(&mut self, chain_head: H256, in_block_transitions: &mut InBlockTransitions) { self.creator.set_chain_head(chain_head); - log::debug!("{programs:?}"); - - let messages_and_outcomes = run::run(8, self.db.clone(), self.creator.clone(), programs); - - Ok(messages_and_outcomes.1) + run::run( + 8, + self.db.clone(), + self.creator.clone(), + in_block_transitions, + ); } } @@ -159,17 +144,17 @@ impl OverlaidProcessor { ) -> Result { self.0.creator.set_chain_head(block_hash); - let mut states = self + let states = self .0 .db .block_start_program_states(block_hash) .unwrap_or_default(); - let mut value_claims = Default::default(); + let mut in_block_transitions = InBlockTransitions::new(states); - let Some(&state_hash) = states.get(&program_id) else { - return Err(anyhow::anyhow!("unknown program at specified block hash")); - }; + let state_hash = in_block_transitions + .state_of(&program_id) + .ok_or_else(|| anyhow::anyhow!("unknown program at specified block hash"))?; let state = self.0.db.read_state(state_hash).ok_or_else(|| { @@ -182,8 +167,7 @@ impl OverlaidProcessor { ); self.0.handle_mirror_event( - &mut states, - &mut value_claims, + &mut in_block_transitions, program_id, MirrorEvent::MessageQueueingRequested { id: MessageId::zero(), @@ -193,20 +177,22 @@ impl OverlaidProcessor { }, )?; - let (messages, _) = run::run(8, self.0.db.clone(), self.0.creator.clone(), &mut states); + run::run( + 8, + self.0.db.clone(), + self.0.creator.clone(), + &mut in_block_transitions, + ); - let res = messages + let res = in_block_transitions + .current_messages() .into_iter() - .find_map(|message| { - message.reply_details().and_then(|details| { - (details.to_message_id() == MessageId::zero()).then(|| { - let parts = message.into_parts(); - - ReplyInfo { - payload: parts.3.into_vec(), - value: parts.5, - code: details.to_reply_code(), - } + .find_map(|(_source, message)| { + message.reply_details.and_then(|details| { + (details.to_message_id() == MessageId::zero()).then(|| ReplyInfo { + payload: message.payload, + value: message.value, + code: details.to_reply_code(), }) }) }) diff --git a/ethexe/processor/src/tests.rs b/ethexe/processor/src/tests.rs index 09d9fc6234b..f152b3f6fe7 100644 --- a/ethexe/processor/src/tests.rs +++ b/ethexe/processor/src/tests.rs @@ -22,7 +22,10 @@ use ethexe_common::{ }; use ethexe_db::{BlockHeader, BlockMetaStorage, CodesStorage, MemDb}; use ethexe_runtime_common::state::{ComplexStorage, Dispatch}; -use gear_core::{ids::prelude::CodeIdExt, message::DispatchKind}; +use gear_core::{ + ids::{prelude::CodeIdExt, ProgramId}, + message::DispatchKind, +}; use gprimitives::{ActorId, MessageId}; use parity_scale_codec::Encode; use std::collections::BTreeMap; @@ -274,24 +277,27 @@ fn ping_pong() { .handle_messages_queueing(state_hash, messages) .expect("failed to populate message queue"); - let mut programs = BTreeMap::from_iter([(program_id, state_hash)]); + let states = BTreeMap::from_iter([(program_id, state_hash)]); + let mut in_block_transitions = InBlockTransitions::new(states); - let (to_users, _) = run::run( + run::run( 8, processor.db.clone(), processor.creator.clone(), - &mut programs, + &mut in_block_transitions, ); + let to_users = in_block_transitions.current_messages(); + assert_eq!(to_users.len(), 2); - let message = &to_users[0]; - assert_eq!(message.destination(), user_id); - assert_eq!(message.payload_bytes(), b"PONG"); + let message = &to_users[0].1; + assert_eq!(message.destination, user_id); + assert_eq!(message.payload, b"PONG"); - let message = &to_users[1]; - assert_eq!(message.destination(), user_id); - assert_eq!(message.payload_bytes(), b"PONG"); + let message = &to_users[1].1; + assert_eq!(message.destination, user_id); + assert_eq!(message.payload, b"PONG"); } #[allow(unused)] @@ -405,32 +411,31 @@ fn async_and_ping() { .handle_message_queueing(async_state_hash, message) .expect("failed to populate message queue"); - let mut programs = - BTreeMap::from_iter([(ping_id, ping_state_hash), (async_id, async_state_hash)]); + let states = BTreeMap::from_iter([(ping_id, ping_state_hash), (async_id, async_state_hash)]); + let mut in_block_transitions = InBlockTransitions::new(states); - let (to_users, _) = run::run( + run::run( 8, processor.db.clone(), processor.creator.clone(), - &mut programs, + &mut in_block_transitions, ); + let to_users = in_block_transitions.current_messages(); + assert_eq!(to_users.len(), 3); - let message = &to_users[0]; - assert_eq!(message.destination(), user_id); - assert_eq!(message.payload_bytes(), b"PONG"); + let message = &to_users[0].1; + assert_eq!(message.destination, user_id); + assert_eq!(message.payload, b"PONG"); - let message = &to_users[1]; - assert_eq!(message.destination(), user_id); - assert_eq!(message.payload_bytes(), b""); + let message = &to_users[1].1; + assert_eq!(message.destination, user_id); + assert_eq!(message.payload, b""); - let message = &to_users[2]; - assert_eq!(message.destination(), user_id); - assert_eq!( - message.payload_bytes(), - wait_for_reply_to.into_bytes().as_slice() - ); + let message = &to_users[2].1; + assert_eq!(message.destination, user_id); + assert_eq!(message.payload, wait_for_reply_to.into_bytes().as_slice()); } // TODO (breathx). diff --git a/ethexe/runtime/common/Cargo.toml b/ethexe/runtime/common/Cargo.toml index 8006145b01b..0e7b00ddb19 100644 --- a/ethexe/runtime/common/Cargo.toml +++ b/ethexe/runtime/common/Cargo.toml @@ -9,6 +9,8 @@ repository.workspace = true [dependencies] +ethexe-common.workspace = true + gear-lazy-pages-common.workspace = true core-processor.workspace = true gear-core.workspace = true diff --git a/ethexe/runtime/common/src/journal.rs b/ethexe/runtime/common/src/journal.rs index 9ccf5b68e39..c61fb2e7be7 100644 --- a/ethexe/runtime/common/src/journal.rs +++ b/ethexe/runtime/common/src/journal.rs @@ -1,13 +1,17 @@ -use crate::state::{ - self, ActiveProgram, ComplexStorage, Dispatch, HashAndLen, MaybeHash, Program, ProgramState, - Storage, +use crate::{ + state::{ + self, ActiveProgram, ComplexStorage, Dispatch, HashAndLen, MaybeHash, Program, + ProgramState, Storage, + }, + InBlockTransitions, }; -use alloc::{collections::BTreeMap, vec::Vec}; +use alloc::{collections::BTreeMap, vec, vec::Vec}; use anyhow::Result; use core_processor::{ common::{DispatchOutcome, JournalHandler}, configs::BlockInfo, }; +use ethexe_common::router::OutgoingMessage; use gear_core::{ ids::ProgramId, memory::PageBuf, @@ -24,12 +28,9 @@ use gprimitives::{ActorId, CodeId, MessageId, ReservationId, H256}; pub struct Handler<'a, S: Storage> { pub program_id: ProgramId, - pub program_states: &'a mut BTreeMap, + pub in_block_transitions: &'a mut InBlockTransitions, pub storage: &'a S, pub block_info: BlockInfo, - // TODO: replace with something reasonable. - pub results: BTreeMap, - pub to_users_messages: Vec, } impl Handler<'_, S> { @@ -37,7 +38,7 @@ impl Handler<'_, S> { &mut self, program_id: ProgramId, f: impl FnOnce(&mut ProgramState) -> Result<()>, - ) { + ) -> H256 { self.update_state_with_storage(program_id, |_s, state| f(state)) } @@ -45,18 +46,21 @@ impl Handler<'_, S> { &mut self, program_id: ProgramId, f: impl FnOnce(&S, &mut ProgramState) -> Result<()>, - ) { + ) -> H256 { let state_hash = self - .program_states - .get_mut(&program_id) + .in_block_transitions + .state_of(&program_id) .expect("failed to find program in known states"); - *state_hash = self + let new_state_hash = self .storage - .mutate_state(*state_hash, f) + .mutate_state(state_hash, f) .expect("failed to mutate state"); - self.results.insert(program_id, *state_hash); + self.in_block_transitions + .modify_state(program_id, new_state_hash); + + new_state_hash } fn pop_queue_message(state: &ProgramState, storage: &S) -> (H256, MessageId) { @@ -176,7 +180,11 @@ impl JournalHandler for Handler<'_, S> { todo!("delayed sending isn't supported yet"); } - if !self.program_states.contains_key(&dispatch.destination()) { + if self + .in_block_transitions + .state_of(&dispatch.destination()) + .is_none() + { if !dispatch.is_reply() { self.update_state_with_storage(dispatch.source(), |storage, state| { state.mailbox_hash = @@ -191,7 +199,23 @@ impl JournalHandler for Handler<'_, S> { }); } - self.to_users_messages.push(dispatch.into_parts().1); + // TODO (breathx): send here to in_block_transitions. + let source = dispatch.source(); + let message = dispatch.into_parts().1; + + let source_state_hash = self + .in_block_transitions + .state_of(&source) + .expect("must exist"); + + self.in_block_transitions.modify_state_with( + source, + source_state_hash, + 0, + vec![], + vec![OutgoingMessage::from(message)], + ); + return; } diff --git a/ethexe/runtime/common/src/lib.rs b/ethexe/runtime/common/src/lib.rs index f1a4a755bed..7508887c709 100644 --- a/ethexe/runtime/common/src/lib.rs +++ b/ethexe/runtime/common/src/lib.rs @@ -21,6 +21,8 @@ #![cfg_attr(not(feature = "std"), no_std)] #![allow(unused)] +extern crate alloc; + use alloc::{ collections::{BTreeMap, VecDeque}, vec::Vec, @@ -48,18 +50,19 @@ use state::{ Storage, Waitlist, }; -extern crate alloc; - -mod journal; -pub mod state; - pub use core_processor::configs::BlockInfo; pub use journal::Handler; +pub use transitions::{InBlockTransitions, NonFinalTransition}; -const RUNTIME_ID: u32 = 0; +pub mod state; + +mod journal; +mod transitions; pub const BLOCK_GAS_LIMIT: u64 = 1_000_000_000_000; +const RUNTIME_ID: u32 = 0; + pub trait RuntimeInterface { type LazyPages: LazyPagesInterface + 'static; diff --git a/ethexe/runtime/common/src/transitions.rs b/ethexe/runtime/common/src/transitions.rs new file mode 100644 index 00000000000..7413578d7ee --- /dev/null +++ b/ethexe/runtime/common/src/transitions.rs @@ -0,0 +1,146 @@ +// This file is part of Gear. +// +// Copyright (C) 2024 Gear Technologies Inc. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use alloc::{ + collections::{btree_map::Iter, BTreeMap}, + vec::Vec, +}; +use ethexe_common::router::{OutgoingMessage, StateTransition, ValueClaim}; +use gprimitives::{ActorId, CodeId, H256}; +use parity_scale_codec::{Decode, Encode}; + +#[derive(Default)] +pub struct InBlockTransitions { + states: BTreeMap, + modifications: BTreeMap, +} + +impl InBlockTransitions { + pub fn new(states: BTreeMap) -> Self { + Self { + states, + ..Default::default() + } + } + + pub fn state_of(&self, actor_id: &ActorId) -> Option { + self.states.get(actor_id).cloned() + } + + pub fn states_amount(&self) -> usize { + self.states.len() + } + + pub fn states_iter(&self) -> Iter { + self.states.iter() + } + + pub fn current_messages(&self) -> Vec<(ActorId, OutgoingMessage)> { + self.modifications + .iter() + .flat_map(|(id, trans)| trans.messages.iter().map(|message| (*id, message.clone()))) + .collect() + } + + pub fn register_new(&mut self, actor_id: ActorId) { + self.states.insert(actor_id, H256::zero()); + self.modifications.insert(actor_id, Default::default()); + } + + pub fn modify_state(&mut self, actor_id: ActorId, new_state_hash: H256) -> Option<()> { + self.modify_state_with( + actor_id, + new_state_hash, + 0, + Default::default(), + Default::default(), + ) + } + + pub fn modify_state_with( + &mut self, + actor_id: ActorId, + new_state_hash: H256, + extra_value_to_receive: u128, + extra_claims: Vec, + extra_messages: Vec, + ) -> Option<()> { + let initial_state = self.states.insert(actor_id, new_state_hash)?; + + let transition = self + .modifications + .entry(actor_id) + .or_insert(NonFinalTransition { + initial_state, + ..Default::default() + }); + + transition.value_to_receive += extra_value_to_receive; + transition.claims.extend(extra_claims); + transition.messages.extend(extra_messages); + + Some(()) + } + + pub fn finalize(self) -> (Vec, BTreeMap) { + let Self { + states, + modifications, + } = self; + + let mut res = Vec::with_capacity(modifications.len()); + + for (actor_id, modification) in modifications { + let new_state_hash = states + .get(&actor_id) + .cloned() + .expect("failed to find state record for modified state"); + + if !modification.is_noop(new_state_hash) { + res.push(StateTransition { + actor_id, + new_state_hash, + value_to_receive: modification.value_to_receive, + value_claims: modification.claims, + messages: modification.messages, + }); + } + } + + (res, states) + } +} + +#[derive(Default)] +pub struct NonFinalTransition { + initial_state: H256, + pub value_to_receive: u128, + pub claims: Vec, + pub messages: Vec, +} + +impl NonFinalTransition { + pub fn is_noop(&self, current_state: H256) -> bool { + // check if just created program (always op) + !self.initial_state.is_zero() + // check if state hash changed at final (always op) + && current_state == self.initial_state + // check if with unchanged state needs commitment (op) + && (self.value_to_receive == 0 && self.claims.is_empty() && self.messages.is_empty()) + } +} diff --git a/gsdk/src/metadata/generated.rs b/gsdk/src/metadata/generated.rs index a116b58360d..de5dd321109 100644 --- a/gsdk/src/metadata/generated.rs +++ b/gsdk/src/metadata/generated.rs @@ -569,49 +569,6 @@ pub mod runtime_types { pub struct NodeLock<_0>(pub [_0; 4usize]); } } - pub mod scheduler { - use super::runtime_types; - pub mod task { - use super::runtime_types; - #[derive( - Debug, crate::gp::Decode, crate::gp::DecodeAsType, crate::gp::Encode, - )] - pub enum ScheduledTask<_0> { - #[codec(index = 0)] - PauseProgram(runtime_types::gprimitives::ActorId), - #[codec(index = 1)] - RemoveCode(runtime_types::gprimitives::CodeId), - #[codec(index = 2)] - RemoveFromMailbox(_0, runtime_types::gprimitives::MessageId), - #[codec(index = 3)] - RemoveFromWaitlist( - runtime_types::gprimitives::ActorId, - runtime_types::gprimitives::MessageId, - ), - #[codec(index = 4)] - RemovePausedProgram(runtime_types::gprimitives::ActorId), - #[codec(index = 5)] - WakeMessage( - runtime_types::gprimitives::ActorId, - runtime_types::gprimitives::MessageId, - ), - #[codec(index = 6)] - SendDispatch(runtime_types::gprimitives::MessageId), - #[codec(index = 7)] - SendUserMessage { - message_id: runtime_types::gprimitives::MessageId, - to_mailbox: ::core::primitive::bool, - }, - #[codec(index = 8)] - RemoveGasReservation( - runtime_types::gprimitives::ActorId, - runtime_types::gprimitives::ReservationId, - ), - #[codec(index = 9)] - RemoveResumeSession(::core::primitive::u32), - } - } - } pub mod storage { use super::runtime_types; pub mod complicated { @@ -941,6 +898,44 @@ pub mod runtime_types { )] pub struct ReservationNonce(pub ::core::primitive::u64); } + pub mod tasks { + use super::runtime_types; + #[derive(Debug, crate::gp::Decode, crate::gp::DecodeAsType, crate::gp::Encode)] + pub enum ScheduledTask<_0> { + #[codec(index = 0)] + PauseProgram(runtime_types::gprimitives::ActorId), + #[codec(index = 1)] + RemoveCode(runtime_types::gprimitives::CodeId), + #[codec(index = 2)] + RemoveFromMailbox(_0, runtime_types::gprimitives::MessageId), + #[codec(index = 3)] + RemoveFromWaitlist( + runtime_types::gprimitives::ActorId, + runtime_types::gprimitives::MessageId, + ), + #[codec(index = 4)] + RemovePausedProgram(runtime_types::gprimitives::ActorId), + #[codec(index = 5)] + WakeMessage( + runtime_types::gprimitives::ActorId, + runtime_types::gprimitives::MessageId, + ), + #[codec(index = 6)] + SendDispatch(runtime_types::gprimitives::MessageId), + #[codec(index = 7)] + SendUserMessage { + message_id: runtime_types::gprimitives::MessageId, + to_mailbox: ::core::primitive::bool, + }, + #[codec(index = 8)] + RemoveGasReservation( + runtime_types::gprimitives::ActorId, + runtime_types::gprimitives::ReservationId, + ), + #[codec(index = 9)] + RemoveResumeSession(::core::primitive::u32), + } + } } pub mod gear_core_errors { use super::runtime_types; diff --git a/gtest/src/manager.rs b/gtest/src/manager.rs index 4b88b049f60..04bd511ae53 100644 --- a/gtest/src/manager.rs +++ b/gtest/src/manager.rs @@ -55,7 +55,7 @@ use gear_common::{ BlockNumber, }, event::{MessageWaitedReason, MessageWaitedRuntimeReason}, - scheduler::{ScheduledTask, StorageType}, + scheduler::StorageType, storage::Interval, LockId, Origin, }; @@ -69,6 +69,7 @@ use gear_core::{ StoredDispatch, StoredMessage, UserMessage, UserStoredMessage, }, pages::{num_traits::Zero, GearPage}, + tasks::ScheduledTask, }; use gear_lazy_pages_native_interface::LazyPagesNative; use hold_bound::HoldBoundBuilder; diff --git a/gtest/src/manager/journal.rs b/gtest/src/manager/journal.rs index b01bc334884..f0a6f6e3281 100644 --- a/gtest/src/manager/journal.rs +++ b/gtest/src/manager/journal.rs @@ -28,7 +28,7 @@ use crate::{ use core_processor::common::{DispatchOutcome, JournalHandler}; use gear_common::{ event::{MessageWaitedRuntimeReason, RuntimeReason}, - scheduler::{ScheduledTask, StorageType, TaskHandler}, + scheduler::StorageType, }; use gear_core::{ ids::{CodeId, MessageId, ProgramId, ReservationId}, @@ -40,6 +40,7 @@ use gear_core::{ GearPage, WasmPage, }, reservation::GasReserver, + tasks::{ScheduledTask, TaskHandler}, }; use gear_core_errors::SignalCode; use std::collections::BTreeMap; diff --git a/gtest/src/manager/reservations.rs b/gtest/src/manager/reservations.rs index e63bef8046a..61818017815 100644 --- a/gtest/src/manager/reservations.rs +++ b/gtest/src/manager/reservations.rs @@ -19,12 +19,8 @@ //! Various reservation related methods for ExtManager use super::ExtManager; -use gear_common::{ - scheduler::{ScheduledTask, StorageType}, - storage::Interval, - ProgramId, ReservationId, -}; -use gear_core::reservation::GasReservationSlot; +use gear_common::{scheduler::StorageType, storage::Interval, ProgramId, ReservationId}; +use gear_core::{reservation::GasReservationSlot, tasks::ScheduledTask}; impl ExtManager { pub(crate) fn remove_gas_reservation_impl( diff --git a/gtest/src/manager/task.rs b/gtest/src/manager/task.rs index 27c9fc341cc..4076a09acf5 100644 --- a/gtest/src/manager/task.rs +++ b/gtest/src/manager/task.rs @@ -21,14 +21,12 @@ use super::ExtManager; use crate::{state::actors::Actors, Gas}; use core_processor::common::JournalHandler; -use gear_common::{ - scheduler::{ScheduledTask, StorageType, TaskHandler}, - Gas as GearCommonGas, -}; +use gear_common::{scheduler::StorageType, Gas as GearCommonGas}; use gear_core::{ gas_metering::TaskWeights, ids::{CodeId, MessageId, ProgramId, ReservationId}, message::{DispatchKind, ReplyMessage}, + tasks::{ScheduledTask, TaskHandler}, }; use gear_core_errors::{ErrorReplyReason, SignalCode}; diff --git a/gtest/src/state/task_pool.rs b/gtest/src/state/task_pool.rs index edf51c89405..b06de027c8c 100644 --- a/gtest/src/state/task_pool.rs +++ b/gtest/src/state/task_pool.rs @@ -23,10 +23,11 @@ use gear_common::{ task_pool::{AuxiliaryTaskpool, TaskPoolErrorImpl, TaskPoolStorageWrap}, BlockNumber, }, - scheduler::{ScheduledTask, TaskPool, TaskPoolCallbacks}, + scheduler::{TaskPool, TaskPoolCallbacks}, storage::KeyIterableByKeyMap, ProgramId, }; +use gear_core::tasks::ScheduledTask; /// Task pool manager which operates under the hood over /// [`gear_common::auxiliary::task_pool::AuxiliaryTaskpool`]. @@ -90,9 +91,8 @@ impl TaskPoolCallbacks for TaskPoolCallbacksImpl { #[cfg(test)] mod tests { - use gear_common::{scheduler::ScheduledTask, ProgramId}; - use super::TaskPoolManager; + use gear_core::{ids::ProgramId, tasks::ScheduledTask}; #[test] fn test_taskpool() { diff --git a/pallets/gear-program/src/lib.rs b/pallets/gear-program/src/lib.rs index bf9e0e23585..aa64b059d25 100644 --- a/pallets/gear-program/src/lib.rs +++ b/pallets/gear-program/src/lib.rs @@ -161,6 +161,7 @@ pub mod pallet { memory::PageBuf, pages::{numerated::tree::IntervalsTree, GearPage, WasmPage}, program::{MemoryInfix, Program}, + tasks::ScheduledTask, }; use sp_runtime::DispatchError; diff --git a/pallets/gear-scheduler/src/lib.rs b/pallets/gear-scheduler/src/lib.rs index 1095702c432..2c0c938e663 100644 --- a/pallets/gear-scheduler/src/lib.rs +++ b/pallets/gear-scheduler/src/lib.rs @@ -64,6 +64,7 @@ pub mod pallet { traits::{Get, StorageVersion}, }; use frame_system::pallet_prelude::*; + use gear_core::tasks::ScheduledTask; use sp_runtime::DispatchError; use sp_std::{convert::TryInto, marker::PhantomData}; diff --git a/pallets/gear-scheduler/src/tests.rs b/pallets/gear-scheduler/src/tests.rs index 7d05028289e..274f63b6133 100644 --- a/pallets/gear-scheduler/src/tests.rs +++ b/pallets/gear-scheduler/src/tests.rs @@ -23,7 +23,7 @@ extern crate alloc; use crate::{mock::*, *}; use common::{scheduler::*, storage::*, GasTree, LockId, LockableTree as _, Origin}; use frame_system::pallet_prelude::BlockNumberFor; -use gear_core::{ids::*, message::*}; +use gear_core::{ids::*, message::*, tasks::*}; use gear_core_errors::ErrorReplyReason; use pallet_gear::{GasAllowanceOf, GasHandlerOf}; use sp_core::H256; diff --git a/pallets/gear/src/benchmarking/mod.rs b/pallets/gear/src/benchmarking/mod.rs index bdfde8ef6f3..9767b778af9 100644 --- a/pallets/gear/src/benchmarking/mod.rs +++ b/pallets/gear/src/benchmarking/mod.rs @@ -64,7 +64,6 @@ use crate::{ use ::alloc::{collections::BTreeMap, vec}; use common::{ self, benchmarking, - scheduler::{ScheduledTask, TaskHandler}, storage::{Counter, *}, CodeMetadata, CodeStorage, GasTree, Origin, ProgramStorage, ReservableTree, }; @@ -73,6 +72,7 @@ use core_processor::{ configs::BlockConfig, ProcessExecutionContext, ProcessorContext, ProcessorExternalities, }; +use gear_core::tasks::{ScheduledTask, TaskHandler}; use parity_scale_codec::Encode; use frame_benchmarking::{benchmarks, whitelisted_caller}; diff --git a/pallets/gear/src/internal.rs b/pallets/gear/src/internal.rs index d10ace6a605..5307a2d3d24 100644 --- a/pallets/gear/src/internal.rs +++ b/pallets/gear/src/internal.rs @@ -45,6 +45,7 @@ use gear_core::{ Dispatch, DispatchKind, Message, ReplyMessage, StoredDispatch, UserMessage, UserStoredMessage, }, + tasks::ScheduledTask, }; use sp_runtime::traits::{Get, One, SaturatedConversion, Saturating, UniqueSaturatedInto, Zero}; diff --git a/pallets/gear/src/lib.rs b/pallets/gear/src/lib.rs index f5e5e9a841a..5568a95a5df 100644 --- a/pallets/gear/src/lib.rs +++ b/pallets/gear/src/lib.rs @@ -89,6 +89,7 @@ use gear_core::{ ids::{prelude::*, CodeId, MessageId, ProgramId, ReservationId}, message::*, percent::Percent, + tasks::ScheduledTask, }; use gear_lazy_pages_common::LazyPagesInterface; use gear_lazy_pages_interface::LazyPagesRuntimeInterface; diff --git a/pallets/gear/src/manager/journal.rs b/pallets/gear/src/manager/journal.rs index 6f365cf5c28..4b43523921b 100644 --- a/pallets/gear/src/manager/journal.rs +++ b/pallets/gear/src/manager/journal.rs @@ -25,7 +25,7 @@ use crate::{ use alloc::format; use common::{ event::*, - scheduler::{ScheduledTask, SchedulingCostsPerBlock, StorageType, TaskHandler, TaskPool}, + scheduler::{SchedulingCostsPerBlock, StorageType, TaskPool}, storage::*, CodeStorage, LockableTree, Origin, ProgramStorage, ReservableTree, }; @@ -42,6 +42,7 @@ use gear_core::{ pages::{numerated::tree::IntervalsTree, GearPage, WasmPage}, program::{Program, ProgramState}, reservation::GasReserver, + tasks::{ScheduledTask, TaskHandler}, }; use gear_core_errors::SignalCode; use sp_runtime::traits::{UniqueSaturatedInto, Zero}; diff --git a/pallets/gear/src/manager/mod.rs b/pallets/gear/src/manager/mod.rs index 939551eff56..105168e42e6 100644 --- a/pallets/gear/src/manager/mod.rs +++ b/pallets/gear/src/manager/mod.rs @@ -59,7 +59,7 @@ use crate::{ use alloc::format; use common::{ event::*, - scheduler::{ScheduledTask, StorageType, TaskPool}, + scheduler::{StorageType, TaskPool}, storage::{Interval, IterableByKeyMap, Queue}, CodeStorage, Origin, ProgramStorage, ReservableTree, }; @@ -73,6 +73,7 @@ use gear_core::{ pages::WasmPagesAmount, program::{ActiveProgram, Program, ProgramState}, reservation::GasReservationSlot, + tasks::ScheduledTask, }; use primitive_types::H256; use scale_info::TypeInfo; diff --git a/pallets/gear/src/manager/task.rs b/pallets/gear/src/manager/task.rs index a1ab34ecab4..e494c04bff3 100644 --- a/pallets/gear/src/manager/task.rs +++ b/pallets/gear/src/manager/task.rs @@ -33,6 +33,7 @@ use core::cmp; use gear_core::{ ids::{CodeId, MessageId, ProgramId, ReservationId}, message::{DispatchKind, Payload, ReplyMessage}, + tasks::{ScheduledTask, TaskHandler}, }; use gear_core_errors::{ErrorReplyReason, SignalCode}; diff --git a/pallets/gear/src/tests.rs b/pallets/gear/src/tests.rs index baadad545d9..2c3248c8d4c 100644 --- a/pallets/gear/src/tests.rs +++ b/pallets/gear/src/tests.rs @@ -60,6 +60,7 @@ use gear_core::{ WasmPage, }, program::ActiveProgram, + tasks::ScheduledTask, }; use gear_core_backend::error::{ TrapExplanation, UnrecoverableExecutionError, UnrecoverableExtError, UnrecoverableWaitError,