From e04a83122aa636b20d15a3fd9a66c19df2faa890 Mon Sep 17 00:00:00 2001
From: Moritz Hoffmann
Date: Fri, 29 Oct 2021 13:15:26 +0200
Subject: [PATCH] Promote non-preallocating exchange to default

I did some measurements comparing
* the current default,
* a non-preallocating exchange, and
* the same, but additionally releasing buffers eagerly.

The benchmark in this chain of commits does not report a significant
difference in performance between the variants. Erring on the side of
caution, I'd propose to make the second variant, i.e. not pre-allocating
buffers but retaining any buffer that a push returns, the default
exchange pact. Currently, none of the non-zerocopy allocators return an
allocation on push, so for them the second and third variants are
equivalent.

# Why is there no change in performance?

For single-record exchanges, this can be reasoned as follows. In the old
approach, we'd have a pre-allocated buffer into which we write a single
element. On flush, the buffer would be sent to the pusher, which might or
might not return a buffer. If it doesn't, we'd allocate a new one.

In the new approach, we don't have a buffer to append to, so we'd first
allocate one. On flush, we send the buffer downstream and maybe get one
back. We don't eagerly allocate a new one if there is no buffer available.

Both approaches allocate a new buffer whenever the pusher didn't return
one earlier; only the order of operations changes. The old approach sends
data and then allocates, while the new approach allocates and then sends
data. On top of this, the new approach pays the cost of checking the
buffer's capacity for each input record, but I'd hope the optimizer sorts
this out. The same argument seems to hold for larger batch sizes.

Signed-off-by: Moritz Hoffmann
---
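A minimal, self-contained sketch of the allocation order described above,
for anyone who wants to see it in isolation: reserve a buffer only once a
record actually arrives, and on flush retain whatever allocation the pusher
hands back rather than eagerly allocating a replacement. `Pusher`,
`LazyBuffer`, `CAPACITY`, and `CountingPusher` are hypothetical stand-ins,
not timely's actual `Push` trait or `Message::default_length()`.

```rust
// Hypothetical stand-ins for illustration only.
const CAPACITY: usize = 1024;

trait Pusher<D> {
    /// Consumes a full buffer; may hand back a recycled allocation.
    fn push(&mut self, buffer: Vec<D>) -> Option<Vec<D>>;
}

/// A per-pushee buffer that starts out unallocated.
struct LazyBuffer<D> {
    buffer: Vec<D>,
}

impl<D> LazyBuffer<D> {
    fn new() -> Self {
        Self { buffer: Vec::new() }
    }

    /// Reserve capacity only when a record actually arrives, then buffer it.
    fn give<P: Pusher<D>>(&mut self, datum: D, pusher: &mut P) {
        if self.buffer.capacity() < CAPACITY {
            let to_reserve = CAPACITY - self.buffer.capacity();
            self.buffer.reserve(to_reserve);
        }
        self.buffer.push(datum);
        if self.buffer.len() == self.buffer.capacity() {
            self.flush(pusher);
        }
    }

    /// Send the buffer downstream; retain a returned allocation, but do not
    /// eagerly allocate a replacement if none comes back.
    fn flush<P: Pusher<D>>(&mut self, pusher: &mut P) {
        if self.buffer.is_empty() {
            return;
        }
        let full = std::mem::take(&mut self.buffer);
        if let Some(mut returned) = pusher.push(full) {
            returned.clear();
            self.buffer = returned;
        }
    }
}

/// A pusher that pretends the consumer recycles the allocation.
struct CountingPusher {
    records: usize,
}

impl Pusher<u64> for CountingPusher {
    fn push(&mut self, buffer: Vec<u64>) -> Option<Vec<u64>> {
        self.records += buffer.len();
        Some(buffer)
    }
}

fn main() {
    let mut pusher = CountingPusher { records: 0 };
    let mut out = LazyBuffer::new();
    for x in 0..10_000u64 {
        out.give(x, &mut pusher);
    }
    out.flush(&mut pusher);
    println!("pushed {} records", pusher.records);
}
```

With a pusher that recycles its buffer (as a zero-copy allocator might), the
allocation is reused across flushes; with one that does not, the next record
triggers a fresh reservation, which is the "allocate, then send" order argued
above.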
 experiments/benches/exchange_bench.rs         | 43 +----
 timely/src/dataflow/channels/pact.rs          |  7 +-
 .../src/dataflow/channels/pushers/exchange.rs | 73 ++++--------
 timely/src/dataflow/channels/pushers/mod.rs   |  1 -
 .../pushers/non_retaining_exchange.rs         | 75 ------------
 timely/src/dataflow/operators/exchange.rs     | 24 +----
 6 files changed, 23 insertions(+), 200 deletions(-)
 delete mode 100644 timely/src/dataflow/channels/pushers/non_retaining_exchange.rs

diff --git a/experiments/benches/exchange_bench.rs b/experiments/benches/exchange_bench.rs
index 491c36585b..981878ee07 100644
--- a/experiments/benches/exchange_bench.rs
+++ b/experiments/benches/exchange_bench.rs
@@ -6,9 +6,6 @@ use std::time::{Duration, Instant};
 use criterion::black_box;
 use criterion::*;
 
-use timely::dataflow::channels::pact::{
-    Exchange as ExchangePact, NonRetainingExchange, ParallelizationContract,
-};
 use timely::dataflow::operators::{Exchange, Input, Probe};
 use timely::dataflow::InputHandle;
 use timely::{CommunicationConfig, Config, WorkerConfig};
@@ -43,22 +40,6 @@ fn bench(c: &mut Criterion) {
                         config,
                         params.batch,
                         iters,
-                        || ExchangePact::new(|&x| x as u64),
                     ))
                 })
             },
         );
-        group.bench_with_input(
-            BenchmarkId::new("NonRetaining", params.clone()),
-            &params,
-            move |b, params| {
-                b.iter_custom(|iters| {
-                    let config = Config::process(params.threads);
-                    black_box(experiment_exchange(
-                        config,
-                        params.batch,
-                        iters,
-                        || NonRetainingExchange::new(|&x| x as u64),
                     ))
                 })
             },
         );
@@ -76,25 +57,6 @@ fn bench(c: &mut Criterion) {
                         config,
                         params.batch,
                         iters,
-                        ||ExchangePact::new(|&x| x as u64),
                     ))
                 })
             },
         );
-        group.bench_with_input(
-            BenchmarkId::new("NonRetainingZero", params.clone()),
-            &params,
-            move |b, params| {
-                b.iter_custom(|iters| {
-                    let config = Config {
-                        communication: CommunicationConfig::ProcessBinary(params.threads),
-                        worker: WorkerConfig::default(),
                     };
-                    black_box(experiment_exchange(
-                        config,
-                        params.batch,
-                        iters,
-                        ||NonRetainingExchange::new(|&x| x as u64),
                     ))
                 })
             },
         );
@@ -103,15 +65,14 @@
     }
 }
 
-fn experiment_exchange<F: Fn() -> P + Sync + Send + 'static, P: ParallelizationContract<u64, u64>>(
+fn experiment_exchange(
     config: Config,
     batch: u64,
     rounds: u64,
-    pact: F,
 ) -> Duration {
     timely::execute(config, move |worker| {
         let mut input = InputHandle::new();
-        let probe = worker.dataflow(|scope| scope.input_from(&mut input).apply_pact(pact()).probe());
+        let probe = worker.dataflow(|scope| scope.input_from(&mut input).exchange(|x| *x).probe());
         let mut time = 0;
         let timer = Instant::now();
diff --git a/timely/src/dataflow/channels/pact.rs b/timely/src/dataflow/channels/pact.rs
index 000f72b48e..43b5e42792 100644
--- a/timely/src/dataflow/channels/pact.rs
+++ b/timely/src/dataflow/channels/pact.rs
@@ -9,8 +9,7 @@ use std::{fmt::{self, Debug}, marker::PhantomData};
 
-use crate::ExchangeData;
-use crate::communication::{Push, Pull};
+use crate::communication::{Push, Pull, Data};
 use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
 use crate::worker::AsWorker;
 
@@ -19,8 +18,6 @@ use super::{Bundle, Message};
 
 use crate::logging::{TimelyLogger as Logger, MessagesEvent};
 
-pub use super::pushers::non_retaining_exchange::NonRetainingExchange;
-
 /// A `ParallelizationContract` allocates paired `Push` and `Pull` implementors.
 pub trait ParallelizationContract<T: 'static, D: 'static> {
     /// Type implementing `Push` produced by this pact.
@@ -61,7 +58,7 @@ impl<D, F: FnMut(&D)->u64+'static> Exchange<D, F> {
 }
 
 // Exchange uses a `Box<Push>` because it cannot know what type of pushable will return from the allocator.
-impl<T: Eq+ExchangeData, D: ExchangeData, F: FnMut(&D)->u64+'static> ParallelizationContract<T, D> for Exchange<D, F> {
+impl<T: Eq+Data, D: Data, F: FnMut(&D)->u64+'static> ParallelizationContract<T, D> for Exchange<D, F> {
     // TODO: The closure in the type prevents us from naming it.
     //       Could specialize `ExchangePusher` to a time-free version.
     type Pusher = Box<dyn Push<Bundle<T, D>>>;
diff --git a/timely/src/dataflow/channels/pushers/exchange.rs b/timely/src/dataflow/channels/pushers/exchange.rs
index bc4aa4fa8d..fad8340c60 100644
--- a/timely/src/dataflow/channels/pushers/exchange.rs
+++ b/timely/src/dataflow/channels/pushers/exchange.rs
@@ -1,89 +1,43 @@
 //! The exchange pattern distributes pushed data between many target pushees.
 
-use std::marker::PhantomData;
-
 use crate::Data;
 use crate::communication::Push;
 use crate::dataflow::channels::{Bundle, Message};
 
 // TODO : Software write combining
 /// Distributes records among target pushees according to a distribution function.
-pub struct ExchangePusherGeneric<T, D, P: Push<Bundle<T, D>>, H: FnMut(&T, &D) -> u64, B: ExchangeBehavior<T, D>> {
+pub struct Exchange<T, D, P: Push<Bundle<T, D>>, H: FnMut(&T, &D) -> u64> {
     pushers: Vec<P>,
     buffers: Vec<Vec<D>>,
     current: Option<T>,
     hash_func: H,
-    _phantom_data: PhantomData<B>,
-}
-
-/// The behavior of an exchange specialization
-///
-/// This trait gives specialized exchange implementations the opportunity to hook into interesting
-/// places for memory management. It exposes the lifecycle of each of the pushee's buffers, starting
-/// from creation, ensuring allocations, flushing and finalizing.
-pub trait ExchangeBehavior<T, D> {
-    /// Allocate a new buffer, called while creating the exchange pusher.
-    fn allocate() -> Vec<D>;
-    /// Check the buffer's capacity before pushing a single element.
-    fn check(buffer: &mut Vec<D>);
-    /// Flush a buffer's contents, either when the buffer is at capacity or when no more data is
-    /// available.
-    fn flush<P: Push<Bundle<T, D>>>(buffer: &mut Vec<D>, time: T, pusher: &mut P);
-    /// Finalize a buffer after pushing `None`, i.e. no more data is available.
-    fn finalize(buffer: &mut Vec<D>);
-}
-
-/// Exchange behavior that always has push buffers fully allocated.
-pub struct FullyAllocatedExchangeBehavior {}
-
-impl<T, D> ExchangeBehavior<T, D> for FullyAllocatedExchangeBehavior {
-    fn allocate() -> Vec<D> {
-        Vec::with_capacity(Message::<T, D>::default_length())
-    }
-
-    fn check(_buffer: &mut Vec<D>) {
-        // Not needed, always allocated
-    }
-
-    fn flush<P: Push<Bundle<T, D>>>(buffer: &mut Vec<D>, time: T, pusher: &mut P) {
-        // `push_at` ensures an allocation.
-        Message::push_at(buffer, time, pusher);
-    }
-
-    fn finalize(_buffer: &mut Vec<D>) {
-        // retain any allocation
-    }
-}
-
-/// Default exchange type is to fully allocate exchange buffers
-pub type Exchange<T, D, P, H> = ExchangePusherGeneric<T, D, P, H, FullyAllocatedExchangeBehavior>;
-
-impl<T: Clone, D, P: Push<Bundle<T, D>>, H: FnMut(&T, &D)->u64, B: ExchangeBehavior<T, D>> ExchangePusherGeneric<T, D, P, H, B> {
-    /// Allocates a new `ExchangeGeneric` from a supplied set of pushers and a distribution function.
-    pub fn new(pushers: Vec<P>, key: H) -> ExchangePusherGeneric<T, D, P, H, B> {
+impl<T: Clone, D, P: Push<Bundle<T, D>>, H: FnMut(&T, &D)->u64> Exchange<T, D, P, H> {
+    /// Allocates a new `Exchange` from a supplied set of pushers and a distribution function.
+    pub fn new(pushers: Vec<P>, key: H) -> Exchange<T, D, P, H> {
         let mut buffers = vec![];
         for _ in 0..pushers.len() {
-            buffers.push(B::allocate());
+            buffers.push(Vec::new());
         }
-        ExchangePusherGeneric {
+        Exchange {
             pushers,
             hash_func: key,
             buffers,
             current: None,
-            _phantom_data: PhantomData,
         }
     }
     #[inline]
     fn flush(&mut self, index: usize) {
         if !self.buffers[index].is_empty() {
             if let Some(ref time) = self.current {
-                B::flush(&mut self.buffers[index], time.clone(), &mut self.pushers[index]);
+                Message::push_at_no_allocation(&mut self.buffers[index], time.clone(), &mut self.pushers[index]);
             }
         }
     }
 }
 
-impl<T: Eq+Data, D: Data, P: Push<Bundle<T, D>>, H: FnMut(&T, &D)->u64, B: ExchangeBehavior<T, D>> Push<Bundle<T, D>> for ExchangePusherGeneric<T, D, P, H, B> {
+impl<T: Eq+Data, D: Data, P: Push<Bundle<T, D>>, H: FnMut(&T, &D)->u64> Push<Bundle<T, D>> for Exchange<T, D, P, H> {
     #[inline(never)]
     fn push(&mut self, message: &mut Option<Bundle<T, D>>) {
         // if only one pusher, no exchange
@@ -109,7 +63,10 @@ impl<T: Eq+Data, D: Data, P: Push<Bundle<T, D>>, H: FnMut(&T, &D)->u64, B: Excha
                 let mask = (self.pushers.len() - 1) as u64;
                 for datum in data.drain(..) {
                     let index = (((self.hash_func)(time, &datum)) & mask) as usize;
-                    B::check(&mut self.buffers[index]);
+                    if self.buffers[index].capacity() < Message::<T, D>::default_length() {
+                        let to_reserve = Message::<T, D>::default_length() - self.buffers[index].capacity();
+                        self.buffers[index].reserve(to_reserve);
+                    }
                     self.buffers[index].push(datum);
                     if self.buffers[index].len() == self.buffers[index].capacity() {
                         self.flush(index);
@@ -128,7 +85,10 @@ impl<T: Eq+Data, D: Data, P: Push<Bundle<T, D>>, H: FnMut(&T, &D)->u64, B: Excha
             else {
                 for datum in data.drain(..) {
                     let index = (((self.hash_func)(time, &datum)) % self.pushers.len() as u64) as usize;
-                    B::check(&mut self.buffers[index]);
+                    if self.buffers[index].capacity() < Message::<T, D>::default_length() {
+                        let to_reserve = Message::<T, D>::default_length() - self.buffers[index].capacity();
+                        self.buffers[index].reserve(to_reserve);
+                    }
                     self.buffers[index].push(datum);
                     if self.buffers[index].len() == self.buffers[index].capacity() {
                         self.flush(index);
@@ -142,7 +102,6 @@ impl<T: Eq+Data, D: Data, P: Push<Bundle<T, D>>, H: FnMut(&T, &D)->u64, B: Excha
             for index in 0..self.pushers.len() {
                 self.flush(index);
                 self.pushers[index].push(&mut None);
-                B::finalize(&mut self.buffers[index]);
             }
         }
     }
diff --git a/timely/src/dataflow/channels/pushers/mod.rs b/timely/src/dataflow/channels/pushers/mod.rs
index 496a13b672..295d033cab 100644
--- a/timely/src/dataflow/channels/pushers/mod.rs
+++ b/timely/src/dataflow/channels/pushers/mod.rs
@@ -3,7 +3,6 @@ pub use self::exchange::Exchange;
 pub use self::counter::Counter;
 
 pub mod tee;
-pub mod non_retaining_exchange;
 pub mod exchange;
 pub mod counter;
 pub mod buffer;
diff --git a/timely/src/dataflow/channels/pushers/non_retaining_exchange.rs b/timely/src/dataflow/channels/pushers/non_retaining_exchange.rs
deleted file mode 100644
index b53e12f065..0000000000
--- a/timely/src/dataflow/channels/pushers/non_retaining_exchange.rs
+++ /dev/null
@@ -1,75 +0,0 @@
-//! The exchange pattern distributes pushed data between many target pushees.
-
-use std::marker::PhantomData;
-
-use crate::ExchangeData;
-use crate::communication::{Pull, Push};
-use crate::dataflow::channels::pact::{ParallelizationContract, LogPusher, LogPuller};
-use crate::dataflow::channels::pushers::exchange::{ExchangeBehavior, ExchangePusherGeneric};
-use crate::dataflow::channels::{Bundle, Message};
-use crate::logging::TimelyLogger as Logger;
-use crate::worker::AsWorker;
-
-/// Distributes records among target pushees according to a distribution function.
-///
-/// This implementation behaves similarly to [crate::dataflow::channels::pushers::Exchange], but
-/// leaves no allocations around. It does not preallocate a buffer for each pushee, but
-/// only allocates it once data is pushed. On flush, the allocation is passed to the pushee, and
-/// any returned allocation will be dropped.
-pub struct NonRetainingExchangeBehavior {}
-
-impl<T, D> ExchangeBehavior<T, D> for NonRetainingExchangeBehavior {
-    fn allocate() -> Vec<D> {
-        Vec::new()
-    }
-
-    fn check(buffer: &mut Vec<D>) {
-        if buffer.capacity() < Message::<T, D>::default_length() {
-            let to_reserve = Message::<T, D>::default_length() - buffer.capacity();
-            buffer.reserve(to_reserve);
-        }
-    }
-
-    fn flush<P: Push<Bundle<T, D>>>(buffer: &mut Vec<D>, time: T, pusher: &mut P) {
-        Message::push_at_no_allocation(buffer, time, pusher);
-    }
-
-    fn finalize(buffer: &mut Vec<D>) {
-        *buffer = Vec::new();
-    }
-}
-
-/// Non-retaining exchange pusher definition
-pub type NonRetainingExchangePusher<T, D, P, H> = ExchangePusherGeneric<T, D, P, H, NonRetainingExchangeBehavior>;
-
-/// An exchange between multiple observers by data, backed by [NonRetainingExchangePusher].
-pub struct NonRetainingExchange<D, F> { hash_func: F, phantom: PhantomData<D> }
-
-impl<D, F: FnMut(&D)->u64+'static> NonRetainingExchange<D, F> {
-    /// Allocates a new exchange pact from a distribution function.
-    pub fn new(func: F) -> Self {
-        Self {
-            hash_func: func,
-            phantom: PhantomData,
-        }
-    }
-}
-
-// Exchange uses a `Box<Push>` because it cannot know what type of pushable will return from the allocator.
-impl<T: Eq+ExchangeData, D: ExchangeData, F: FnMut(&D)->u64+'static> ParallelizationContract<T, D> for NonRetainingExchange<D, F> {
-    // TODO: The closure in the type prevents us from naming it.
-    //       Could specialize `ExchangePusher` to a time-free version.
-    type Pusher = Box<dyn Push<Bundle<T, D>>>;
-    type Puller = Box<dyn Pull<Bundle<T, D>>>;
-    fn connect<A: AsWorker>(mut self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
-        let (senders, receiver) = allocator.allocate::<Message<T, D>>(identifier, address);
-        let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::<Vec<_>>();
-        (Box::new(NonRetainingExchangePusher::new(senders, move |_, d| (self.hash_func)(d))), Box::new(LogPuller::new(receiver, allocator.index(), identifier, logging.clone())))
-    }
-}
-
-impl<D, F> std::fmt::Debug for NonRetainingExchange<D, F> {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("NonRetainingExchange").finish()
-    }
-}
diff --git a/timely/src/dataflow/operators/exchange.rs b/timely/src/dataflow/operators/exchange.rs
index 1f6438b67e..c106ce37f1 100644
--- a/timely/src/dataflow/operators/exchange.rs
+++ b/timely/src/dataflow/operators/exchange.rs
@@ -1,7 +1,7 @@
 //! Exchange records between workers.
 
 use crate::ExchangeData;
-use crate::dataflow::channels::pact::{Exchange as ExchangePact, ParallelizationContract};
+use crate::dataflow::channels::pact::Exchange as ExchangePact;
 use crate::dataflow::{Stream, Scope};
 use crate::dataflow::operators::generic::operator::Operator;
 
@@ -23,31 +23,13 @@ pub trait Exchange<D: ExchangeData> {
     /// });
     /// ```
     fn exchange(&self, route: impl Fn(&D)->u64+'static) -> Self;
-
-    /// Apply a parallelization contract on the data in a stream
-    ///
-    /// # Examples
-    /// ```
-    /// use timely::dataflow::operators::{ToStream, Exchange, Inspect};
-    /// use timely::dataflow::channels::pact::NonRetainingExchange;
-    ///
-    /// timely::example(|scope| {
-    ///     (0..10).to_stream(scope)
-    ///            .apply_pact(NonRetainingExchange::new(|x| *x))
-    ///            .inspect(|x| println!("seen: {:?}", x));
-    /// });
-    /// ```
-    fn apply_pact<T, P: ParallelizationContract<T, D>>(&self, pact: P) -> Self where T: 'static;
 }
 
+// impl<G: Scope, D: ExchangeData> Exchange<D> for Stream<G, D> {
 impl<G: Scope, D: ExchangeData> Exchange<D> for Stream<G, D> {
     fn exchange(&self, route: impl Fn(&D)->u64+'static) -> Stream<G, D> {
-        self.apply_pact(ExchangePact::new(route))
-    }
-
-    fn apply_pact<T, P: ParallelizationContract<T, D>>(&self, pact: P) -> Stream<G, D> {
         let mut vector = Vec::new();
-        self.unary(pact, "Exchange", move |_,_| move |input, output| {
+        self.unary(ExchangePact::new(route), "Exchange", move |_,_| move |input, output| {
             input.for_each(|time, data| {
                 data.swap(&mut vector);
                 output.session(&time).give_vec(&mut vector);