Skip to content

Commit

Permalink
listen: Batch messages passing through a Filter if they are ZSTs.
Browse files Browse the repository at this point in the history
Ideally, we would always batch messages, but doing that with an
appropriate size of stack-allocated buffer would naïvely require the
caller to specify a size, which is a change with a larger impact. In
the ZST case we can just buffer up to `usize::MAX` messages for free.

(The `const {}` block probably doesn't do anything, but I put it there
to make it blatantly obvious to readers that the branch is statically
determined.)
  • Loading branch information
kpreid committed Sep 26, 2024
1 parent 8920473 commit cbb9586
Showing 1 changed file with 82 additions and 12 deletions.
94 changes: 82 additions & 12 deletions all-is-cubes/src/listen/util.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use alloc::sync::{Arc, Weak};
use alloc::vec::Vec;
use core::fmt;

use manyfmt::formats::Unquote;
Expand Down Expand Up @@ -35,21 +36,40 @@ where
T: Listener<MO>,
{
fn receive(&self, messages: &[MI]) -> bool {
if messages.is_empty() {
return self.target.receive(&[]);
}
if const { size_of::<MO>() == 0 } {
// If the size of the output message is zero, then we can buffer an arbitrary number
// of them without occupying any memory or performing any allocation, and therefore
// preserve the input batching for free.
let mut filtered_messages = Vec::<MO>::new();
for message in messages {
if let Some(filtered_message) = (self.function)(message) {
filtered_messages.push(filtered_message);
}
}
// Deliver entire batch of ZST messages.
self.target.receive(filtered_messages.as_slice())
} else {
if messages.is_empty() {
// Ensure that we still check liveness if the batch is empty.
return self.target.receive(&[]);
}

for message in messages {
if let Some(filtered_message) = (self.function)(message) {
// TODO: figure out some kind of stack array batching so we don't do a separate
// receive() for each message.
let alive = self.target.receive(&[filtered_message]);
if !alive {
return false;
for message in messages {
if let Some(filtered_message) = (self.function)(message) {
// TODO: figure out some kind of stack array batching so we don't do a separate
// receive() for each message.
let alive = self.target.receive(&[filtered_message]);
if !alive {
return false;
}
}
}

// If we got here without returning false, then either we delivered at least one message
// and know that self.target is alive, or all the messages were filtered out.
// (Hmm, should we check liveness anyway in the latter case?)
true
}
true
}
}

Expand Down Expand Up @@ -141,8 +161,20 @@ mod tests {
use crate::listen::{Listen as _, Sink};
use alloc::vec::Vec;

/// Breaks the listener rules for testing by recording batch boundaries.
#[derive(Debug)]
struct CaptureBatch<L>(L);
impl<M: Clone, L> Listener<M> for CaptureBatch<L>
where
L: Listener<Vec<M>>,
{
fn receive(&self, messages: &[M]) -> bool {
self.0.receive(&[Vec::from(messages)])
}
}

#[test]
fn filter() {
fn filter_filtering_and_drop() {
let notifier: Notifier<Option<i32>> = Notifier::new();
let sink = Sink::new();
notifier.listen(sink.listener().filter(|&x| x));
Expand All @@ -158,6 +190,44 @@ mod tests {
assert_eq!(notifier.count(), 0);
}

/// Currently, `Filter` breaks up all batches into single element batches
/// (unless the type is a ZST).
#[test]
fn filter_batching_nzst() {
let notifier: Notifier<i32> = Notifier::new();
let sink: Sink<Vec<i32>> = Sink::new();
notifier.listen(CaptureBatch(sink.listener()).filter(|&x: &i32| Some(x)));

// Send some batches
notifier.notify_many(&[0, 1]);
notifier.notify_many(&[]);
notifier.notify_many(&[2, 3]);

// Currently, the batches are all of size 1.
assert_eq!(
sink.drain(),
vec![vec![], vec![0], vec![1], vec![], vec![2], vec![3],]
);
}

/// Currently, `Filter` breaks up all batches. This is not ideal.
#[test]
fn filter_batching_zst() {
let notifier: Notifier<i32> = Notifier::new();
let sink: Sink<Vec<()>> = Sink::new();
notifier.listen(
CaptureBatch(sink.listener()).filter(|&x: &i32| if x == 2 { None } else { Some(()) }),
);

// Send some batches
notifier.notify_many(&[0, 1]);
notifier.notify_many(&[]);
notifier.notify_many(&[2, 3]);

// Expect batches to be preserved and filtered.
assert_eq!(sink.drain(), vec![vec![], vec![(), ()], vec![], vec![()]]);
}

#[test]
fn gate() {
let notifier: Notifier<i32> = Notifier::new();
Expand Down

0 comments on commit cbb9586

Please sign in to comment.