diff --git a/ethexe/processor/src/handling/mod.rs b/ethexe/processor/src/handling/mod.rs index 59efb720ca7..9baff8a640d 100644 --- a/ethexe/processor/src/handling/mod.rs +++ b/ethexe/processor/src/handling/mod.rs @@ -32,7 +32,10 @@ impl Processor { pub fn run_schedule(&mut self, in_block_transitions: &mut InBlockTransitions) { let tasks = in_block_transitions.take_actual_tasks(); - log::debug!("Running schedule for #{}: tasks are {tasks:?}", in_block_transitions.block_number()); + log::debug!( + "Running schedule for #{}: tasks are {tasks:?}", + in_block_transitions.block_number() + ); let mut handler = ScheduleHandler { in_block_transitions, diff --git a/ethexe/processor/src/tests.rs b/ethexe/processor/src/tests.rs index 5dc26036fd3..cd4e4bad643 100644 --- a/ethexe/processor/src/tests.rs +++ b/ethexe/processor/src/tests.rs @@ -477,7 +477,14 @@ fn many_waits() { let db = MemDb::default(); let mut processor = Processor::new(Database::from_one(&db, Default::default())).unwrap(); - let ch0 = init_new_block(&mut processor, BlockHeader { height: 1, timestamp: 1, parent_hash: Default::default() }); + let ch0 = init_new_block( + &mut processor, + BlockHeader { + height: 1, + timestamp: 1, + parent_hash: Default::default(), + }, + ); let code_id = processor .handle_new_code(code) diff --git a/ethexe/runtime/common/src/journal.rs b/ethexe/runtime/common/src/journal.rs index 3a0b9db52d7..22d6be199cb 100644 --- a/ethexe/runtime/common/src/journal.rs +++ b/ethexe/runtime/common/src/journal.rs @@ -243,7 +243,7 @@ impl JournalHandler for Handler<'_, S> { let in_blocks = NonZeroU32::try_from(duration).expect("must be checked on backend side"); - self.in_block_transitions.schedule_task( + let expiry = self.in_block_transitions.schedule_task( in_blocks, ScheduledTask::WakeMessage(dispatch.destination(), dispatch.id()), ); @@ -265,7 +265,7 @@ impl JournalHandler for Handler<'_, S> { // TODO (breathx): impl Copy for MaybeHash? state.waitlist_hash = storage.modify_waitlist(state.waitlist_hash.clone(), |waitlist| { - let r = waitlist.insert(dispatch.id, dispatch); + let r = waitlist.insert(dispatch.id, (dispatch, expiry)); debug_assert!(r.is_none()); })?; @@ -287,7 +287,7 @@ impl JournalHandler for Handler<'_, S> { log::trace!("Dispatch {message_id} tries to wake {awakening_id}"); self.update_state_with_storage(program_id, |storage, state| { - let Some((dispatch, new_waitlist_hash)) = storage + let Some(((dispatch, _expiry), new_waitlist_hash)) = storage .modify_waitlist_if_changed(state.waitlist_hash.clone(), |waitlist| { waitlist.remove(&awakening_id) })? diff --git a/ethexe/runtime/common/src/schedule.rs b/ethexe/runtime/common/src/schedule.rs index a3e775d4646..c6d5da525c8 100644 --- a/ethexe/runtime/common/src/schedule.rs +++ b/ethexe/runtime/common/src/schedule.rs @@ -45,7 +45,7 @@ impl<'a, S: Storage> TaskHandler for Handler<'a, S> { // TODO (breathx): don't update state if not changed? self.update_state_with_storage(program_id, |storage, state| { - let Some((dispatch, new_waitlist_hash)) = storage + let Some(((dispatch, _expiry), new_waitlist_hash)) = storage .modify_waitlist_if_changed(state.waitlist_hash.clone(), |waitlist| { waitlist.remove(&message_id) })? diff --git a/ethexe/runtime/common/src/state.rs b/ethexe/runtime/common/src/state.rs index a2a589d0f5d..fb27f7a2241 100644 --- a/ethexe/runtime/common/src/state.rs +++ b/ethexe/runtime/common/src/state.rs @@ -242,9 +242,11 @@ impl Dispatch { } } +pub type ValueWithExpiry = (T, u32); + pub type MessageQueue = VecDeque; -pub type Waitlist = BTreeMap; +pub type Waitlist = BTreeMap>; // TODO (breathx): consider here LocalMailbox for each user. pub type Mailbox = BTreeMap>; diff --git a/ethexe/runtime/common/src/transitions.rs b/ethexe/runtime/common/src/transitions.rs index ae780f0be65..5919ec1680f 100644 --- a/ethexe/runtime/common/src/transitions.rs +++ b/ethexe/runtime/common/src/transitions.rs @@ -74,12 +74,14 @@ impl InBlockTransitions { self.schedule.remove(&self.current_bn).unwrap_or_default() } - pub fn schedule_task(&mut self, in_blocks: NonZeroU32, task: ScheduledTask) { + pub fn schedule_task(&mut self, in_blocks: NonZeroU32, task: ScheduledTask) -> u32 { let scheduled_block = self.current_bn + u32::from(in_blocks); let entry = self.schedule.entry(scheduled_block).or_default(); debug_assert!(!entry.contains(&task)); entry.push(task); + + scheduled_block } pub fn register_new(&mut self, actor_id: ActorId) {