Skip to content

Commit

Permalink
Fully implement waitlist management
Browse files Browse the repository at this point in the history
  • Loading branch information
techraed committed Aug 27, 2024
1 parent 93e684b commit 5f856eb
Show file tree
Hide file tree
Showing 8 changed files with 295 additions and 77 deletions.
1 change: 1 addition & 0 deletions common/src/auxiliary/waitlist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ impl AuxiliaryDoubleStorageWrap for WaitlistStorageWrap {
}

/// An implementor of the error returned from calling `Waitlist` trait functions
#[derive(Debug)]
pub enum WaitlistErrorImpl {
DuplicateKey,
ElementNotFound,
Expand Down
35 changes: 25 additions & 10 deletions gtest/src/gas_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,31 +183,46 @@ impl GasTreeManager {
/// Unreserve some value from underlying balance.
///
/// Used in gas reservation for system signal.
pub(crate) fn system_unreserve(&self, key: MessageId) -> Result<Gas, GasTreeError> {
GasTree::system_unreserve(GasNodeId::from(key.cast::<PlainNodeId>()))
pub(crate) fn system_unreserve(&self, message_id: MessageId) -> Result<Gas, GasTreeError> {
GasTree::system_unreserve(GasNodeId::from(message_id.cast::<PlainNodeId>()))
}

/// Reserve some value from underlying balance.
///
/// Used in gas reservation for system signal.
pub(crate) fn system_reserve(&self, key: MessageId, amount: Gas) -> Result<(), GasTreeError> {
GasTree::system_reserve(GasNodeId::from(key.cast::<PlainNodeId>()), amount)
pub(crate) fn system_reserve(
&self,
message_id: MessageId,
amount: Gas,
) -> Result<(), GasTreeError> {
GasTree::system_reserve(GasNodeId::from(message_id.cast::<PlainNodeId>()), amount)
}

pub fn lock(&self, key: MessageId, id: LockId, amount: Gas) -> Result<(), GasTreeError> {
GasTree::lock(GasNodeId::from(key.cast::<PlainNodeId>()), id, amount)
pub fn lock(&self, message_id: MessageId, id: LockId, amount: Gas) -> Result<(), GasTreeError> {
GasTree::lock(
GasNodeId::from(message_id.cast::<PlainNodeId>()),
id,
amount,
)
}

pub(crate) fn unlock_all(&self, key: impl Origin, id: LockId) -> Result<Gas, GasTreeError> {
GasTree::unlock_all(GasNodeId::from(key.cast::<PlainNodeId>()), id)
pub(crate) fn unlock_all(
&self,
message_id: impl Origin,
id: LockId,
) -> Result<Gas, GasTreeError> {
GasTree::unlock_all(GasNodeId::from(message_id.cast::<PlainNodeId>()), id)
}

/// The id of node, external origin and funds multiplier for a key.
///
/// Error occurs if the tree is invalidated (has "orphan" nodes), and the
/// node identified by the `key` belongs to a subtree originating at
/// such "orphan" node, or in case of inexistent key.
pub(crate) fn get_origin_node(&self, key: MessageId) -> Result<OriginNodeDataOf, GasTreeError> {
GasTree::get_origin_node(GasNodeId::from(key.cast::<PlainNodeId>()))
pub(crate) fn get_origin_node(
&self,
message_id: MessageId,
) -> Result<OriginNodeDataOf, GasTreeError> {
GasTree::get_origin_node(GasNodeId::from(message_id.cast::<PlainNodeId>()))
}
}
174 changes: 145 additions & 29 deletions gtest/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use crate::{
mailbox::MailboxManager,
program::{Gas, WasmProgram},
task_pool::TaskPoolManager,
waitlist::WaitlistManager,
Result, TestError, DISPATCH_HOLD_COST, EPOCH_DURATION_IN_BLOCKS, EXISTENTIAL_DEPOSIT,
GAS_ALLOWANCE, GAS_MULTIPLIER, HOST_FUNC_READ_COST, HOST_FUNC_WRITE_AFTER_READ_COST,
HOST_FUNC_WRITE_COST, INITIAL_RANDOM_SEED, LOAD_ALLOCATIONS_PER_INTERVAL,
Expand All @@ -51,7 +52,11 @@ use core_processor::{
ContextChargedForCode, ContextChargedForInstrumentation, Ext,
};
use gear_common::{
auxiliary::{gas_provider::PlainNodeId, mailbox::MailboxErrorImpl, BlockNumber},
auxiliary::{
gas_provider::PlainNodeId, mailbox::MailboxErrorImpl, waitlist::WaitlistErrorImpl,
BlockNumber,
},
event::{MessageWaitedReason, MessageWaitedRuntimeReason},
scheduler::{ScheduledTask, StorageType},
storage::Interval,
LockId, Origin,
Expand Down Expand Up @@ -98,7 +103,7 @@ pub(crate) struct ExtManager {
pub(crate) dispatches: VecDeque<StoredDispatch>,
pub(crate) mailbox: MailboxManager,
pub(crate) task_pool: TaskPoolManager,
pub(crate) wait_list: BTreeMap<(ProgramId, MessageId), (StoredDispatch, Option<BlockNumber>)>,
pub(crate) waitlist: WaitlistManager,
pub(crate) gas_tree: GasTreeManager,
pub(crate) gas_allowance: Gas,
pub(crate) dispatches_stash: HashMap<MessageId, (StoredDelayedDispatch, Interval<BlockNumber>)>,
Expand Down Expand Up @@ -444,20 +449,15 @@ impl ExtManager {
})
.unwrap_or_default();

let message_id = message.id();
let from = message.source();
let to = message.destination();
let value = message.value();

let stored_message = message.into_stored();
let message: UserMessage = stored_message.clone().try_into().unwrap_or_else(|_| {
let err_msg = format!(
"send_user_message: failed conversion from stored into user message. \
Message id - {message_id}, program id - {from}, destination - {to}",
);

unreachable!("{err_msg}")
});
let message: UserMessage = stored_message
.clone()
.try_into()
.expect("failed to convert stored message to user message");

if Accounts::balance(from) != 0 {
self.bank.deposit_value(from, value, false);
Expand Down Expand Up @@ -501,15 +501,10 @@ impl ExtManager {
});

let message_id = message.id();
let message: UserStoredMessage = message.clone().try_into().unwrap_or_else(|_| {
// Replies never sent to mailbox
let err_msg = format!(
"send_user_message: failed conversion from user into user stored message. \
Message id - {message_id}, program id - {from:?}, destination - {to:?}",
);

unreachable!("{err_msg}")
});
let message: UserStoredMessage = message
.clone()
.try_into()
.expect("failed to convert user message to user stored message");

self.mailbox
.insert(message, hold.expected())
Expand Down Expand Up @@ -613,15 +608,7 @@ impl ExtManager {
let message: UserStoredMessage = message
.clone()
.try_into()
.unwrap_or_else(|_| {
// Replies never sent to mailbox
let err_msg = format!(
"send_user_message_after_delay: failed conversion from user into user stored message. \
Message id - {message_id}, program id - {from:?}, destination - {to:?}",
);

unreachable!("{err_msg}")
});
.expect("failed to convert user message to user stored message");
self.mailbox
.insert(message, hold.expected())
.unwrap_or_else(|e| {
Expand Down Expand Up @@ -1317,4 +1304,133 @@ impl ExtManager {

self.bank.spend_gas(external.cast(), amount, multiplier)
}

// todo [sab] separate this stuff

fn wait_dipatch_impl(
&self,
dispatch: StoredDispatch,
duration: Option<BlockNumber>,
reason: MessageWaitedReason,
) {
use MessageWaitedRuntimeReason::*;

let hold_builder = HoldBoundBuilder::new(StorageType::Waitlist);

let maximal_hold = hold_builder.maximum_for_message(self, dispatch.id());

let hold = if let Some(duration) = duration {
hold_builder.duration(self, duration).min(maximal_hold)
} else {
maximal_hold
};

let message_id = dispatch.id();
let destination = dispatch.destination();

if hold.expected_duration(self).is_zero() {
let gas_limit = self.gas_tree.get_limit(dispatch.id()).unwrap_or_else(|e| {
let err_msg = format!(
"wait_dispatch: failed getting message gas limit. Message id - {message_id}. \
Got error - {e:?}",
message_id = dispatch.id()
);

unreachable!("{err_msg}");
});

let err_msg = format!(
"wait_dispatch: message got zero duration hold bound for waitlist. \
Requested duration - {duration:?}, gas limit - {gas_limit}, \
wait reason - {reason:?}, message id - {}.",
dispatch.id(),
);

unreachable!("{err_msg}");
}

// Locking funds for holding.
let lock_id = hold.lock_id().unwrap_or_else(|| {
// Waitlist storage is guaranteed to have an associated lock id
let err_msg = "wait_dispatch: No associated lock id for the waitlist storage";

unreachable!("{err_msg}");
});
self.gas_tree
.lock(message_id, lock_id, hold.lock_amount(self))
.unwrap_or_else(|e| {
let err_msg = format!(
"wait_dispatch: failed locking gas for the waitlist hold. \
Message id - {message_id}, lock amount - {lock}. Got error - {e:?}",
lock = hold.lock_amount(self)
);

unreachable!("{err_msg}");
});

match reason {
MessageWaitedReason::Runtime(WaitForCalled | WaitUpToCalledFull) => {
let expected = hold.expected();
let task = ScheduledTask::WakeMessage(destination, message_id);

if !self.task_pool.contains(&expected, &task) {
self.task_pool.add(expected, task).unwrap_or_else(|e| {
let err_msg = format!(
"wait_dispatch: failed adding task for waking message. \
Expected bn - {expected:?}, program id - {destination}, message id - {message_id}. Got error - {e:?}",
);

log::error!("{err_msg}");
unreachable!("{err_msg}");
});
}
}
MessageWaitedReason::Runtime(WaitCalled | WaitUpToCalled) => {
self.task_pool.add(
hold.expected(),
ScheduledTask::RemoveFromWaitlist(dispatch.destination(), dispatch.id()),
)
.unwrap_or_else(|e| {
let err_msg = format!(
"wait_dispatch: failed adding task for removing message from waitlist. \
Expected bn - {bn:?}, program id - {destination}, message id - {message_id}. Got error - {e:?}",
bn = hold.expected(),
);

log::error!("{err_msg}");
unreachable!("{err_msg}");
});
}
MessageWaitedReason::System(reason) => match reason {},
}

self.waitlist.insert(dispatch, hold.expected())
.unwrap_or_else(|e| {
let err_msg = format!(
"wait_dispatch: failed inserting message to the wailist. \
Expected bn - {bn:?}, program id - {destination}, message id - {message_id}. Got error - {e:?}",
bn = hold.expected(),
);

unreachable!("{err_msg}");
});
}

fn wake_dispatch_impl(
&mut self,
program_id: ProgramId,
message_id: MessageId,
) -> Result<StoredDispatch, WaitlistErrorImpl> {
let (waitlisted, hold_interval) = self.waitlist.remove(program_id, message_id)?;
let expected_bn = hold_interval.finish;

self.charge_for_hold(waitlisted.id(), hold_interval, StorageType::Waitlist);

let _ = self.task_pool.delete(
expected_bn,
ScheduledTask::RemoveFromWaitlist(waitlisted.destination(), waitlisted.id()),
);

Ok(waitlisted)
}
}
21 changes: 17 additions & 4 deletions gtest/src/manager/hold_bound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@
//! Implementation of HoldBound and HoldBound builder, specifcying cost of
//! holding data.
use gear_common::{auxiliary::BlockNumber, scheduler::StorageType, LockId, MessageId};

use crate::RESERVE_FOR;
use std::cmp::Ordering;

use super::ExtManager;
use crate::RESERVE_FOR;
use gear_common::{auxiliary::BlockNumber, scheduler::StorageType, LockId, MessageId};

/// Hold bound, specifying cost of storage, expected block number for task to
/// create on it, deadlines and durations of holding.
#[derive(Clone, Debug, Eq, PartialEq)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct HoldBound {
cost: u64,
expected: BlockNumber,
Expand Down Expand Up @@ -65,6 +65,19 @@ impl HoldBound {
}
}

impl PartialOrd for HoldBound {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

impl Ord for HoldBound {
fn cmp(&self, other: &Self) -> Ordering {
self.expected.cmp(&other.expected)
}
}

#[derive(Debug, Clone, Copy)]
pub struct HoldBoundBuilder {
storage_type: StorageType,
cost: u64,
Expand Down
Loading

0 comments on commit 5f856eb

Please sign in to comment.