Skip to content

Commit

Permalink
Consolidate activation requests as they occur
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed Feb 15, 2023
1 parent 157b80d commit f652e39
Showing 1 changed file with 127 additions and 45 deletions.
172 changes: 127 additions & 45 deletions timely/src/scheduling/activate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,10 @@ impl Scheduler for Box<dyn Scheduler> {
/// Allocation-free activation tracker.
#[derive(Debug)]
pub struct Activations {
clean: usize,
/// `(offset, length)`
bounds: Vec<(usize, usize)>,
slices: Vec<usize>,
buffer: Vec<usize>,
/// Current activations that are being served.
current: ActivationsBuffer,
/// Upcoming activations that may soon be served.
pending: ActivationsBuffer,

// Inter-thread activations.
tx: Sender<Vec<usize>>,
Expand All @@ -60,10 +59,8 @@ impl Activations {
pub fn new(timer: Instant) -> Self {
let (tx, rx) = crossbeam_channel::unbounded();
Self {
clean: 0,
bounds: Vec::new(),
slices: Vec::new(),
buffer: Vec::new(),
current: ActivationsBuffer::new(),
pending: ActivationsBuffer::new(),
tx,
rx,
timer,
Expand All @@ -73,8 +70,7 @@ impl Activations {

/// Activates the task addressed by `path`.
pub fn activate(&mut self, path: &[usize]) {
self.bounds.push((self.slices.len(), path.len()));
self.slices.extend(path);
self.pending.activate(path);
}

/// Schedules a future activation for the task addressed by `path`.
Expand All @@ -89,6 +85,20 @@ impl Activations {
}
}

/// Reactivates the task addressed by `path`, ideally within `delay`.
///
/// The task may be activated before `delay`, in which case the task should reactivate itself if
/// it requires further reactivation, as this specific `delay` may no longer be in effect.
pub fn activate_within(&mut self, path: &[usize], delay: Duration) {
if delay == Duration::new(0, 0) {
self.activate(path)
}
else {
let moment = self.timer.elapsed() + delay;
self.pending.activate_by(path, moment)
}
}

/// Discards the current active set and presents the next active set.
pub fn advance(&mut self) {

Expand All @@ -104,37 +114,135 @@ impl Activations {
self.activate(&path[..]);
}

self.bounds.drain(.. self.clean);
self.current.clear();
self.pending.compact();
self.pending.extract_through(&mut self.current, now);

}

/// Maps a function across activated paths.
pub fn map_active(&self, logic: impl Fn(&[usize])) {
self.current.map_active(logic)
}

/// Sets as active any symbols that follow `path`.
pub fn for_extensions(&self, path: &[usize], action: impl FnMut(usize)) {
self.current.for_extensions(path, action)
}

/// Constructs a thread-safe `SyncActivations` handle to this activator.
pub fn sync(&self) -> SyncActivations {
SyncActivations {
tx: self.tx.clone(),
thread: std::thread::current(),
}
}

/// Time until next scheduled event.
///
/// This method should be used before putting a worker thread to sleep, as it
/// indicates the amount of time before the thread should be unparked for the
/// next scheduled activation.
pub fn empty_for(&self) -> Option<Duration> {
if !self.current.is_empty() || !self.pending.is_empty() {
Some(Duration::new(0,0))
}
else {
self.queue.peek().map(|Reverse((t,_a))| {
let elapsed = self.timer.elapsed();
if t < &elapsed { Duration::new(0,0) }
else { *t - elapsed }
})
}
}
}

/// Manages delayed activations for path-named tasks.
#[derive(Debug)]
pub struct ActivationsBuffer {
/// `(offset, length)`, and an elapsed timer duration.
/// The zero duration can be used to indicate "immediately".
bounds: Vec<(usize, usize, Duration)>,
/// Elements of path slices.
slices: Vec<usize>,
/// A spare buffer to copy into.
buffer: Vec<usize>,
}

impl ActivationsBuffer {
/// Creates a new activation tracker.
pub fn new() -> Self {
Self {
bounds: Vec::new(),
slices: Vec::new(),
buffer: Vec::new(),
}
}

fn clear(&mut self) {
self.bounds.clear();
self.slices.clear();
self.buffer.clear();
}

fn is_empty(&self) -> bool {
self.bounds.is_empty()
}

/// Activates the task addressed by `path`.
pub fn activate(&mut self, path: &[usize]) {
self.activate_by(path, Duration::new(0, 0));
}

/// Activates the task addressed by `path`.
pub fn activate_by(&mut self, path: &[usize], duration: Duration) {
self.bounds.push((self.slices.len(), path.len(), duration));
self.slices.extend(path);
}

/// Orders activations by their path, and retains only each's most immediate duration.
pub fn compact(&mut self) {

{ // Scoped, to allow borrow to drop.
let slices = &self.slices[..];
self.bounds.sort_by_key(|x| &slices[x.0 .. (x.0 + x.1)]);
// Sort slices by path, and within each path by duration.
self.bounds.sort_by_key(|x| (&slices[x.0 .. (x.0 + x.1)], x.2));
// Deduplicate by path, retaining the least duration.
self.bounds.dedup_by_key(|x| &slices[x.0 .. (x.0 + x.1)]);
}

// Compact the slices.
self.buffer.clear();
for (offset, length) in self.bounds.iter_mut() {
for (offset, length, _duration) in self.bounds.iter_mut() {
self.buffer.extend(&self.slices[*offset .. (*offset + *length)]);
*offset = self.buffer.len() - *length;
}
::std::mem::swap(&mut self.buffer, &mut self.slices);
}

self.clean = self.bounds.len();
/// Extracts all activations less or equal to `threshold` into `other`.
pub fn extract_through(&mut self, other: &mut Self, threshold: Duration) {
for (offset, length, duration) in self.bounds.iter_mut() {
if *duration <= threshold {
other.activate_by(&self.slices[*offset .. (*offset + *length)], *duration);
}
}
self.bounds.retain(|(_off, _len, duration)| *duration > threshold);
// Could `self.compact()` here, but it will happen as part of future work.
}

/// Maps a function across activated paths.
pub fn map_active(&self, logic: impl Fn(&[usize])) {
for (offset, length) in self.bounds.iter() {
for (offset, length, _duration) in self.bounds.iter() {
logic(&self.slices[*offset .. (*offset + *length)]);
}
}

/// Sets as active any symbols that follow `path`.
pub fn for_extensions(&self, path: &[usize], mut action: impl FnMut(usize)) {

let position =
self.bounds[..self.clean]
self.bounds
.binary_search_by_key(&path, |x| &self.slices[x.0 .. (x.0 + x.1)]);
let position = match position {
Ok(x) => x,
Expand All @@ -146,7 +254,7 @@ impl Activations {
.iter()
.cloned()
.skip(position)
.map(|(offset, length)| &self.slices[offset .. (offset + length)])
.map(|(offset, length, _)| &self.slices[offset .. (offset + length)])
.take_while(|x| x.starts_with(path))
.for_each(|x| {
// push non-empty, non-duplicate extensions.
Expand All @@ -158,32 +266,6 @@ impl Activations {
}
});
}

/// Constructs a thread-safe `SyncActivations` handle to this activator.
pub fn sync(&self) -> SyncActivations {
SyncActivations {
tx: self.tx.clone(),
thread: std::thread::current(),
}
}

/// Time until next scheduled event.
///
/// This method should be used before putting a worker thread to sleep, as it
/// indicates the amount of time before the thread should be unparked for the
/// next scheduled activation.
pub fn empty_for(&self) -> Option<Duration> {
if !self.bounds.is_empty() {
Some(Duration::new(0,0))
}
else {
self.queue.peek().map(|Reverse((t,_a))| {
let elapsed = self.timer.elapsed();
if t < &elapsed { Duration::new(0,0) }
else { *t - elapsed }
})
}
}
}

/// A thread-safe handle to an `Activations`.
Expand Down

0 comments on commit f652e39

Please sign in to comment.