diff --git a/dfir_rs/src/scheduled/context.rs b/dfir_rs/src/scheduled/context.rs index bc469a50ff9..a771c28880c 100644 --- a/dfir_rs/src/scheduled/context.rs +++ b/dfir_rs/src/scheduled/context.rs @@ -3,14 +3,13 @@ //! Provides APIs for state and scheduling. use std::any::Any; -use std::cell::RefCell; +use std::cell::Cell; use std::collections::VecDeque; use std::future::Future; use std::marker::PhantomData; use std::ops::DerefMut; use std::pin::Pin; -use smallvec::SmallVec; use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; use tokio::task::JoinHandle; use web_time::SystemTime; @@ -42,8 +41,8 @@ pub struct Context { /// Second field (bool) is for if the event is an external "important" event (true). pub(super) event_queue_send: UnboundedSender<(SubgraphId, bool)>, - /// Subgraphs rescheduled in the current stratum. - pub(super) rescheduled_subgraphs: RefCell>, + /// If the current subgraph wants to reschedule in the current tick+stratum. + pub(super) reschedule_current_subgraph: Cell, pub(super) current_tick: TickInstant, pub(super) current_stratum: usize, @@ -100,9 +99,7 @@ impl Context { /// Schedules the current subgraph to run again _this tick_. pub fn reschedule_current_subgraph(&self) { - self.rescheduled_subgraphs - .borrow_mut() - .push(self.subgraph_id); + self.reschedule_current_subgraph.set(true); } /// Returns a `Waker` for interacting with async Rust. @@ -246,7 +243,7 @@ impl Default for Context { events_received_tick: false, event_queue_send, - rescheduled_subgraphs: Default::default(), + reschedule_current_subgraph: Cell::new(false), current_stratum: 0, current_tick: TickInstant::default(), diff --git a/dfir_rs/src/scheduled/graph.rs b/dfir_rs/src/scheduled/graph.rs index eb0e760b77b..ca7ac472578 100644 --- a/dfir_rs/src/scheduled/graph.rs +++ b/dfir_rs/src/scheduled/graph.rs @@ -282,6 +282,7 @@ impl<'a> Dfir<'a> { } let sg_data = &self.subgraphs[sg_id.0]; + debug_assert_eq!(self.context.current_stratum, sg_data.stratum); for &handoff_id in sg_data.succs.iter() { let handoff = &self.handoffs[handoff_id.0]; if !handoff.handoff.is_bottom() { @@ -299,9 +300,9 @@ impl<'a> Dfir<'a> { } } - for sg_id in self.context.rescheduled_subgraphs.borrow_mut().drain(..) { - let sg_data = &self.subgraphs[sg_id.0]; - assert_eq!(sg_data.stratum, self.context.current_stratum); + // Check if subgraph wants rescheduling + if self.context.reschedule_current_subgraph.take() { + // Add subgraph to stratum queue if it is not already scheduled. if !sg_data.is_scheduled.replace(true) { self.context.stratum_queues[sg_data.stratum].push_back(sg_id); }