Skip to content

Commit

Permalink
adjust waitlist handling
Browse files Browse the repository at this point in the history
  • Loading branch information
breathx committed Sep 29, 2024
1 parent 12f0960 commit 5dd6b10
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 141 deletions.
9 changes: 1 addition & 8 deletions ethexe/cli/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,14 +280,7 @@ async fn mailbox() {
let state_hash = mirror.query().state_hash().await.unwrap();

let state = node.db.read_state(state_hash).unwrap();
assert!(!state.mailbox_hash.is_empty()); // could be empty
let mailbox = state
.mailbox_hash
.with_hash_or_default(|hash| node.db.read_mailbox(hash).unwrap());

let expected_mailbox = BTreeMap::from_iter([(env.sender_id, BTreeMap::new())]);

assert_eq!(mailbox, expected_mailbox);
assert!(state.mailbox_hash.is_empty());
}

#[tokio::test(flavor = "multi_thread")]
Expand Down
5 changes: 4 additions & 1 deletion ethexe/processor/src/handling/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ enum Task {
state_hash: H256,
result_sender: oneshot::Sender<Vec<JournalNote>>,
},
#[allow(unused)] // TODO (breathx)
WakeMessages {
program_id: ProgramId,
state_hash: H256,
Expand Down Expand Up @@ -87,7 +88,8 @@ async fn run_in_async(
handles.push(handle);
}

wake_messages(&task_senders, programs).await;
// TODO (breathx): fix me ASAP.
// wake_messages(&task_senders, programs).await;

loop {
// Send tasks to process programs in workers, until all queues are empty.
Expand Down Expand Up @@ -252,6 +254,7 @@ async fn one_batch(
result_receivers
}

#[allow(unused)] // TODO (breathx)
async fn wake_messages(
task_senders: &[mpsc::Sender<Task>],
programs: &mut BTreeMap<ProgramId, H256>,
Expand Down
103 changes: 30 additions & 73 deletions ethexe/runtime/common/src/journal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,48 +224,35 @@ impl<S: Storage> JournalHandler for Handler<'_, S> {
duration: Option<u32>,
waited_type: MessageWaitedType,
) {
let (kind, message, context) = dispatch.into_parts();
let Some(duration) = duration else {
todo!("Wait dispatch without specified duration");
};
let block = self.block_info.height.saturating_add(duration);

let (id, source, destination, payload, value, details) = message.into_parts();

let payload_hash = self.storage.write_payload(payload).into();
// TODO (breathx): support delays.
let block = self.block_info.height.saturating_add(duration);

let dispatch = Dispatch {
id,
kind,
source,
payload_hash,
value,
details,
context,
};
let dispatch = Dispatch::from_stored(self.storage, dispatch);

log::trace!("{:?} was added to waitlist block {block}", dispatch);

// TODO (breathx): FIX ME WITHIN THE PR.
self.update_state_with_storage(destination, |storage, state| {
let (queue_hash, pop_id) = Self::pop_queue_message(state, storage);
self.update_state_with_storage(self.program_id, |storage, state| {
state.queue_hash = storage.modify_queue(state.queue_hash.clone(), |queue| {
let queue_head = queue
.pop_front()
.expect("an attempt to wait message from empty queue");

if pop_id != dispatch.id {
unreachable!(
"First message in queue is {pop_id}, but {} was waited",
dispatch.id
assert_eq!(
queue_head.id, dispatch.id,
"queue head doesn't match processed message"
);
}

let mut waitlist = state.waitlist_hash.clone().with_hash_or_default(|hash| {
storage
.read_waitlist(hash)
.expect("Failed to read waitlist")
});
waitlist.entry(block).or_default().push(dispatch);
})?;

state.waitlist_hash = storage.write_waitlist(waitlist).into();
state.queue_hash = queue_hash.into();
// TODO (breathx): impl Copy for MaybeHash?
state.waitlist_hash =
storage.modify_waitlist(state.waitlist_hash.clone(), |waitlist| {
let r = waitlist.insert(dispatch.id, dispatch);
debug_assert!(r.is_none());
})?;

Ok(())
});
Expand All @@ -278,55 +265,25 @@ impl<S: Storage> JournalHandler for Handler<'_, S> {
awakening_id: MessageId,
delay: u32,
) {
log::trace!("Message {message_id} try to wake {awakening_id}");

if delay != 0 {
todo!("Delayed wake message");
}

// TODO (breathx): FIX ME WITHIN THE PR.
log::trace!("Dispatch {message_id} tries to wake {awakening_id}");

self.update_state_with_storage(program_id, |storage, state| {
let mut waitlist = state.waitlist_hash.clone().with_hash_or_default(|hash| {
storage
.read_waitlist(hash)
.expect("Failed to read waitlist")
});

let mut queue = state.queue_hash.clone().with_hash_or_default(|hash| {
storage.read_queue(hash).expect("Failed to read queue")
});

let mut changed = false;
let mut clear_for_block = None;
for (block, list) in waitlist.iter_mut() {
let Some(index) = list
.iter()
.enumerate()
.find_map(|(index, dispatch)| (dispatch.id == awakening_id).then_some(index))
else {
continue;
};

let dispatch = list.remove(index);
log::trace!("{dispatch:?} has been woken up by {message_id}");
let Some((dispatch, new_waitlist_hash)) = storage
.modify_waitlist_if_changed(state.waitlist_hash.clone(), |waitlist| {
waitlist.remove(&awakening_id)
})?
else {
return Ok(());
};

state.waitlist_hash = new_waitlist_hash;
state.queue_hash = storage.modify_queue(state.queue_hash.clone(), |queue| {
queue.push_back(dispatch);

if list.is_empty() {
clear_for_block = Some(*block);
}
changed = true;
break;
}

if let Some(block) = clear_for_block {
waitlist.remove(&block);
}

if changed {
state.queue_hash = storage.write_queue(queue).into();
state.waitlist_hash = storage.write_waitlist(waitlist).into();
}
})?;

Ok(())
});
Expand Down
111 changes: 56 additions & 55 deletions ethexe/runtime/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,62 +70,63 @@ pub trait RuntimeInterface<S: Storage> {
}

pub fn wake_messages<S: Storage, RI: RuntimeInterface<S>>(
program_id: ProgramId,
program_state: ProgramState,
ri: &RI,
_program_id: ProgramId,
_program_state: ProgramState,
_ri: &RI,
) -> Option<H256> {
let block_info = ri.block_info();

let mut queue = program_state.queue_hash.with_hash_or_default(|hash| {
ri.storage()
.read_queue(hash)
.expect("Cannot get message queue")
});

let mut waitlist = match program_state.waitlist_hash {
MaybeHash::Empty => {
// No messages in waitlist
return None;
}
MaybeHash::Hash(HashAndLen { hash, .. }) => ri
.storage()
.read_waitlist(hash)
.expect("Cannot get waitlist"),
};

let mut dispatches_to_wake = Vec::new();
let mut remove_from_waitlist_blocks = Vec::new();
for (block, list) in waitlist.range_mut(0..=block_info.height) {
if list.is_empty() {
log::error!("Empty waitlist for block, must been removed from waitlist")
}
dispatches_to_wake.append(list);
remove_from_waitlist_blocks.push(*block);
}

if remove_from_waitlist_blocks.is_empty() {
// No messages to wake up
return None;
}

for block in remove_from_waitlist_blocks {
waitlist.remove(&block);
}

for dispatch in dispatches_to_wake {
queue.push_back(dispatch);
}

let queue_hash = ri.storage().write_queue(queue).into();
let waitlist_hash = ri.storage().write_waitlist(waitlist).into();

let new_program_state = ProgramState {
queue_hash,
waitlist_hash,
..program_state
};

Some(ri.storage().write_state(new_program_state))
todo!("breathx");
// let block_info = ri.block_info();

// let mut queue = program_state.queue_hash.with_hash_or_default(|hash| {
// ri.storage()
// .read_queue(hash)
// .expect("Cannot get message queue")
// });

// let mut waitlist = match program_state.waitlist_hash {
// MaybeHash::Empty => {
// // No messages in waitlist
// return None;
// }
// MaybeHash::Hash(HashAndLen { hash, .. }) => ri
// .storage()
// .read_waitlist(hash)
// .expect("Cannot get waitlist"),
// };

// let mut dispatches_to_wake = Vec::new();
// let mut remove_from_waitlist_blocks = Vec::new();
// for (block, list) in waitlist.range_mut(0..=block_info.height) {
// if list.is_empty() {
// log::error!("Empty waitlist for block, must been removed from waitlist")
// }
// dispatches_to_wake.append(list);
// remove_from_waitlist_blocks.push(*block);
// }

// if remove_from_waitlist_blocks.is_empty() {
// // No messages to wake up
// return None;
// }

// for block in remove_from_waitlist_blocks {
// waitlist.remove(&block);
// }

// for dispatch in dispatches_to_wake {
// queue.push_back(dispatch);
// }

// let queue_hash = ri.storage().write_queue(queue).into();
// let waitlist_hash = ri.storage().write_waitlist(waitlist).into();

// let new_program_state = ProgramState {
// queue_hash,
// waitlist_hash,
// ..program_state
// };

// Some(ri.storage().write_state(new_program_state))
}

pub fn process_next_message<S: Storage, RI: RuntimeInterface<S>>(
Expand Down
67 changes: 63 additions & 4 deletions ethexe/runtime/common/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use gear_core::{
ids::{prelude::MessageIdExt as _, ProgramId},
memory::PageBuf,
message::{
ContextStore, DispatchKind, GasLimit, MessageDetails, Payload, ReplyDetails, Value,
MAX_PAYLOAD_SIZE,
ContextStore, DispatchKind, GasLimit, MessageDetails, Payload, ReplyDetails,
StoredDispatch, Value, MAX_PAYLOAD_SIZE,
},
pages::{numerated::tree::IntervalsTree, GearPage, WasmPage},
program::MemoryInfix,
Expand Down Expand Up @@ -221,12 +221,30 @@ impl Dispatch {
context: None,
}
}

pub fn from_stored<S: Storage>(storage: &S, value: StoredDispatch) -> Self {
let (kind, message, context) = value.into_parts();
let (id, source, destination, payload, value, details) = message.into_parts();

let payload_hash = storage
.store_payload(payload.into_vec())
.expect("infallible due to recasts (only panics on len)");

Self {
id,
kind,
source,
payload_hash,
value,
details,
context,
}
}
}

pub type MessageQueue = VecDeque<Dispatch>;

// TODO (breathx): replace with Map<MId, Dispatch>;
pub type Waitlist = BTreeMap<BlockNumber, Vec<Dispatch>>;
pub type Waitlist = BTreeMap<MessageId, Dispatch>;

// TODO (breathx): consider here LocalMailbox for each user.
pub type Mailbox = BTreeMap<ActorId, BTreeMap<MessageId, Value>>;
Expand Down Expand Up @@ -346,6 +364,47 @@ pub trait ComplexStorage: Storage {
Ok(allocations_hash)
}

/// Usage: for optimized performance, please remove entries if empty.
/// Always updates storage.
fn modify_waitlist(
&self,
waitlist_hash: MaybeHash,
f: impl FnOnce(&mut Waitlist),
) -> Result<MaybeHash> {
self.modify_waitlist_if_changed(waitlist_hash, |waitlist| {
f(waitlist);
Some(())
})
.map(|v| v.expect("`Some` passed above; infallible").1)
}

/// Usage: for optimized performance, please remove entries if empty.
/// Waitlist is treated changed if f() returns Some.
fn modify_waitlist_if_changed<T>(
&self,
waitlist_hash: MaybeHash,
f: impl FnOnce(&mut Waitlist) -> Option<T>,
) -> Result<Option<(T, MaybeHash)>> {
let mut waitlist = waitlist_hash.with_hash_or_default_result(|waitlist_hash| {
self.read_waitlist(waitlist_hash).ok_or_else(|| {
anyhow::anyhow!("failed to read waitlist by its hash ({waitlist_hash})")
})
})?;

let res = if let Some(v) = f(&mut waitlist) {
let maybe_hash = waitlist
.is_empty()
.then_some(MaybeHash::Empty)
.unwrap_or_else(|| self.write_waitlist(waitlist).into());

Some((v, maybe_hash))
} else {
None
};

Ok(res)
}

fn modify_queue(
&self,
queue_hash: MaybeHash,
Expand Down

0 comments on commit 5dd6b10

Please sign in to comment.