diff --git a/common/src/auxiliary/mailbox.rs b/common/src/auxiliary/mailbox.rs index ae1e59a08f1..d3727dabe58 100644 --- a/common/src/auxiliary/mailbox.rs +++ b/common/src/auxiliary/mailbox.rs @@ -37,7 +37,6 @@ pub type AuxiliaryMailbox = MailboxImpl< MailboxCallbacks, MailboxKeyGen, >; - /// Type represents message stored in the mailbox. pub type MailboxedMessage = UserStoredMessage; @@ -68,6 +67,7 @@ impl AuxiliaryDoubleStorageWrap for MailboxStorageWrap { MAILBOX_STORAGE.with_borrow_mut(f) } } + /// An implementor of the error returned from calling `Mailbox` trait functions. #[derive(Clone, Copy, Debug, Eq, PartialEq)] pub enum MailboxErrorImpl { diff --git a/common/src/auxiliary/mod.rs b/common/src/auxiliary/mod.rs index eb8350a282d..1c2aa1fe26f 100644 --- a/common/src/auxiliary/mod.rs +++ b/common/src/auxiliary/mod.rs @@ -22,7 +22,7 @@ pub mod gas_provider; pub mod mailbox; -pub mod taskpool; +pub mod task_pool; use crate::storage::{ Counted, CountedByKey, DoubleMapStorage, GetFirstPos, GetSecondPos, IterableByKeyMap, diff --git a/common/src/auxiliary/taskpool.rs b/common/src/auxiliary/task_pool.rs similarity index 59% rename from common/src/auxiliary/taskpool.rs rename to common/src/auxiliary/task_pool.rs index 637db7aa72e..622109c455b 100644 --- a/common/src/auxiliary/taskpool.rs +++ b/common/src/auxiliary/task_pool.rs @@ -1,10 +1,29 @@ -//! Auxiliary implementation of the taskpool. +// This file is part of Gear. + +// Copyright (C) 2024 Gear Technologies Inc. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Auxiliary implementation of the task pool. use super::{AuxiliaryDoubleStorageWrap, BlockNumber, DoubleBTreeMap}; use crate::scheduler::{ScheduledTask, TaskPoolImpl}; use gear_core::ids::ProgramId; use std::cell::RefCell; +/// Task pool implementation that can be used in a native, non-wasm runtimes. pub type AuxiliaryTaskpool = TaskPoolImpl< TaskPoolStorageWrap, ScheduledTask, @@ -17,6 +36,7 @@ std::thread_local! { pub(crate) static TASKPOOL_STORAGE: RefCell, ()>> = const { RefCell::new(DoubleBTreeMap::new()) }; } +/// `TaskPool` double storage map manager pub struct TaskPoolStorageWrap; impl AuxiliaryDoubleStorageWrap for TaskPoolStorageWrap { @@ -39,6 +59,7 @@ impl AuxiliaryDoubleStorageWrap for TaskPoolStorageWrap { } } +/// An implementor of the error returned from calling `TaskPool` trait functions. #[derive(Clone, Copy, PartialEq, Eq, Debug)] pub enum TaskPoolErrorImpl { /// Occurs when given task already exists in task pool. diff --git a/gtest/src/lib.rs b/gtest/src/lib.rs index b913cc8ea10..7903cfa4723 100644 --- a/gtest/src/lib.rs +++ b/gtest/src/lib.rs @@ -452,7 +452,7 @@ mod mailbox; mod manager; mod program; mod system; -mod taskpool; +mod task_pool; pub use crate::log::{BlockRunResult, CoreLog, Log}; pub use codec; diff --git a/gtest/src/manager.rs b/gtest/src/manager.rs index 70586bdd701..8414cb18edb 100644 --- a/gtest/src/manager.rs +++ b/gtest/src/manager.rs @@ -16,12 +16,16 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . +mod journal; +mod task; + use crate::{ blocks::BlocksManager, gas_tree::GasTreeManager, log::{BlockRunResult, CoreLog}, mailbox::MailboxManager, program::{Gas, WasmProgram}, + task_pool::TaskPoolManager, Result, TestError, DISPATCH_HOLD_COST, EPOCH_DURATION_IN_BLOCKS, EXISTENTIAL_DEPOSIT, GAS_ALLOWANCE, HOST_FUNC_READ_COST, HOST_FUNC_WRITE_AFTER_READ_COST, HOST_FUNC_WRITE_COST, INITIAL_RANDOM_SEED, LOAD_ALLOCATIONS_PER_INTERVAL, LOAD_PAGE_STORAGE_DATA_COST, @@ -40,25 +44,21 @@ use core_processor::{ }, ContextChargedForCode, ContextChargedForInstrumentation, Ext, }; -use gear_common::{auxiliary::mailbox::MailboxErrorImpl, Origin}; +use gear_common::{ + auxiliary::{mailbox::MailboxErrorImpl, BlockNumber}, + scheduler::ScheduledTask, +}; use gear_core::{ code::{Code, CodeAndId, InstrumentedCode, InstrumentedCodeAndId, TryNewCodeConfig}, ids::{prelude::*, CodeId, MessageId, ProgramId, ReservationId}, memory::PageBuf, - message::{ - Dispatch, DispatchKind, Message, MessageWaitedType, ReplyMessage, ReplyPacket, - SignalMessage, StoredDispatch, StoredMessage, - }, - pages::{ - numerated::{iterators::IntervalIterator, tree::IntervalsTree}, - GearPage, WasmPage, - }, - reservation::{GasReservationMap, GasReserver}, + message::{Dispatch, DispatchKind, ReplyMessage, ReplyPacket, StoredDispatch, StoredMessage}, + pages::{numerated::tree::IntervalsTree, GearPage, WasmPage}, + reservation::GasReservationMap, }; -use gear_core_errors::{ErrorReplyReason, SignalCode, SimpleExecutionError}; +use gear_core_errors::{ErrorReplyReason, SimpleExecutionError}; use gear_lazy_pages_common::LazyPagesCosts; use gear_lazy_pages_native_interface::LazyPagesNative; -use gear_wasm_instrument::gas_metering::Schedule; use rand::{rngs::StdRng, RngCore, SeedableRng}; use std::{ cell::{Ref, RefCell, RefMut}, @@ -241,11 +241,11 @@ pub(crate) struct ExtManager { pub(crate) meta_binaries: BTreeMap>, pub(crate) dispatches: VecDeque, pub(crate) mailbox: MailboxManager, - pub(crate) wait_list: BTreeMap<(ProgramId, MessageId), StoredDispatch>, - pub(crate) wait_list_schedules: BTreeMap>, + pub(crate) task_pool: TaskPoolManager, + pub(crate) wait_list: BTreeMap<(ProgramId, MessageId), (StoredDispatch, Option)>, pub(crate) gas_tree: GasTreeManager, pub(crate) gas_allowance: Gas, - pub(crate) delayed_dispatches: HashMap>, + pub(crate) dispatches_stash: HashMap, pub(crate) messages_processing_enabled: bool, // Last block execution info @@ -315,50 +315,24 @@ impl ExtManager { } /// Insert message into the delayed queue. - fn send_delayed_dispatch(&mut self, dispatch: Dispatch, bn: u32) { - self.delayed_dispatches - .entry(bn) - .or_default() - .push(dispatch) - } - - /// Process all delayed dispatches. - pub(crate) fn process_delayed_dispatches(&mut self, bn: u32) { - let Some(dispatches) = self.delayed_dispatches.remove(&bn) else { - return; - }; - - for dispatch in dispatches { - self.route_dispatch_from_task_pool(dispatch); - } - } - - /// Process scheduled wait list. - pub(crate) fn process_scheduled_wait_list(&mut self, bn: u32) { - let Some(wl_schedules) = self.wait_list_schedules.remove(&bn) else { - return; + fn send_delayed_dispatch(&mut self, dispatch: Dispatch, delay: u32) { + let message_id = dispatch.id(); + let task = if self.is_program(&dispatch.destination()) { + ScheduledTask::SendDispatch(message_id) + } else { + // TODO #4122, `to_mailbox` must be counted from provided gas + ScheduledTask::SendUserMessage { + message_id, + to_mailbox: true, + } }; - for wl_schedule in wl_schedules { - let Some(dispatch) = self.wait_list.remove(&wl_schedule) else { - continue; - }; - - let (kind, message, ..) = dispatch.into_parts(); - let message = Message::new( - message.id(), - message.source(), - message.destination(), - message - .payload_bytes() - .to_vec() - .try_into() - .unwrap_or_default(), - self.gas_tree.get_limit(message.id()).ok(), - message.value(), - message.details(), - ); - self.route_dispatch_from_task_pool(Dispatch::new(kind, message)); + let expected_bn = self.blocks_manager.get().height + delay; + self.task_pool + .add(expected_bn, task) + .unwrap_or_else(|e| unreachable!("TaskPool corrupted! {e:?}")); + if self.dispatches_stash.insert(message_id, dispatch).is_some() { + unreachable!("Delayed sending logic invalidated: stash contains same message"); } } @@ -449,8 +423,9 @@ impl ExtManager { #[track_caller] pub(crate) fn process_tasks(&mut self, bn: u32) { - self.process_delayed_dispatches(bn); - self.process_scheduled_wait_list(bn); + for task in self.task_pool.drain_prefix_keys(bn) { + task.process_with(self); + } } #[track_caller] @@ -516,25 +491,6 @@ impl ExtManager { } } - #[track_caller] - pub(crate) fn route_dispatch_from_task_pool(&mut self, dispatch: Dispatch) { - if self.is_program(&dispatch.destination()) { - self.dispatches.push_back(dispatch.into_stored()); - } else { - let message = dispatch.into_parts().1.into_stored(); - if let (Ok(mailbox_msg), true) = ( - message.clone().try_into(), - self.is_program(&message.source()), - ) { - self.mailbox - .insert(mailbox_msg) - .unwrap_or_else(|e| unreachable!("Mailbox corrupted! {:?}", e)); - } - - self.log.push(message) - } - } - /// Call non-void meta function from actor stored in manager. /// Warning! This is a static call that doesn't change actors pages data. pub(crate) fn read_state_bytes( @@ -750,7 +706,8 @@ impl ExtManager { }; if let Some(reply_message) = maybe_reply_message { - self.send_dispatch( + ::send_dispatch( + self, message_id, reply_message.into_dispatch(program_id, dispatch.source(), message_id), 0, @@ -801,7 +758,8 @@ impl ExtManager { let reply_message = ReplyMessage::system(message_id, err_payload, err); - self.send_dispatch( + ::send_dispatch( + self, message_id, reply_message.into_dispatch(program_id, dispatch.source(), message_id), 0, @@ -992,376 +950,3 @@ impl ExtManager { .and_then(|(actor, _)| actor.genuine_program_mut().map(op)) } } - -impl JournalHandler for ExtManager { - fn message_dispatched( - &mut self, - message_id: MessageId, - _source: ProgramId, - outcome: DispatchOutcome, - ) { - match outcome { - DispatchOutcome::MessageTrap { .. } => { - self.failed.insert(message_id); - } - DispatchOutcome::NoExecution => { - self.not_executed.insert(message_id); - } - DispatchOutcome::Success | DispatchOutcome::Exit { .. } => { - self.succeed.insert(message_id); - } - DispatchOutcome::InitFailure { program_id, .. } => { - self.init_failure(program_id); - self.failed.insert(message_id); - } - DispatchOutcome::InitSuccess { program_id, .. } => { - self.init_success(program_id); - self.succeed.insert(message_id); - } - } - } - - fn gas_burned(&mut self, message_id: MessageId, amount: u64) { - self.gas_allowance = self.gas_allowance.saturating_sub(Gas(amount)); - self.gas_tree - .spend(message_id, amount) - .unwrap_or_else(|e| unreachable!("GasTree corrupted! {:?}", e)); - - self.gas_burned - .entry(message_id) - .and_modify(|gas| { - *gas += Gas(amount); - }) - .or_insert(Gas(amount)); - } - - fn exit_dispatch(&mut self, id_exited: ProgramId, value_destination: ProgramId) { - if let Some((_, balance)) = self.actors.remove(&id_exited) { - self.mint_to(&value_destination, balance, MintMode::AllowDeath); - } - } - - fn message_consumed(&mut self, message_id: MessageId) { - self.gas_tree - .consume(message_id) - .unwrap_or_else(|e| unreachable!("GasTree corrupted! {:?}", e)); - } - - fn send_dispatch( - &mut self, - message_id: MessageId, - dispatch: Dispatch, - bn: u32, - reservation: Option, - ) { - if bn > 0 { - log::debug!("[{message_id}] new delayed dispatch#{}", dispatch.id()); - - self.send_delayed_dispatch(dispatch, self.blocks_manager.get().height + bn); - return; - } - - log::debug!("[{message_id}] new dispatch#{}", dispatch.id()); - - let source = dispatch.source(); - - if self.is_program(&dispatch.destination()) { - match (dispatch.gas_limit(), reservation) { - (Some(gas_limit), None) => self - .gas_tree - .split_with_value(false, message_id, dispatch.id(), gas_limit) - .unwrap_or_else(|e| unreachable!("GasTree corrupted! {:?}", e)), - (None, None) => self - .gas_tree - .split(false, message_id, dispatch.id()) - .unwrap_or_else(|e| unreachable!("GasTree corrupted! {:?}", e)), - (None, Some(reservation)) => { - self.gas_tree - .split(false, reservation, dispatch.id()) - .unwrap_or_else(|e| unreachable!("GasTree corrupted! {:?}", e)); - } - (Some(_), Some(_)) => unreachable!( - "Sending dispatch with gas limit from reservation \ - is currently unimplemented and there is no way to send such dispatch" - ), - } - - self.dispatches.push_back(dispatch.into_stored()); - } else { - let gas_limit = dispatch.gas_limit().unwrap_or_default(); - let stored_message = dispatch.into_stored().into_parts().1; - - if let Ok(mailbox_msg) = stored_message.clone().try_into() { - let origin_node = reservation - .map(|r| r.into_origin().cast()) - .unwrap_or(message_id); - self.gas_tree - .cut(origin_node, stored_message.id(), gas_limit) - .unwrap_or_else(|e| unreachable!("GasTree corrupted! {:?}", e)); - - self.mailbox - .insert(mailbox_msg) - .unwrap_or_else(|e| unreachable!("Mailbox corrupted! {:?}", e)); - } else { - log::debug!("A reply message is sent to user: {stored_message:?}"); - }; - - self.log.push(stored_message); - } - - if let Some(reservation) = reservation { - let has_removed_reservation = self - .remove_reservation(source, reservation) - .expect("failed to find genuine_program"); - if !has_removed_reservation { - unreachable!("Failed to remove reservation {reservation} from {source}"); - } - } - } - - fn wait_dispatch( - &mut self, - dispatch: StoredDispatch, - duration: Option, - _: MessageWaitedType, - ) { - log::debug!("[{}] wait", dispatch.id()); - - let dest = dispatch.destination(); - let id = dispatch.id(); - self.wait_list.insert((dest, id), dispatch); - if let Some(duration) = duration { - self.wait_list_schedules - .entry(self.blocks_manager.get().height + duration) - .or_default() - .push((dest, id)); - } - } - - fn wake_message( - &mut self, - message_id: MessageId, - program_id: ProgramId, - awakening_id: MessageId, - _delay: u32, - ) { - log::debug!("[{message_id}] waked message#{awakening_id}"); - - if let Some(msg) = self.wait_list.remove(&(program_id, awakening_id)) { - self.dispatches.push_back(msg); - } - } - - #[track_caller] - fn update_pages_data( - &mut self, - program_id: ProgramId, - pages_data: BTreeMap, - ) { - self.update_storage_pages(&program_id, pages_data); - } - - #[track_caller] - fn update_allocations(&mut self, program_id: ProgramId, allocations: IntervalsTree) { - self.update_genuine_program(program_id, |program| { - program - .allocations - .difference(&allocations) - .flat_map(IntervalIterator::from) - .flat_map(|page| page.to_iter()) - .for_each(|ref page| { - program.pages_data.remove(page); - }); - program.allocations = allocations; - }) - .expect("no genuine program was found"); - } - - #[track_caller] - fn send_value(&mut self, from: ProgramId, to: Option, value: Balance) { - if value == 0 { - // Nothing to do - return; - } - if let Some(ref to) = to { - if self.is_program(&from) { - let mut actors = self.actors.borrow_mut(); - let (_, balance) = actors.get_mut(&from).expect("Can't fail"); - - if *balance < value { - unreachable!("Actor {:?} balance is less then sent value", from); - } - - *balance -= value; - - if *balance < crate::EXISTENTIAL_DEPOSIT { - *balance = 0; - } - } - - self.mint_to(to, value, MintMode::KeepAlive); - } else { - self.mint_to(&from, value, MintMode::KeepAlive); - } - } - - #[track_caller] - fn store_new_programs( - &mut self, - program_id: ProgramId, - code_id: CodeId, - candidates: Vec<(MessageId, ProgramId)>, - ) { - if let Some(code) = self.opt_binaries.get(&code_id).cloned() { - for (init_message_id, candidate_id) in candidates { - if !self.actors.contains_key(&candidate_id) { - let schedule = Schedule::default(); - let code = Code::try_new( - code.clone(), - schedule.instruction_weights.version, - |module| schedule.rules(module), - schedule.limits.stack_height, - schedule.limits.data_segments_amount.into(), - schedule.limits.table_number.into(), - ) - .expect("Program can't be constructed with provided code"); - - let code_and_id: InstrumentedCodeAndId = - CodeAndId::from_parts_unchecked(code, code_id).into(); - let (code, code_id) = code_and_id.into_parts(); - - self.store_new_actor( - candidate_id, - Program::Genuine(GenuineProgram { - code, - code_id, - allocations: Default::default(), - pages_data: Default::default(), - gas_reservation_map: Default::default(), - }), - Some(init_message_id), - ); - // Transfer the ED from the program-creator to the new program - self.send_value(program_id, Some(candidate_id), crate::EXISTENTIAL_DEPOSIT); - } else { - log::debug!("Program with id {candidate_id:?} already exists"); - } - } - } else { - log::debug!("No referencing code with code hash {code_id:?} for candidate programs"); - for (_, invalid_candidate_id) in candidates { - self.actors - .insert(invalid_candidate_id, (TestActor::Dormant, 0)); - } - } - } - - #[track_caller] - fn stop_processing(&mut self, dispatch: StoredDispatch, gas_burned: u64) { - log::debug!( - "Not enough gas for processing msg id {}, allowance equals {}, gas tried to burn at least {}", - dispatch.id(), - self.gas_allowance, - gas_burned, - ); - - self.messages_processing_enabled = false; - self.dispatches.push_front(dispatch); - } - - fn reserve_gas( - &mut self, - message_id: MessageId, - reservation_id: ReservationId, - _program_id: ProgramId, - amount: u64, - duration: u32, - ) { - log::debug!( - "Reserved: {:?} from {:?} with {:?} for {} blocks", - amount, - message_id, - reservation_id, - duration - ); - - self.gas_tree - .reserve_gas(message_id, reservation_id, amount) - .unwrap_or_else(|e| unreachable!("GasTree corrupted: {:?}", e)); - } - - fn unreserve_gas( - &mut self, - reservation_id: ReservationId, - program_id: ProgramId, - _expiration: u32, - ) { - let has_removed_reservation = self - .remove_reservation(program_id, reservation_id) - .expect("failed to find genuine_program"); - if !has_removed_reservation { - unreachable!("Failed to remove reservation {reservation_id} from {program_id}"); - } - } - - #[track_caller] - fn update_gas_reservation(&mut self, program_id: ProgramId, reserver: GasReserver) { - let block_height = self.blocks_manager.get().height; - self.update_genuine_program(program_id, |program| { - program.gas_reservation_map = - reserver.into_map(block_height, |duration| block_height + duration); - }) - .expect("no genuine program was found"); - } - - fn system_reserve_gas(&mut self, message_id: MessageId, amount: u64) { - self.gas_tree - .system_reserve(message_id, amount) - .unwrap_or_else(|e| unreachable!("GasTree corrupted: {:?}", e)); - } - - fn system_unreserve_gas(&mut self, message_id: MessageId) { - self.gas_tree - .system_unreserve(message_id) - .unwrap_or_else(|e| unreachable!("GasTree corrupted: {:?}", e)); - } - - fn send_signal(&mut self, message_id: MessageId, destination: ProgramId, code: SignalCode) { - let reserved = self - .gas_tree - .system_unreserve(message_id) - .unwrap_or_else(|e| unreachable!("GasTree corrupted! {:?}", e)); - - if reserved != 0 { - log::debug!( - "Send signal issued by {} to {} with {} supply", - message_id, - destination, - reserved - ); - - let trap_signal = SignalMessage::new(message_id, code) - .into_dispatch(message_id, destination) - .into_stored(); - - self.gas_tree - .split_with_value( - trap_signal.is_reply(), - message_id, - trap_signal.id(), - reserved, - ) - .unwrap_or_else(|e| unreachable!("GasTree corrupted! {:?}", e)); - - self.dispatches.push_back(trap_signal); - } else { - log::trace!("Signal wasn't send due to inappropriate supply"); - } - } - - fn reply_deposit(&mut self, message_id: MessageId, future_reply_id: MessageId, amount: u64) { - self.gas_tree - .create_deposit(message_id, future_reply_id, amount) - .unwrap_or_else(|e| unreachable!("GasTree corrupted! {:?}", e)); - } -} diff --git a/gtest/src/manager/journal.rs b/gtest/src/manager/journal.rs new file mode 100644 index 00000000000..85182ac953d --- /dev/null +++ b/gtest/src/manager/journal.rs @@ -0,0 +1,422 @@ +// This file is part of Gear. + +// Copyright (C) 2024 Gear Technologies Inc. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +/// Implementation of the `JournalHandler` trait for the `ExtManager`. +use std::collections::BTreeMap; + +use super::{Balance, ExtManager, Gas, GenuineProgram, MintMode, Program, TestActor}; +use core_processor::common::{DispatchOutcome, JournalHandler}; +use gear_common::{scheduler::ScheduledTask, Origin}; +use gear_core::{ + code::{Code, CodeAndId, InstrumentedCodeAndId}, + ids::{CodeId, MessageId, ProgramId, ReservationId}, + memory::PageBuf, + message::{Dispatch, MessageWaitedType, SignalMessage, StoredDispatch}, + pages::{ + numerated::{iterators::IntervalIterator, tree::IntervalsTree}, + GearPage, WasmPage, + }, + reservation::GasReserver, +}; +use gear_core_errors::SignalCode; +use gear_wasm_instrument::gas_metering::Schedule; + +impl JournalHandler for ExtManager { + fn message_dispatched( + &mut self, + message_id: MessageId, + _source: ProgramId, + outcome: DispatchOutcome, + ) { + match outcome { + DispatchOutcome::MessageTrap { .. } => { + self.failed.insert(message_id); + } + DispatchOutcome::NoExecution => { + self.not_executed.insert(message_id); + } + DispatchOutcome::Success | DispatchOutcome::Exit { .. } => { + self.succeed.insert(message_id); + } + DispatchOutcome::InitFailure { program_id, .. } => { + self.init_failure(program_id); + self.failed.insert(message_id); + } + DispatchOutcome::InitSuccess { program_id, .. } => { + self.init_success(program_id); + self.succeed.insert(message_id); + } + } + } + + fn gas_burned(&mut self, message_id: MessageId, amount: u64) { + self.gas_allowance = self.gas_allowance.saturating_sub(Gas(amount)); + self.gas_tree + .spend(message_id, amount) + .unwrap_or_else(|e| unreachable!("GasTree corrupted! {:?}", e)); + + self.gas_burned + .entry(message_id) + .and_modify(|gas| { + *gas += Gas(amount); + }) + .or_insert(Gas(amount)); + } + + fn exit_dispatch(&mut self, id_exited: ProgramId, value_destination: ProgramId) { + if let Some((_, balance)) = self.actors.remove(&id_exited) { + self.mint_to(&value_destination, balance, MintMode::AllowDeath); + } + } + + fn message_consumed(&mut self, message_id: MessageId) { + self.gas_tree + .consume(message_id) + .unwrap_or_else(|e| unreachable!("GasTree corrupted! {:?}", e)); + } + + fn send_dispatch( + &mut self, + message_id: MessageId, + dispatch: Dispatch, + delay: u32, + reservation: Option, + ) { + if delay > 0 { + log::debug!("[{message_id}] new delayed dispatch#{}", dispatch.id()); + + self.send_delayed_dispatch(dispatch, delay); + return; + } + + log::debug!("[{message_id}] new dispatch#{}", dispatch.id()); + + let source = dispatch.source(); + + if self.is_program(&dispatch.destination()) { + match (dispatch.gas_limit(), reservation) { + (Some(gas_limit), None) => self + .gas_tree + .split_with_value(false, message_id, dispatch.id(), gas_limit) + .unwrap_or_else(|e| unreachable!("GasTree corrupted! {:?}", e)), + (None, None) => self + .gas_tree + .split(false, message_id, dispatch.id()) + .unwrap_or_else(|e| unreachable!("GasTree corrupted! {:?}", e)), + (None, Some(reservation)) => { + self.gas_tree + .split(false, reservation, dispatch.id()) + .unwrap_or_else(|e| unreachable!("GasTree corrupted! {:?}", e)); + } + (Some(_), Some(_)) => unreachable!( + "Sending dispatch with gas limit from reservation \ + is currently unimplemented and there is no way to send such dispatch" + ), + } + + self.dispatches.push_back(dispatch.into_stored()); + } else { + let gas_limit = dispatch.gas_limit().unwrap_or_default(); + let stored_message = dispatch.into_stored().into_parts().1; + + if let Ok(mailbox_msg) = stored_message.clone().try_into() { + let origin_node = reservation + .map(|r| r.into_origin().cast()) + .unwrap_or(message_id); + self.gas_tree + .cut(origin_node, stored_message.id(), gas_limit) + .unwrap_or_else(|e| unreachable!("GasTree corrupted! {:?}", e)); + + self.mailbox + .insert(mailbox_msg) + .unwrap_or_else(|e| unreachable!("Mailbox corrupted! {:?}", e)); + } else { + log::debug!("A reply message is sent to user: {stored_message:?}"); + }; + + self.log.push(stored_message); + } + + if let Some(reservation) = reservation { + let has_removed_reservation = self + .remove_reservation(source, reservation) + .expect("failed to find genuine_program"); + if !has_removed_reservation { + unreachable!("Failed to remove reservation {reservation} from {source}"); + } + } + } + + fn wait_dispatch( + &mut self, + dispatch: StoredDispatch, + duration: Option, + _: MessageWaitedType, + ) { + log::debug!("[{}] wait", dispatch.id()); + + let dest = dispatch.destination(); + let id = dispatch.id(); + let expected_wake = duration.map(|d| { + let expected_bn = d + self.blocks_manager.get().height; + self.task_pool + .add(expected_bn, ScheduledTask::WakeMessage(dest, id)) + .unwrap_or_else(|e| unreachable!("TaskPool corrupted: {e:?}")); + + expected_bn + }); + self.wait_list.insert((dest, id), (dispatch, expected_wake)); + } + + fn wake_message( + &mut self, + message_id: MessageId, + program_id: ProgramId, + awakening_id: MessageId, + _delay: u32, + ) { + log::debug!("[{message_id}] waked message#{awakening_id}"); + + if let Some((msg, expected_bn)) = self.wait_list.remove(&(program_id, awakening_id)) { + self.dispatches.push_back(msg); + + let Some(expected_bn) = expected_bn else { + return; + }; + self.task_pool + .delete( + expected_bn, + ScheduledTask::WakeMessage(program_id, awakening_id), + ) + .unwrap_or_else(|e| unreachable!("TaskPool corrupted: {e:?}")); + } + } + + #[track_caller] + fn update_pages_data( + &mut self, + program_id: ProgramId, + pages_data: BTreeMap, + ) { + self.update_storage_pages(&program_id, pages_data); + } + + #[track_caller] + fn update_allocations(&mut self, program_id: ProgramId, allocations: IntervalsTree) { + self.update_genuine_program(program_id, |program| { + program + .allocations + .difference(&allocations) + .flat_map(IntervalIterator::from) + .flat_map(|page| page.to_iter()) + .for_each(|ref page| { + program.pages_data.remove(page); + }); + program.allocations = allocations; + }) + .expect("no genuine program was found"); + } + + #[track_caller] + fn send_value(&mut self, from: ProgramId, to: Option, value: Balance) { + if value == 0 { + // Nothing to do + return; + } + if let Some(ref to) = to { + if self.is_program(&from) { + let mut actors = self.actors.borrow_mut(); + let (_, balance) = actors.get_mut(&from).expect("Can't fail"); + + if *balance < value { + unreachable!("Actor {:?} balance is less then sent value", from); + } + + *balance -= value; + + if *balance < crate::EXISTENTIAL_DEPOSIT { + *balance = 0; + } + } + + self.mint_to(to, value, MintMode::KeepAlive); + } else { + self.mint_to(&from, value, MintMode::KeepAlive); + } + } + + #[track_caller] + fn store_new_programs( + &mut self, + program_id: ProgramId, + code_id: CodeId, + candidates: Vec<(MessageId, ProgramId)>, + ) { + if let Some(code) = self.opt_binaries.get(&code_id).cloned() { + for (init_message_id, candidate_id) in candidates { + if !self.actors.contains_key(&candidate_id) { + let schedule = Schedule::default(); + let code = Code::try_new( + code.clone(), + schedule.instruction_weights.version, + |module| schedule.rules(module), + schedule.limits.stack_height, + schedule.limits.data_segments_amount.into(), + schedule.limits.table_number.into(), + ) + .expect("Program can't be constructed with provided code"); + + let code_and_id: InstrumentedCodeAndId = + CodeAndId::from_parts_unchecked(code, code_id).into(); + let (code, code_id) = code_and_id.into_parts(); + + self.store_new_actor( + candidate_id, + Program::Genuine(GenuineProgram { + code, + code_id, + allocations: Default::default(), + pages_data: Default::default(), + gas_reservation_map: Default::default(), + }), + Some(init_message_id), + ); + // Transfer the ED from the program-creator to the new program + self.send_value(program_id, Some(candidate_id), crate::EXISTENTIAL_DEPOSIT); + } else { + log::debug!("Program with id {candidate_id:?} already exists"); + } + } + } else { + log::debug!("No referencing code with code hash {code_id:?} for candidate programs"); + for (_, invalid_candidate_id) in candidates { + self.actors + .insert(invalid_candidate_id, (TestActor::Dormant, 0)); + } + } + } + + #[track_caller] + fn stop_processing(&mut self, dispatch: StoredDispatch, gas_burned: u64) { + log::debug!( + "Not enough gas for processing msg id {}, allowance equals {}, gas tried to burn at least {}", + dispatch.id(), + self.gas_allowance, + gas_burned, + ); + + self.messages_processing_enabled = false; + self.dispatches.push_front(dispatch); + } + + fn reserve_gas( + &mut self, + message_id: MessageId, + reservation_id: ReservationId, + _program_id: ProgramId, + amount: u64, + duration: u32, + ) { + log::debug!( + "Reserved: {:?} from {:?} with {:?} for {} blocks", + amount, + message_id, + reservation_id, + duration + ); + + self.gas_tree + .reserve_gas(message_id, reservation_id, amount) + .unwrap_or_else(|e| unreachable!("GasTree corrupted: {:?}", e)); + } + + fn unreserve_gas( + &mut self, + reservation_id: ReservationId, + program_id: ProgramId, + _expiration: u32, + ) { + let has_removed_reservation = self + .remove_reservation(program_id, reservation_id) + .expect("failed to find genuine_program"); + if !has_removed_reservation { + unreachable!("Failed to remove reservation {reservation_id} from {program_id}"); + } + } + + #[track_caller] + fn update_gas_reservation(&mut self, program_id: ProgramId, reserver: GasReserver) { + let block_height = self.blocks_manager.get().height; + self.update_genuine_program(program_id, |program| { + program.gas_reservation_map = + reserver.into_map(block_height, |duration| block_height + duration); + }) + .expect("no genuine program was found"); + } + + fn system_reserve_gas(&mut self, message_id: MessageId, amount: u64) { + self.gas_tree + .system_reserve(message_id, amount) + .unwrap_or_else(|e| unreachable!("GasTree corrupted: {:?}", e)); + } + + fn system_unreserve_gas(&mut self, message_id: MessageId) { + self.gas_tree + .system_unreserve(message_id) + .unwrap_or_else(|e| unreachable!("GasTree corrupted: {:?}", e)); + } + + fn send_signal(&mut self, message_id: MessageId, destination: ProgramId, code: SignalCode) { + let reserved = self + .gas_tree + .system_unreserve(message_id) + .unwrap_or_else(|e| unreachable!("GasTree corrupted! {:?}", e)); + + if reserved != 0 { + log::debug!( + "Send signal issued by {} to {} with {} supply", + message_id, + destination, + reserved + ); + + let trap_signal = SignalMessage::new(message_id, code) + .into_dispatch(message_id, destination) + .into_stored(); + + self.gas_tree + .split_with_value( + trap_signal.is_reply(), + message_id, + trap_signal.id(), + reserved, + ) + .unwrap_or_else(|e| unreachable!("GasTree corrupted! {:?}", e)); + + self.dispatches.push_back(trap_signal); + } else { + log::trace!("Signal wasn't send due to inappropriate supply"); + } + } + + fn reply_deposit(&mut self, message_id: MessageId, future_reply_id: MessageId, amount: u64) { + self.gas_tree + .create_deposit(message_id, future_reply_id, amount) + .unwrap_or_else(|e| unreachable!("GasTree corrupted! {:?}", e)); + } +} diff --git a/gtest/src/manager/task.rs b/gtest/src/manager/task.rs new file mode 100644 index 00000000000..8dbc9416706 --- /dev/null +++ b/gtest/src/manager/task.rs @@ -0,0 +1,110 @@ +// This file is part of Gear. + +// Copyright (C) 2024 Gear Technologies Inc. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +/// Implementation of the `TaskHandler` trait for the `ExtManager`. +use super::ExtManager; +use gear_common::{scheduler::TaskHandler, Gas as GearCommonGas}; +use gear_core::ids::{CodeId, MessageId, ProgramId, ReservationId}; + +impl TaskHandler for ExtManager { + fn pause_program(&mut self, _program_id: ProgramId) -> GearCommonGas { + log::debug!("Program rent logic is disabled."); + + 0 + } + + fn remove_code(&mut self, _code_id: CodeId) -> GearCommonGas { + todo!("#646") + } + + fn remove_from_mailbox( + &mut self, + _user_id: ProgramId, + _message_id: MessageId, + ) -> GearCommonGas { + todo!() + } + + fn remove_from_waitlist( + &mut self, + _program_id: ProgramId, + _message_id: MessageId, + ) -> GearCommonGas { + todo!() + } + + fn remove_paused_program(&mut self, _program_id: ProgramId) -> GearCommonGas { + todo!("#646") + } + + fn wake_message(&mut self, program_id: ProgramId, message_id: MessageId) -> GearCommonGas { + let (dispatch, _) = self + .wait_list + .remove(&(program_id, message_id)) + .unwrap_or_else(|| unreachable!("TaskPool corrupted!")); + self.dispatches.push_back(dispatch); + + GearCommonGas::MIN + } + + fn send_dispatch(&mut self, stashed_message_id: MessageId) -> GearCommonGas { + let dispatch = self + .dispatches_stash + .remove(&stashed_message_id) + .unwrap_or_else(|| unreachable!("TaskPool corrupted!")); + + self.dispatches.push_back(dispatch.into_stored()); + + GearCommonGas::MIN + } + + fn send_user_message( + &mut self, + stashed_message_id: MessageId, + _to_mailbox: bool, + ) -> GearCommonGas { + let dispatch = self + .dispatches_stash + .remove(&stashed_message_id) + .unwrap_or_else(|| unreachable!("TaskPool corrupted!")); + let stored_message = dispatch.into_parts().1.into_stored(); + let mailbox_message = stored_message.clone().try_into().unwrap_or_else(|e| { + unreachable!("invalid message: can't be converted to user message {e:?}") + }); + + self.mailbox + .insert(mailbox_message) + .unwrap_or_else(|e| unreachable!("Mailbox corrupted! {:?}", e)); + self.log.push(stored_message); + + GearCommonGas::MIN + } + + fn remove_gas_reservation( + &mut self, + _program_id: ProgramId, + _reservation_id: ReservationId, + ) -> GearCommonGas { + todo!() + } + + fn remove_resume_session(&mut self, _session_id: u32) -> GearCommonGas { + log::debug!("Program rent logic is disabled"); + 0 + } +} diff --git a/gtest/src/system.rs b/gtest/src/system.rs index a23b262f0f6..39abc27cd63 100644 --- a/gtest/src/system.rs +++ b/gtest/src/system.rs @@ -396,6 +396,7 @@ impl Drop for System { SYSTEM_INITIALIZED.with_borrow_mut(|initialized| *initialized = false); self.0.borrow().gas_tree.reset(); self.0.borrow().mailbox.reset(); + self.0.borrow().task_pool.clear(); } } diff --git a/gtest/src/taskpool.rs b/gtest/src/task_pool.rs similarity index 66% rename from gtest/src/taskpool.rs rename to gtest/src/task_pool.rs index d51cd263a10..f61cb6cd6d7 100644 --- a/gtest/src/taskpool.rs +++ b/gtest/src/task_pool.rs @@ -1,6 +1,26 @@ +// This file is part of Gear. + +// Copyright (C) 2024 Gear Technologies Inc. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Auxiliary (for tests) task pool implementation for the crate. + use gear_common::{ auxiliary::{ - taskpool::{AuxiliaryTaskpool, TaskPoolErrorImpl, TaskPoolStorageWrap}, + task_pool::{AuxiliaryTaskpool, TaskPoolErrorImpl, TaskPoolStorageWrap}, BlockNumber, }, scheduler::{ScheduledTask, TaskPool, TaskPoolCallbacks}, @@ -8,16 +28,16 @@ use gear_common::{ ProgramId, }; -/// Task pool manager for gtest environment. +/// Task pool manager which operates under the hood over +/// [`gear_common::AuxiliaryTaskpool`]. /// -/// TODO(ap): wait for #4119 and work on integrating this into it, until then -/// allow(dead_code). +/// Manager is needed mainly to adapt arguments of the task pool methods to the +/// crate. #[derive(Debug, Default)] -#[allow(dead_code)] pub(crate) struct TaskPoolManager; -#[allow(dead_code)] impl TaskPoolManager { + /// Adapted by argument types version of the task pool `add` method. pub(crate) fn add( &self, block_number: BlockNumber, @@ -26,10 +46,13 @@ impl TaskPoolManager { as TaskPool>::add(block_number, task) } + /// Adapted by argument types version of the task pool `clear` method. pub(crate) fn clear(&self) { as TaskPool>::clear(); } + /// Adapted by argument types version of the task pool `contains` method. + #[allow(unused)] pub(crate) fn contains( &self, block_number: &BlockNumber, @@ -38,6 +61,7 @@ impl TaskPoolManager { as TaskPool>::contains(block_number, task) } + /// Adapted by argument types version of the task pool `delete` method. pub(crate) fn delete( &self, block_number: BlockNumber, @@ -46,7 +70,9 @@ impl TaskPoolManager { as TaskPool>::delete(block_number, task) } - pub(crate) fn drain( + /// Adapted by argument types version of the task pool `drain_prefix_keys` + /// method. + pub(crate) fn drain_prefix_keys( &self, block_number: BlockNumber, ) -> ::DrainIter { @@ -54,6 +80,7 @@ impl TaskPoolManager { } } +/// Task pool callbacks implementor. pub(crate) struct TaskPoolCallbacksImpl; impl TaskPoolCallbacks for TaskPoolCallbacksImpl { @@ -99,14 +126,14 @@ mod tests { assert!(manager.contains(&2, task)); } - for task in manager.drain(1) { + for task in manager.drain_prefix_keys(1) { assert!( block_1_tasks.contains(&task), "task not found in block 1 tasks" ); } - for task in manager.drain(2) { + for task in manager.drain_prefix_keys(2) { assert!( block_2_tasks.contains(&task), "task not found in block 2 tasks"