diff --git a/timely/src/dataflow/channels/mod.rs b/timely/src/dataflow/channels/mod.rs index 641f0a190..acb3dd9de 100644 --- a/timely/src/dataflow/channels/mod.rs +++ b/timely/src/dataflow/channels/mod.rs @@ -52,9 +52,4 @@ impl Message { buffer.clear(); } } - - // TODO: Unclear we always want this here. - if buffer.capacity() != Self::default_length() { - *buffer = Vec::with_capacity(Self::default_length()); - } }} diff --git a/timely/src/dataflow/channels/pushers/buffer.rs b/timely/src/dataflow/channels/pushers/buffer.rs index 288d5ff88..603d07164 100644 --- a/timely/src/dataflow/channels/pushers/buffer.rs +++ b/timely/src/dataflow/channels/pushers/buffer.rs @@ -23,7 +23,7 @@ impl>> Buffer where T: Eq+Clone { pub fn new(pusher: P) -> Buffer { Buffer { time: None, - buffer: Vec::with_capacity(Message::::default_length()), + buffer: Vec::new(), pusher, } } @@ -53,6 +53,7 @@ impl>> Buffer where T: Eq+Clone { pub fn cease(&mut self) { self.flush(); self.pusher.push(&mut None); + self.buffer = Vec::new(); } /// moves the contents of @@ -65,6 +66,9 @@ impl>> Buffer where T: Eq+Clone { // internal method for use by `Session`. fn give(&mut self, data: D) { + if self.buffer.capacity() == 0 { + self.buffer = Vec::with_capacity(Message::::default_length()); + } self.buffer.push(data); // assert!(self.buffer.capacity() == Message::::default_length()); if self.buffer.len() == self.buffer.capacity() { diff --git a/timely/src/dataflow/channels/pushers/exchange.rs b/timely/src/dataflow/channels/pushers/exchange.rs index 5b37a8438..08dc45b3d 100644 --- a/timely/src/dataflow/channels/pushers/exchange.rs +++ b/timely/src/dataflow/channels/pushers/exchange.rs @@ -18,7 +18,7 @@ impl>, H: FnMut(&T, &D)->u64> Exchange, key: H) -> Exchange { let mut buffers = vec![]; for _ in 0..pushers.len() { - buffers.push(Vec::with_capacity(Message::::default_length())); + buffers.push(Vec::new()); } Exchange { pushers, @@ -64,6 +64,9 @@ impl>, H: FnMut(&T, &D)->u64> Push::default_length()); + } self.buffers[index].push(datum); if self.buffers[index].len() == self.buffers[index].capacity() { self.flush(index); @@ -82,6 +85,9 @@ impl>, H: FnMut(&T, &D)->u64> Push::default_length()); + } self.buffers[index].push(datum); if self.buffers[index].len() == self.buffers[index].capacity() { self.flush(index); @@ -95,6 +101,8 @@ impl>, H: FnMut(&T, &D)->u64> Push Push> for Tee { for index in 1..pushers.len() { pushers[index-1].push(&mut None); } + // Free up buffer space. + self.buffer = Vec::new(); } if pushers.len() > 0 { let last = pushers.len() - 1; @@ -44,7 +46,7 @@ impl Tee { pub fn new() -> (Tee, TeeHelper) { let shared = Rc::new(RefCell::new(Vec::new())); let port = Tee { - buffer: Vec::with_capacity(Message::::default_length()), + buffer: Vec::new(), shared: shared.clone(), }; diff --git a/timely/src/dataflow/operators/input.rs b/timely/src/dataflow/operators/input.rs index 11d02896d..e59a52c28 100644 --- a/timely/src/dataflow/operators/input.rs +++ b/timely/src/dataflow/operators/input.rs @@ -212,8 +212,8 @@ impl Handle { activate: Vec::new(), progress: Vec::new(), pushers: Vec::new(), - buffer1: Vec::with_capacity(Message::::default_length()), - buffer2: Vec::with_capacity(Message::::default_length()), + buffer1: Vec::new(), + buffer2: Vec::new(), now_at: T::minimum(), } } @@ -288,7 +288,11 @@ impl Handle { // closes the current epoch, flushing if needed, shutting if needed, and updating the frontier. fn close_epoch(&mut self) { - if !self.buffer1.is_empty() { self.flush(); } + if !self.buffer1.is_empty() { + self.flush(); + self.buffer1 = Vec::new(); + self.buffer2 = Vec::new(); + } for pusher in self.pushers.iter_mut() { pusher.done(); } @@ -305,6 +309,9 @@ impl Handle { /// Sends one record into the corresponding timely dataflow `Stream`, at the current epoch. pub fn send(&mut self, data: D) { // assert!(self.buffer1.capacity() == Message::::default_length()); + if self.buffer1.capacity() == 0 { + self.buffer1 = Vec::with_capacity(Message::::default_length()); + } self.buffer1.push(data); if self.buffer1.len() == self.buffer1.capacity() { self.flush();