Skip to content

Commit

Permalink
Lazy-allocate pusher memory.
Browse files Browse the repository at this point in the history
The current implementation of channel pushers preallocates
`Message::default_length` entries per pusher.  For very large graphs this
adds up to a lot of memory being allocated even when the system is idle
with no outstanding messages.  This patch changes the allocation policy
to only allocate channel memory when there are messages to send and to
deallocate it at the end of a burst of messages (signaled by pushing
a `None` message), reducing the memory footprint to 0 in idle state at
the cost of some potential slow-down due to a larger number of allocations.

See #394.
  • Loading branch information
ryzhyk committed Jun 28, 2021
1 parent 209af99 commit 9f0571f
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 11 deletions.
5 changes: 0 additions & 5 deletions timely/src/dataflow/channels/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,4 @@ impl<T, D> Message<T, D> {
buffer.clear();
}
}

// TODO: Unclear we always want this here.
if buffer.capacity() != Self::default_length() {
*buffer = Vec::with_capacity(Self::default_length());
}
}}
6 changes: 5 additions & 1 deletion timely/src/dataflow/channels/pushers/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ impl<T, D, P: Push<Bundle<T, D>>> Buffer<T, D, P> where T: Eq+Clone {
pub fn new(pusher: P) -> Buffer<T, D, P> {
Buffer {
time: None,
buffer: Vec::with_capacity(Message::<T, D>::default_length()),
buffer: Vec::new(),
pusher,
}
}
Expand Down Expand Up @@ -53,6 +53,7 @@ impl<T, D, P: Push<Bundle<T, D>>> Buffer<T, D, P> where T: Eq+Clone {
pub fn cease(&mut self) {
self.flush();
self.pusher.push(&mut None);
self.buffer = Vec::new();
}

/// moves the contents of
Expand All @@ -65,6 +66,9 @@ impl<T, D, P: Push<Bundle<T, D>>> Buffer<T, D, P> where T: Eq+Clone {

// internal method for use by `Session`.
fn give(&mut self, data: D) {
if self.buffer.capacity() == 0 {
self.buffer = Vec::with_capacity(Message::<T, D>::default_length());
}
self.buffer.push(data);
// assert!(self.buffer.capacity() == Message::<O::Data>::default_length());
if self.buffer.len() == self.buffer.capacity() {
Expand Down
10 changes: 9 additions & 1 deletion timely/src/dataflow/channels/pushers/exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ impl<T: Clone, D, P: Push<Bundle<T, D>>, H: FnMut(&T, &D)->u64> Exchange<T, D,
pub fn new(pushers: Vec<P>, key: H) -> Exchange<T, D, P, H> {
let mut buffers = vec![];
for _ in 0..pushers.len() {
buffers.push(Vec::with_capacity(Message::<T, D>::default_length()));
buffers.push(Vec::new());
}
Exchange {
pushers,
Expand Down Expand Up @@ -64,6 +64,9 @@ impl<T: Eq+Data, D: Data, P: Push<Bundle<T, D>>, H: FnMut(&T, &D)->u64> Push<Bun
for datum in data.drain(..) {
let index = (((self.hash_func)(time, &datum)) & mask) as usize;

if self.buffers[index].capacity() == 0 {
self.buffers[index] = Vec::with_capacity(Message::<T, D>::default_length());
}
self.buffers[index].push(datum);
if self.buffers[index].len() == self.buffers[index].capacity() {
self.flush(index);
Expand All @@ -82,6 +85,9 @@ impl<T: Eq+Data, D: Data, P: Push<Bundle<T, D>>, H: FnMut(&T, &D)->u64> Push<Bun
else {
for datum in data.drain(..) {
let index = (((self.hash_func)(time, &datum)) % self.pushers.len() as u64) as usize;
if self.buffers[index].capacity() == 0 {
self.buffers[index] = Vec::with_capacity(Message::<T, D>::default_length());
}
self.buffers[index].push(datum);
if self.buffers[index].len() == self.buffers[index].capacity() {
self.flush(index);
Expand All @@ -95,6 +101,8 @@ impl<T: Eq+Data, D: Data, P: Push<Bundle<T, D>>, H: FnMut(&T, &D)->u64> Push<Bun
for index in 0..self.pushers.len() {
self.flush(index);
self.pushers[index].push(&mut None);
// Free up buffer space.
self.buffers[index] = Vec::new();
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion timely/src/dataflow/channels/pushers/tee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ impl<T: Data, D: Data> Push<Bundle<T, D>> for Tee<T, D> {
for index in 1..pushers.len() {
pushers[index-1].push(&mut None);
}
// Free up buffer space.
self.buffer = Vec::new();
}
if pushers.len() > 0 {
let last = pushers.len() - 1;
Expand All @@ -44,7 +46,7 @@ impl<T, D> Tee<T, D> {
pub fn new() -> (Tee<T, D>, TeeHelper<T, D>) {
let shared = Rc::new(RefCell::new(Vec::new()));
let port = Tee {
buffer: Vec::with_capacity(Message::<T, D>::default_length()),
buffer: Vec::new(),
shared: shared.clone(),
};

Expand Down
13 changes: 10 additions & 3 deletions timely/src/dataflow/operators/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,8 @@ impl<T:Timestamp, D: Data> Handle<T, D> {
activate: Vec::new(),
progress: Vec::new(),
pushers: Vec::new(),
buffer1: Vec::with_capacity(Message::<T, D>::default_length()),
buffer2: Vec::with_capacity(Message::<T, D>::default_length()),
buffer1: Vec::new(),
buffer2: Vec::new(),
now_at: T::minimum(),
}
}
Expand Down Expand Up @@ -288,7 +288,11 @@ impl<T:Timestamp, D: Data> Handle<T, D> {

// closes the current epoch, flushing if needed, shutting if needed, and updating the frontier.
fn close_epoch(&mut self) {
if !self.buffer1.is_empty() { self.flush(); }
if !self.buffer1.is_empty() {
self.flush();
self.buffer1 = Vec::new();
self.buffer2 = Vec::new();
}
for pusher in self.pushers.iter_mut() {
pusher.done();
}
Expand All @@ -305,6 +309,9 @@ impl<T:Timestamp, D: Data> Handle<T, D> {
/// Sends one record into the corresponding timely dataflow `Stream`, at the current epoch.
pub fn send(&mut self, data: D) {
// assert!(self.buffer1.capacity() == Message::<T, D>::default_length());
if self.buffer1.capacity() == 0 {
self.buffer1 = Vec::with_capacity(Message::<T, D>::default_length());
}
self.buffer1.push(data);
if self.buffer1.len() == self.buffer1.capacity() {
self.flush();
Expand Down

0 comments on commit 9f0571f

Please sign in to comment.