From 1149dcfbf888ee139f876394fa63dbfdb6fe1cb8 Mon Sep 17 00:00:00 2001 From: Kevin Reid Date: Wed, 13 Nov 2024 20:37:42 -0800 Subject: [PATCH] listen: Cleanup in `Filter::receive()`. * Note why an exact-sized batch doesn't do an extra `receive()` at the end. * Add test of behavior when `BATCH == 1`. * Remove unnecessary `return`. --- all-is-cubes/src/listen/util.rs | 32 +++++++++++++++++++++++++++----- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/all-is-cubes/src/listen/util.rs b/all-is-cubes/src/listen/util.rs index 2f0065224..9cd6f2401 100644 --- a/all-is-cubes/src/listen/util.rs +++ b/all-is-cubes/src/listen/util.rs @@ -71,18 +71,19 @@ where } } // Deliver entire batch of ZST messages. - return self.target.receive(filtered_messages.as_slice()); + self.target.receive(filtered_messages.as_slice()) } else { - if messages.is_empty() { - return self.target.receive(&[]); - } - let mut buffer: arrayvec::ArrayVec = arrayvec::ArrayVec::new(); for message in messages { if let Some(filtered_message) = (self.function)(message) { + // Note that we do this fullness check before, not after, pushing a message, + // so if the buffer fills up exactly, we will use the receive() call after + // the end of the loop, not this one. if buffer.is_full() { let alive = self.target.receive(&buffer); if !alive { + // Target doesn’t want any more messages, so we don’t need to filter + // them. return false; } buffer.clear(); @@ -90,6 +91,7 @@ where buffer.push(filtered_message); } } + // Deliver final partial batch, if any, and final liveness check. self.target.receive(&buffer) } } @@ -212,6 +214,26 @@ mod tests { assert_eq!(notifier.count(), 0); } + /// Test the behavior when `with_stack_buffer()` is not called, + /// leaving the buffer size implicitly at 1. + #[test] + fn filter_batch_size_1() { + let notifier: Notifier = Notifier::new(); + let sink: Sink> = Sink::new(); + notifier.listen(CaptureBatch(sink.listener()).filter(|&x: &i32| Some(x))); + + // Send some batches + notifier.notify_many(&[0, 1]); + notifier.notify_many(&[]); + notifier.notify_many(&[2, 3]); + + // Expect the batches to be of size at most 1 + assert_eq!( + sink.drain(), + vec![vec![], vec![0], vec![1], vec![], vec![2], vec![3]] + ); + } + #[test] fn filter_batching_nzst() { let notifier: Notifier = Notifier::new();