Skip to content

Commit

Permalink
add expiry to waitlist
Browse files Browse the repository at this point in the history
  • Loading branch information
breathx committed Oct 1, 2024
1 parent bf38e9b commit 8ff4f9d
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 8 deletions.
5 changes: 4 additions & 1 deletion ethexe/processor/src/handling/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ impl Processor {
pub fn run_schedule(&mut self, in_block_transitions: &mut InBlockTransitions) {
let tasks = in_block_transitions.take_actual_tasks();

log::debug!("Running schedule for #{}: tasks are {tasks:?}", in_block_transitions.block_number());
log::debug!(
"Running schedule for #{}: tasks are {tasks:?}",
in_block_transitions.block_number()
);

let mut handler = ScheduleHandler {
in_block_transitions,
Expand Down
9 changes: 8 additions & 1 deletion ethexe/processor/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,14 @@ fn many_waits() {
let db = MemDb::default();
let mut processor = Processor::new(Database::from_one(&db, Default::default())).unwrap();

let ch0 = init_new_block(&mut processor, BlockHeader { height: 1, timestamp: 1, parent_hash: Default::default() });
let ch0 = init_new_block(
&mut processor,
BlockHeader {
height: 1,
timestamp: 1,
parent_hash: Default::default(),
},
);

let code_id = processor
.handle_new_code(code)
Expand Down
6 changes: 3 additions & 3 deletions ethexe/runtime/common/src/journal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ impl<S: Storage> JournalHandler for Handler<'_, S> {

let in_blocks = NonZeroU32::try_from(duration).expect("must be checked on backend side");

self.in_block_transitions.schedule_task(
let expiry = self.in_block_transitions.schedule_task(
in_blocks,
ScheduledTask::WakeMessage(dispatch.destination(), dispatch.id()),
);
Expand All @@ -265,7 +265,7 @@ impl<S: Storage> JournalHandler for Handler<'_, S> {
// TODO (breathx): impl Copy for MaybeHash?
state.waitlist_hash =
storage.modify_waitlist(state.waitlist_hash.clone(), |waitlist| {
let r = waitlist.insert(dispatch.id, dispatch);
let r = waitlist.insert(dispatch.id, (dispatch, expiry));
debug_assert!(r.is_none());
})?;

Expand All @@ -287,7 +287,7 @@ impl<S: Storage> JournalHandler for Handler<'_, S> {
log::trace!("Dispatch {message_id} tries to wake {awakening_id}");

self.update_state_with_storage(program_id, |storage, state| {
let Some((dispatch, new_waitlist_hash)) = storage
let Some(((dispatch, _expiry), new_waitlist_hash)) = storage
.modify_waitlist_if_changed(state.waitlist_hash.clone(), |waitlist| {
waitlist.remove(&awakening_id)
})?
Expand Down
2 changes: 1 addition & 1 deletion ethexe/runtime/common/src/schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl<'a, S: Storage> TaskHandler<ActorId> for Handler<'a, S> {

// TODO (breathx): don't update state if not changed?
self.update_state_with_storage(program_id, |storage, state| {
let Some((dispatch, new_waitlist_hash)) = storage
let Some(((dispatch, _expiry), new_waitlist_hash)) = storage
.modify_waitlist_if_changed(state.waitlist_hash.clone(), |waitlist| {
waitlist.remove(&message_id)
})?
Expand Down
4 changes: 3 additions & 1 deletion ethexe/runtime/common/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,11 @@ impl Dispatch {
}
}

pub type ValueWithExpiry<T> = (T, u32);

pub type MessageQueue = VecDeque<Dispatch>;

pub type Waitlist = BTreeMap<MessageId, Dispatch>;
pub type Waitlist = BTreeMap<MessageId, ValueWithExpiry<Dispatch>>;

// TODO (breathx): consider here LocalMailbox for each user.
pub type Mailbox = BTreeMap<ActorId, BTreeMap<MessageId, Value>>;
Expand Down
4 changes: 3 additions & 1 deletion ethexe/runtime/common/src/transitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,14 @@ impl InBlockTransitions {
self.schedule.remove(&self.current_bn).unwrap_or_default()
}

pub fn schedule_task(&mut self, in_blocks: NonZeroU32, task: ScheduledTask) {
pub fn schedule_task(&mut self, in_blocks: NonZeroU32, task: ScheduledTask) -> u32 {
let scheduled_block = self.current_bn + u32::from(in_blocks);

let entry = self.schedule.entry(scheduled_block).or_default();
debug_assert!(!entry.contains(&task));
entry.push(task);

scheduled_block
}

pub fn register_new(&mut self, actor_id: ActorId) {
Expand Down

0 comments on commit 8ff4f9d

Please sign in to comment.