From 9f0571f018c792a9ae05777b49f5b1dfd8b49505 Mon Sep 17 00:00:00 2001 From: Leonid Ryzhyk Date: Wed, 28 Apr 2021 23:39:12 -0700 Subject: [PATCH] Lazy-allocate pusher memory. The current implementation of channel pushers preallocates `Message::default_length` entries per pusher. For very large graphs this adds up to a lot of memory being allocated even when the system is idle with no outstanding messages. This patch changes the allocation policy to only allocate channel memory when there are messages to send and to deallocate it at the end of a burst of messages (signaled by pushing a `None` message), reducing the memory footprint to 0 in idle state at the cost of some potential slow-down due to a larger number of allocations. See #394. --- timely/src/dataflow/channels/mod.rs | 5 ----- timely/src/dataflow/channels/pushers/buffer.rs | 6 +++++- timely/src/dataflow/channels/pushers/exchange.rs | 10 +++++++++- timely/src/dataflow/channels/pushers/tee.rs | 4 +++- timely/src/dataflow/operators/input.rs | 13 ++++++++++--- 5 files changed, 27 insertions(+), 11 deletions(-) diff --git a/timely/src/dataflow/channels/mod.rs b/timely/src/dataflow/channels/mod.rs index 641f0a190b..acb3dd9de6 100644 --- a/timely/src/dataflow/channels/mod.rs +++ b/timely/src/dataflow/channels/mod.rs @@ -52,9 +52,4 @@ impl Message { buffer.clear(); } } - - // TODO: Unclear we always want this here. - if buffer.capacity() != Self::default_length() { - *buffer = Vec::with_capacity(Self::default_length()); - } }} diff --git a/timely/src/dataflow/channels/pushers/buffer.rs b/timely/src/dataflow/channels/pushers/buffer.rs index 288d5ff884..603d071649 100644 --- a/timely/src/dataflow/channels/pushers/buffer.rs +++ b/timely/src/dataflow/channels/pushers/buffer.rs @@ -23,7 +23,7 @@ impl>> Buffer where T: Eq+Clone { pub fn new(pusher: P) -> Buffer { Buffer { time: None, - buffer: Vec::with_capacity(Message::::default_length()), + buffer: Vec::new(), pusher, } } @@ -53,6 +53,7 @@ impl>> Buffer where T: Eq+Clone { pub fn cease(&mut self) { self.flush(); self.pusher.push(&mut None); + self.buffer = Vec::new(); } /// moves the contents of @@ -65,6 +66,9 @@ impl>> Buffer where T: Eq+Clone { // internal method for use by `Session`. fn give(&mut self, data: D) { + if self.buffer.capacity() == 0 { + self.buffer = Vec::with_capacity(Message::::default_length()); + } self.buffer.push(data); // assert!(self.buffer.capacity() == Message::::default_length()); if self.buffer.len() == self.buffer.capacity() { diff --git a/timely/src/dataflow/channels/pushers/exchange.rs b/timely/src/dataflow/channels/pushers/exchange.rs index 5b37a84386..08dc45b3d7 100644 --- a/timely/src/dataflow/channels/pushers/exchange.rs +++ b/timely/src/dataflow/channels/pushers/exchange.rs @@ -18,7 +18,7 @@ impl>, H: FnMut(&T, &D)->u64> Exchange, key: H) -> Exchange { let mut buffers = vec![]; for _ in 0..pushers.len() { - buffers.push(Vec::with_capacity(Message::::default_length())); + buffers.push(Vec::new()); } Exchange { pushers, @@ -64,6 +64,9 @@ impl>, H: FnMut(&T, &D)->u64> Push::default_length()); + } self.buffers[index].push(datum); if self.buffers[index].len() == self.buffers[index].capacity() { self.flush(index); @@ -82,6 +85,9 @@ impl>, H: FnMut(&T, &D)->u64> Push::default_length()); + } self.buffers[index].push(datum); if self.buffers[index].len() == self.buffers[index].capacity() { self.flush(index); @@ -95,6 +101,8 @@ impl>, H: FnMut(&T, &D)->u64> Push Push> for Tee { for index in 1..pushers.len() { pushers[index-1].push(&mut None); } + // Free up buffer space. + self.buffer = Vec::new(); } if pushers.len() > 0 { let last = pushers.len() - 1; @@ -44,7 +46,7 @@ impl Tee { pub fn new() -> (Tee, TeeHelper) { let shared = Rc::new(RefCell::new(Vec::new())); let port = Tee { - buffer: Vec::with_capacity(Message::::default_length()), + buffer: Vec::new(), shared: shared.clone(), }; diff --git a/timely/src/dataflow/operators/input.rs b/timely/src/dataflow/operators/input.rs index 11d02896d0..e59a52c28e 100644 --- a/timely/src/dataflow/operators/input.rs +++ b/timely/src/dataflow/operators/input.rs @@ -212,8 +212,8 @@ impl Handle { activate: Vec::new(), progress: Vec::new(), pushers: Vec::new(), - buffer1: Vec::with_capacity(Message::::default_length()), - buffer2: Vec::with_capacity(Message::::default_length()), + buffer1: Vec::new(), + buffer2: Vec::new(), now_at: T::minimum(), } } @@ -288,7 +288,11 @@ impl Handle { // closes the current epoch, flushing if needed, shutting if needed, and updating the frontier. fn close_epoch(&mut self) { - if !self.buffer1.is_empty() { self.flush(); } + if !self.buffer1.is_empty() { + self.flush(); + self.buffer1 = Vec::new(); + self.buffer2 = Vec::new(); + } for pusher in self.pushers.iter_mut() { pusher.done(); } @@ -305,6 +309,9 @@ impl Handle { /// Sends one record into the corresponding timely dataflow `Stream`, at the current epoch. pub fn send(&mut self, data: D) { // assert!(self.buffer1.capacity() == Message::::default_length()); + if self.buffer1.capacity() == 0 { + self.buffer1 = Vec::with_capacity(Message::::default_length()); + } self.buffer1.push(data); if self.buffer1.len() == self.buffer1.capacity() { self.flush();