Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lazy-allocate pusher memory. #397

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions timely/src/dataflow/channels/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,4 @@ impl<T, D> Message<T, D> {
buffer.clear();
}
}

// TODO: It is unclear whether we always want to reset the buffer's capacity here.
if buffer.capacity() != Self::default_length() {
*buffer = Vec::with_capacity(Self::default_length());
}
}}
6 changes: 5 additions & 1 deletion timely/src/dataflow/channels/pushers/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ impl<T, D, P: Push<Bundle<T, D>>> Buffer<T, D, P> where T: Eq+Clone {
/// Creates a new `Buffer` around `pusher`, with no current time set.
pub fn new(pusher: P) -> Buffer<T, D, P> {
Buffer {
time: None,
// NOTE(review): this text is a rendered diff, so both the old and the new
// `buffer:` initializer are shown. Presumably the `with_capacity` line is the
// one being removed and `Vec::new()` is its replacement -- confirm against
// the actual file.
buffer: Vec::with_capacity(Message::<T, D>::default_length()),
buffer: Vec::new(),
pusher,
}
}
Expand Down Expand Up @@ -53,6 +53,7 @@ impl<T, D, P: Push<Bundle<T, D>>> Buffer<T, D, P> where T: Eq+Clone {
/// Calls `flush`, pushes `None` to the wrapped pusher, and then replaces the
/// internal buffer with an empty `Vec`.
///
/// After this call `self.buffer` has zero capacity; `give` checks for that and
/// re-creates the buffer with `Message::default_length()` capacity before pushing.
pub fn cease(&mut self) {
// Flush first so that the `None` below is pushed only after `flush` has run.
self.flush();
self.pusher.push(&mut None);
// Free up buffer space: the previous vector is dropped by this assignment.
self.buffer = Vec::new();
}

/// moves the contents of
Expand All @@ -65,6 +66,9 @@ impl<T, D, P: Push<Bundle<T, D>>> Buffer<T, D, P> where T: Eq+Clone {

// internal method for use by `Session`.
fn give(&mut self, data: D) {
if self.buffer.capacity() == 0 {
self.buffer = Vec::with_capacity(Message::<T, D>::default_length());
}
self.buffer.push(data);
// assert!(self.buffer.capacity() == Message::<O::Data>::default_length());
if self.buffer.len() == self.buffer.capacity() {
Expand Down
10 changes: 9 additions & 1 deletion timely/src/dataflow/channels/pushers/exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ impl<T: Clone, D, P: Push<Bundle<T, D>>, H: FnMut(&T, &D)->u64> Exchange<T, D,
pub fn new(pushers: Vec<P>, key: H) -> Exchange<T, D, P, H> {
let mut buffers = vec![];
for _ in 0..pushers.len() {
buffers.push(Vec::with_capacity(Message::<T, D>::default_length()));
buffers.push(Vec::new());
}
Exchange {
pushers,
Expand Down Expand Up @@ -64,6 +64,9 @@ impl<T: Eq+Data, D: Data, P: Push<Bundle<T, D>>, H: FnMut(&T, &D)->u64> Push<Bun
for datum in data.drain(..) {
let index = (((self.hash_func)(time, &datum)) & mask) as usize;

if self.buffers[index].capacity() == 0 {
self.buffers[index] = Vec::with_capacity(Message::<T, D>::default_length());
}
self.buffers[index].push(datum);
if self.buffers[index].len() == self.buffers[index].capacity() {
self.flush(index);
Expand All @@ -82,6 +85,9 @@ impl<T: Eq+Data, D: Data, P: Push<Bundle<T, D>>, H: FnMut(&T, &D)->u64> Push<Bun
else {
for datum in data.drain(..) {
let index = (((self.hash_func)(time, &datum)) % self.pushers.len() as u64) as usize;
if self.buffers[index].capacity() == 0 {
self.buffers[index] = Vec::with_capacity(Message::<T, D>::default_length());
}
self.buffers[index].push(datum);
if self.buffers[index].len() == self.buffers[index].capacity() {
self.flush(index);
Expand All @@ -95,6 +101,8 @@ impl<T: Eq+Data, D: Data, P: Push<Bundle<T, D>>, H: FnMut(&T, &D)->u64> Push<Bun
for index in 0..self.pushers.len() {
self.flush(index);
self.pushers[index].push(&mut None);
// Free up buffer space.
self.buffers[index] = Vec::new();
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion timely/src/dataflow/channels/pushers/tee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ impl<T: Data, D: Data> Push<Bundle<T, D>> for Tee<T, D> {
for index in 1..pushers.len() {
pushers[index-1].push(&mut None);
}
// Free up buffer space.
self.buffer = Vec::new();
}
if pushers.len() > 0 {
let last = pushers.len() - 1;
Expand All @@ -44,7 +46,7 @@ impl<T, D> Tee<T, D> {
pub fn new() -> (Tee<T, D>, TeeHelper<T, D>) {
let shared = Rc::new(RefCell::new(Vec::new()));
let port = Tee {
buffer: Vec::with_capacity(Message::<T, D>::default_length()),
buffer: Vec::new(),
shared: shared.clone(),
};

Expand Down
13 changes: 10 additions & 3 deletions timely/src/dataflow/operators/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,8 @@ impl<T:Timestamp, D: Data> Handle<T, D> {
activate: Vec::new(),
progress: Vec::new(),
pushers: Vec::new(),
buffer1: Vec::with_capacity(Message::<T, D>::default_length()),
buffer2: Vec::with_capacity(Message::<T, D>::default_length()),
buffer1: Vec::new(),
buffer2: Vec::new(),
now_at: T::minimum(),
}
}
Expand Down Expand Up @@ -288,7 +288,11 @@ impl<T:Timestamp, D: Data> Handle<T, D> {

// closes the current epoch, flushing if needed, shutting if needed, and updating the frontier.
fn close_epoch(&mut self) {
if !self.buffer1.is_empty() { self.flush(); }
if !self.buffer1.is_empty() {
self.flush();
self.buffer1 = Vec::new();
self.buffer2 = Vec::new();
}
for pusher in self.pushers.iter_mut() {
pusher.done();
}
Expand All @@ -305,6 +309,9 @@ impl<T:Timestamp, D: Data> Handle<T, D> {
/// Sends one record into the corresponding timely dataflow `Stream`, at the current epoch.
pub fn send(&mut self, data: D) {
// assert!(self.buffer1.capacity() == Message::<T, D>::default_length());
if self.buffer1.capacity() == 0 {
self.buffer1 = Vec::with_capacity(Message::<T, D>::default_length());
}
self.buffer1.push(data);
if self.buffer1.len() == self.buffer1.capacity() {
self.flush();
Expand Down