Skip to content

Commit

Permalink
listen: Cleanup in Filter::receive().
Browse files Browse the repository at this point in the history
* Note why an exact-sized batch doesn't do an extra `receive()` at the end.
* Add test of behavior when `BATCH == 1`.
* Remove unnecessary `return`.
  • Loading branch information
kpreid committed Nov 14, 2024
1 parent 6476fab commit 1149dcf
Showing 1 changed file with 27 additions and 5 deletions.
32 changes: 27 additions & 5 deletions all-is-cubes/src/listen/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,25 +71,27 @@ where
}
}
// Deliver entire batch of ZST messages.
return self.target.receive(filtered_messages.as_slice());
self.target.receive(filtered_messages.as_slice())
} else {
if messages.is_empty() {
return self.target.receive(&[]);
}

let mut buffer: arrayvec::ArrayVec<MO, BATCH> = arrayvec::ArrayVec::new();
for message in messages {
if let Some(filtered_message) = (self.function)(message) {
// Note that we do this fullness check before, not after, pushing a message,
// so if the buffer fills up exactly, we will use the receive() call after
// the end of the loop, not this one.
if buffer.is_full() {
let alive = self.target.receive(&buffer);
if !alive {
// Target doesn’t want any more messages, so we don’t need to filter
// them.
return false;
}
buffer.clear();
}
buffer.push(filtered_message);
}
}
// Deliver final partial batch, if any, and final liveness check.
self.target.receive(&buffer)
}
}
Expand Down Expand Up @@ -212,6 +214,26 @@ mod tests {
assert_eq!(notifier.count(), 0);
}

/// Test the behavior when `with_stack_buffer()` is not called,
/// leaving the buffer size implicitly at 1.
#[test]
fn filter_batch_size_1() {
let notifier: Notifier<i32> = Notifier::new();
let sink: Sink<Vec<i32>> = Sink::new();
notifier.listen(CaptureBatch(sink.listener()).filter(|&x: &i32| Some(x)));

// Send some batches
notifier.notify_many(&[0, 1]);
notifier.notify_many(&[]);
notifier.notify_many(&[2, 3]);

// Expect the batches to be of size at most 1
assert_eq!(
sink.drain(),
vec![vec![], vec![0], vec![1], vec![], vec![2], vec![3]]
);
}

#[test]
fn filter_batching_nzst() {
let notifier: Notifier<i32> = Notifier::new();
Expand Down

0 comments on commit 1149dcf

Please sign in to comment.