diff --git a/all-is-cubes/src/listen/util.rs b/all-is-cubes/src/listen/util.rs index 1bf9e2281..5034593e1 100644 --- a/all-is-cubes/src/listen/util.rs +++ b/all-is-cubes/src/listen/util.rs @@ -1,4 +1,5 @@ use alloc::sync::{Arc, Weak}; +use alloc::vec::Vec; use core::fmt; use manyfmt::formats::Unquote; @@ -35,21 +36,40 @@ where T: Listener, { fn receive(&self, messages: &[MI]) -> bool { - if messages.is_empty() { - return self.target.receive(&[]); - } + if const { size_of::() == 0 } { + // If the size of the output message is zero, then we can buffer an arbitrary number + // of them without occupying any memory or performing any allocation, and therefore + // preserve the input batching for free. + let mut filtered_messages = Vec::::new(); + for message in messages { + if let Some(filtered_message) = (self.function)(message) { + filtered_messages.push(filtered_message); + } + } + // Deliver entire batch of ZST messages. + self.target.receive(filtered_messages.as_slice()) + } else { + if messages.is_empty() { + // Ensure that we still check liveness if the batch is empty. + return self.target.receive(&[]); + } - for message in messages { - if let Some(filtered_message) = (self.function)(message) { - // TODO: figure out some kind of stack array batching so we don't do a separate - // receive() for each message. - let alive = self.target.receive(&[filtered_message]); - if !alive { - return false; + for message in messages { + if let Some(filtered_message) = (self.function)(message) { + // TODO: figure out some kind of stack array batching so we don't do a separate + // receive() for each message. + let alive = self.target.receive(&[filtered_message]); + if !alive { + return false; + } } } + + // If we got here without returning false, then either we delivered at least one message + // and know that self.target is alive, or all the messages were filtered out. + // (Hmm, should we check liveness anyway in the latter case?) + true } - true } } @@ -141,8 +161,20 @@ mod tests { use crate::listen::{Listen as _, Sink}; use alloc::vec::Vec; + /// Breaks the listener rules for testing by recording batch boundaries. + #[derive(Debug)] + struct CaptureBatch(L); + impl Listener for CaptureBatch + where + L: Listener>, + { + fn receive(&self, messages: &[M]) -> bool { + self.0.receive(&[Vec::from(messages)]) + } + } + #[test] - fn filter() { + fn filter_filtering_and_drop() { let notifier: Notifier> = Notifier::new(); let sink = Sink::new(); notifier.listen(sink.listener().filter(|&x| x)); @@ -158,6 +190,44 @@ mod tests { assert_eq!(notifier.count(), 0); } + /// Currently, `Filter` breaks up all batches into single element batches + /// (unless the type is a ZST). + #[test] + fn filter_batching_nzst() { + let notifier: Notifier = Notifier::new(); + let sink: Sink> = Sink::new(); + notifier.listen(CaptureBatch(sink.listener()).filter(|&x: &i32| Some(x))); + + // Send some batches + notifier.notify_many(&[0, 1]); + notifier.notify_many(&[]); + notifier.notify_many(&[2, 3]); + + // Currently, the batches are all of size 1. + assert_eq!( + sink.drain(), + vec![vec![], vec![0], vec![1], vec![], vec![2], vec![3],] + ); + } + + /// Currently, `Filter` breaks up all batches. This is not ideal. + #[test] + fn filter_batching_zst() { + let notifier: Notifier = Notifier::new(); + let sink: Sink> = Sink::new(); + notifier.listen( + CaptureBatch(sink.listener()).filter(|&x: &i32| if x == 2 { None } else { Some(()) }), + ); + + // Send some batches + notifier.notify_many(&[0, 1]); + notifier.notify_many(&[]); + notifier.notify_many(&[2, 3]); + + // Expect batches to be preserved and filtered. + assert_eq!(sink.drain(), vec![vec![], vec![(), ()], vec![], vec![()]]); + } + #[test] fn gate() { let notifier: Notifier = Notifier::new();