From f652e395fbbe128d2b00f79b0e582bf0e956def0 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 15 Feb 2023 10:54:11 -0500 Subject: [PATCH] Consolidate activation requests as they occur --- timely/src/scheduling/activate.rs | 172 ++++++++++++++++++++++-------- 1 file changed, 127 insertions(+), 45 deletions(-) diff --git a/timely/src/scheduling/activate.rs b/timely/src/scheduling/activate.rs index e91c87a99..dd3617030 100644 --- a/timely/src/scheduling/activate.rs +++ b/timely/src/scheduling/activate.rs @@ -39,11 +39,10 @@ impl Scheduler for Box { /// Allocation-free activation tracker. #[derive(Debug)] pub struct Activations { - clean: usize, - /// `(offset, length)` - bounds: Vec<(usize, usize)>, - slices: Vec, - buffer: Vec, + /// Current activations that are being served. + current: ActivationsBuffer, + /// Upcoming activations that may soon be served. + pending: ActivationsBuffer, // Inter-thread activations. tx: Sender>, @@ -60,10 +59,8 @@ impl Activations { pub fn new(timer: Instant) -> Self { let (tx, rx) = crossbeam_channel::unbounded(); Self { - clean: 0, - bounds: Vec::new(), - slices: Vec::new(), - buffer: Vec::new(), + current: ActivationsBuffer::new(), + pending: ActivationsBuffer::new(), tx, rx, timer, @@ -73,8 +70,7 @@ impl Activations { /// Activates the task addressed by `path`. pub fn activate(&mut self, path: &[usize]) { - self.bounds.push((self.slices.len(), path.len())); - self.slices.extend(path); + self.pending.activate(path); } /// Schedules a future activation for the task addressed by `path`. @@ -89,6 +85,20 @@ impl Activations { } } + /// Reactivates the task addressed by `path`, ideally within `delay`. + /// + /// The task may be activated before `delay`, in which case the task should reactivate itself if + /// it requires further reactivation, as this specific `delay` may no longer be in effect. + pub fn activate_within(&mut self, path: &[usize], delay: Duration) { + if delay == Duration::new(0, 0) { + self.activate(path) + } + else { + let moment = self.timer.elapsed() + delay; + self.pending.activate_by(path, moment) + } + } + /// Discards the current active set and presents the next active set. pub fn advance(&mut self) { @@ -104,37 +114,135 @@ impl Activations { self.activate(&path[..]); } - self.bounds.drain(.. self.clean); + self.current.clear(); + self.pending.compact(); + self.pending.extract_through(&mut self.current, now); + + } + + /// Maps a function across activated paths. + pub fn map_active(&self, logic: impl Fn(&[usize])) { + self.current.map_active(logic) + } + + /// Sets as active any symbols that follow `path`. + pub fn for_extensions(&self, path: &[usize], action: impl FnMut(usize)) { + self.current.for_extensions(path, action) + } + + /// Constructs a thread-safe `SyncActivations` handle to this activator. + pub fn sync(&self) -> SyncActivations { + SyncActivations { + tx: self.tx.clone(), + thread: std::thread::current(), + } + } + + /// Time until next scheduled event. + /// + /// This method should be used before putting a worker thread to sleep, as it + /// indicates the amount of time before the thread should be unparked for the + /// next scheduled activation. + pub fn empty_for(&self) -> Option { + if !self.current.is_empty() || !self.pending.is_empty() { + Some(Duration::new(0,0)) + } + else { + self.queue.peek().map(|Reverse((t,_a))| { + let elapsed = self.timer.elapsed(); + if t < &elapsed { Duration::new(0,0) } + else { *t - elapsed } + }) + } + } +} + +/// Manages delayed activations for path-named tasks. +#[derive(Debug)] +pub struct ActivationsBuffer { + /// `(offset, length)`, and an elapsed timer duration. + /// The zero duration can be used to indicate "immediately". + bounds: Vec<(usize, usize, Duration)>, + /// Elements of path slices. + slices: Vec, + /// A spare buffer to copy into. + buffer: Vec, +} + +impl ActivationsBuffer { + /// Creates a new activation tracker. + pub fn new() -> Self { + Self { + bounds: Vec::new(), + slices: Vec::new(), + buffer: Vec::new(), + } + } + + fn clear(&mut self) { + self.bounds.clear(); + self.slices.clear(); + self.buffer.clear(); + } + + fn is_empty(&self) -> bool { + self.bounds.is_empty() + } + + /// Activates the task addressed by `path`. + pub fn activate(&mut self, path: &[usize]) { + self.activate_by(path, Duration::new(0, 0)); + } + + /// Activates the task addressed by `path`. + pub fn activate_by(&mut self, path: &[usize], duration: Duration) { + self.bounds.push((self.slices.len(), path.len(), duration)); + self.slices.extend(path); + } + + /// Orders activations by their path, and retains only each's most immediate duration. + pub fn compact(&mut self) { { // Scoped, to allow borrow to drop. let slices = &self.slices[..]; - self.bounds.sort_by_key(|x| &slices[x.0 .. (x.0 + x.1)]); + // Sort slices by path, and within each path by duration. + self.bounds.sort_by_key(|x| (&slices[x.0 .. (x.0 + x.1)], x.2)); + // Deduplicate by path, retaining the least duration. self.bounds.dedup_by_key(|x| &slices[x.0 .. (x.0 + x.1)]); } // Compact the slices. self.buffer.clear(); - for (offset, length) in self.bounds.iter_mut() { + for (offset, length, _duration) in self.bounds.iter_mut() { self.buffer.extend(&self.slices[*offset .. (*offset + *length)]); *offset = self.buffer.len() - *length; } ::std::mem::swap(&mut self.buffer, &mut self.slices); + } - self.clean = self.bounds.len(); + /// Extracts all activations less or equal to `threshold` into `other`. + pub fn extract_through(&mut self, other: &mut Self, threshold: Duration) { + for (offset, length, duration) in self.bounds.iter_mut() { + if *duration <= threshold { + other.activate_by(&self.slices[*offset .. (*offset + *length)], *duration); + } + } + self.bounds.retain(|(_off, _len, duration)| *duration > threshold); + // Could `self.compact()` here, but it will happen as part of future work. } /// Maps a function across activated paths. pub fn map_active(&self, logic: impl Fn(&[usize])) { - for (offset, length) in self.bounds.iter() { + for (offset, length, _duration) in self.bounds.iter() { logic(&self.slices[*offset .. (*offset + *length)]); } } - + /// Sets as active any symbols that follow `path`. pub fn for_extensions(&self, path: &[usize], mut action: impl FnMut(usize)) { let position = - self.bounds[..self.clean] + self.bounds .binary_search_by_key(&path, |x| &self.slices[x.0 .. (x.0 + x.1)]); let position = match position { Ok(x) => x, @@ -146,7 +254,7 @@ impl Activations { .iter() .cloned() .skip(position) - .map(|(offset, length)| &self.slices[offset .. (offset + length)]) + .map(|(offset, length, _)| &self.slices[offset .. (offset + length)]) .take_while(|x| x.starts_with(path)) .for_each(|x| { // push non-empty, non-duplicate extensions. @@ -158,32 +266,6 @@ impl Activations { } }); } - - /// Constructs a thread-safe `SyncActivations` handle to this activator. - pub fn sync(&self) -> SyncActivations { - SyncActivations { - tx: self.tx.clone(), - thread: std::thread::current(), - } - } - - /// Time until next scheduled event. - /// - /// This method should be used before putting a worker thread to sleep, as it - /// indicates the amount of time before the thread should be unparked for the - /// next scheduled activation. - pub fn empty_for(&self) -> Option { - if !self.bounds.is_empty() { - Some(Duration::new(0,0)) - } - else { - self.queue.peek().map(|Reverse((t,_a))| { - let elapsed = self.timer.elapsed(); - if t < &elapsed { Duration::new(0,0) } - else { *t - elapsed } - }) - } - } } /// A thread-safe handle to an `Activations`.