Skip to content

Commit

Permalink
exchange: Improve comments
Browse files Browse the repository at this point in the history
Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Nov 20, 2021
1 parent 915778a commit 090e5e0
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 2 deletions.
4 changes: 2 additions & 2 deletions timely/src/dataflow/channels/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl<T, D> Message<T, D> {

Self::push_at_no_allocation(buffer, time, pusher);

// TODO: Unclear we always want this here.
// Allocate a default buffer to avoid oddly sized or empty buffers
if buffer.capacity() != Self::default_length() {
*buffer = Vec::with_capacity(Self::default_length());
}
Expand All @@ -67,7 +67,7 @@ impl<T, D> Message<T, D> {
#[inline]
pub fn push_at_no_allocation<P: Push<Bundle<T, D>>>(buffer: &mut Vec<D>, time: T, pusher: &mut P) {

let data = ::std::mem::replace(buffer, Vec::new());
let data = ::std::mem::take(buffer);
let message = Message::new(time, data, 0, 0);
let mut bundle = Some(Bundle::from_typed(message));

Expand Down
5 changes: 5 additions & 0 deletions timely/src/dataflow/channels/pushers/exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ impl<T: Eq+Data, D: Data, P: Push<Bundle<T, D>>, H: FnMut(&T, &D)->u64> Push<Bun
let mask = (self.pushers.len() - 1) as u64;
for datum in data.drain(..) {
let index = (((self.hash_func)(time, &datum)) & mask) as usize;

// Ensure allocated buffers: If the buffer's capacity is less than its default
// capacity, increase the capacity such that it matches the default.
if self.buffers[index].capacity() < Message::<T, D>::default_length() {
let to_reserve = Message::<T, D>::default_length() - self.buffers[index].capacity();
self.buffers[index].reserve(to_reserve);
Expand All @@ -85,6 +88,8 @@ impl<T: Eq+Data, D: Data, P: Push<Bundle<T, D>>, H: FnMut(&T, &D)->u64> Push<Bun
else {
for datum in data.drain(..) {
let index = (((self.hash_func)(time, &datum)) % self.pushers.len() as u64) as usize;
// Ensure allocated buffers: If the buffer's capacity is less than its default
// capacity, increase the capacity such that it matches the default.
if self.buffers[index].capacity() < Message::<T, D>::default_length() {
let to_reserve = Message::<T, D>::default_length() - self.buffers[index].capacity();
self.buffers[index].reserve(to_reserve);
Expand Down

0 comments on commit 090e5e0

Please sign in to comment.