diff --git a/experiments/benches/exchange_bench.rs b/experiments/benches/exchange_bench.rs index 08a3ccf3f..491c36585 100644 --- a/experiments/benches/exchange_bench.rs +++ b/experiments/benches/exchange_bench.rs @@ -6,7 +6,9 @@ use std::time::{Duration, Instant}; use criterion::black_box; use criterion::*; -use timely::dataflow::channels::pact::{EagerExchange, LazyExchange}; +use timely::dataflow::channels::pact::{ + Exchange as ExchangePact, NonRetainingExchange, ParallelizationContract, +}; use timely::dataflow::operators::{Exchange, Input, Probe}; use timely::dataflow::InputHandle; use timely::{CommunicationConfig, Config, WorkerConfig}; @@ -37,27 +39,27 @@ fn bench(c: &mut Criterion) { move |b, params| { b.iter_custom(|iters| { let config = Config::process(params.threads); - black_box(experiment_exchange(config, params.batch, iters)) + black_box(experiment_exchange( + config, + params.batch, + iters, + || ExchangePact::new(|&x| x as u64), + )) }) }, ); group.bench_with_input( - BenchmarkId::new("Lazy", params.clone()), + BenchmarkId::new("NonRetaining", params.clone()), ¶ms, move |b, params| { b.iter_custom(|iters| { let config = Config::process(params.threads); - black_box(experiment_lazy_exchange(config, params.batch, iters)) - }) - }, - ); - group.bench_with_input( - BenchmarkId::new("Eager", params.clone()), - ¶ms, - move |b, params| { - b.iter_custom(|iters| { - let config = Config::process(params.threads); - black_box(experiment_eager_exchange(config, params.batch, iters)) + black_box(experiment_exchange( + config, + params.batch, + iters, + || NonRetainingExchange::new(|&x| x as u64), + )) }) }, ); @@ -70,25 +72,17 @@ fn bench(c: &mut Criterion) { communication: CommunicationConfig::ProcessBinary(params.threads), worker: WorkerConfig::default(), }; - black_box(experiment_exchange(config, params.batch, iters)) - }) - }, - ); - group.bench_with_input( - BenchmarkId::new("LazyZero", params.clone()), - ¶ms, - move |b, params| { - b.iter_custom(|iters| { - let config = Config { - communication: CommunicationConfig::ProcessBinary(params.threads), - worker: WorkerConfig::default(), - }; - black_box(experiment_lazy_exchange(config, params.batch, iters)) + black_box(experiment_exchange( + config, + params.batch, + iters, + ||ExchangePact::new(|&x| x as u64), + )) }) }, ); group.bench_with_input( - BenchmarkId::new("EagerZero", params.clone()), + BenchmarkId::new("NonRetainingZero", params.clone()), ¶ms, move |b, params| { b.iter_custom(|iters| { @@ -96,7 +90,12 @@ fn bench(c: &mut Criterion) { communication: CommunicationConfig::ProcessBinary(params.threads), worker: WorkerConfig::default(), }; - black_box(experiment_eager_exchange(config, params.batch, iters)) + black_box(experiment_exchange( + config, + params.batch, + iters, + ||NonRetainingExchange::new(|&x| x as u64), + )) }) }, ); @@ -104,77 +103,15 @@ fn bench(c: &mut Criterion) { } } -fn experiment_exchange(config: Config, batch: u64, rounds: u64) -> Duration { - timely::execute(config, move |worker| { - let mut input = InputHandle::new(); - let probe = - worker.dataflow(|scope| scope.input_from(&mut input).exchange(|&x| x as u64).probe()); - - let mut time = 0; - let timer = Instant::now(); - - for _round in 0..rounds { - for i in 0..batch { - input.send(i); - } - time += 1; - input.advance_to(time); - while probe.less_than(input.time()) { - worker.step(); - } - } - timer.elapsed() - }) - .unwrap() - .join() - .into_iter() - .next() - .unwrap() - .unwrap() -} - -fn experiment_eager_exchange(config: Config, batch: u64, rounds: u64) -> Duration { - timely::execute(config, move |worker| { - let mut input = InputHandle::new(); - let probe = worker.dataflow(|scope| { - scope - .input_from(&mut input) - .apply_pact(EagerExchange::new(|&x| x as u64)) - .probe() - }); - - let mut time = 0; - let timer = Instant::now(); - - for _round in 0..rounds { - for i in 0..batch { - input.send(i); - } - time += 1; - input.advance_to(time); - while probe.less_than(input.time()) { - worker.step(); - } - } - timer.elapsed() - }) - .unwrap() - .join() - .into_iter() - .next() - .unwrap() - .unwrap() -} - -fn experiment_lazy_exchange(config: Config, batch: u64, rounds: u64) -> Duration { +fn experiment_exchange P + Sync + Send + 'static, P: ParallelizationContract>( + config: Config, + batch: u64, + rounds: u64, + pact: F, +) -> Duration { timely::execute(config, move |worker| { let mut input = InputHandle::new(); - let probe = worker.dataflow(|scope| { - scope - .input_from(&mut input) - .apply_pact(LazyExchange::new(|&x| x as u64)) - .probe() - }); + let probe = worker.dataflow(|scope| scope.input_from(&mut input).apply_pact(pact()).probe()); let mut time = 0; let timer = Instant::now(); diff --git a/timely/src/dataflow/channels/pact.rs b/timely/src/dataflow/channels/pact.rs index 9ffea6716..000f72b48 100644 --- a/timely/src/dataflow/channels/pact.rs +++ b/timely/src/dataflow/channels/pact.rs @@ -19,8 +19,7 @@ use super::{Bundle, Message}; use crate::logging::{TimelyLogger as Logger, MessagesEvent}; -pub use super::pushers::lazy_exchange::LazyExchange; -pub use super::pushers::eager_exchange::EagerExchange; +pub use super::pushers::non_retaining_exchange::NonRetainingExchange; /// A `ParallelizationContract` allocates paired `Push` and `Pull` implementors. pub trait ParallelizationContract { diff --git a/timely/src/dataflow/channels/pushers/exchange.rs b/timely/src/dataflow/channels/pushers/exchange.rs index 8083bc931..bc4aa4fa8 100644 --- a/timely/src/dataflow/channels/pushers/exchange.rs +++ b/timely/src/dataflow/channels/pushers/exchange.rs @@ -33,10 +33,10 @@ pub trait ExchangeBehavior { fn finalize(buffer: &mut Vec); } -/// Default exchange behavior -pub struct DefaultExchangeBehavior {} +/// Exchange behavior that always has push buffers fully allocated. +pub struct FullyAllocatedExchangeBehavior {} -impl ExchangeBehavior for DefaultExchangeBehavior { +impl ExchangeBehavior for FullyAllocatedExchangeBehavior { fn allocate() -> Vec { Vec::with_capacity(Message::::default_length()) } @@ -55,8 +55,8 @@ impl ExchangeBehavior for DefaultExchangeBehavior { } } -/// Default exchange type -pub type Exchange = ExchangePusherGeneric; +/// Default exchange type is to fully allocate exchange buffers +pub type Exchange = ExchangePusherGeneric; impl>, H: FnMut(&T, &D)->u64, B: ExchangeBehavior> ExchangePusherGeneric { /// Allocates a new `ExchangeGeneric` from a supplied set of pushers and a distribution function. diff --git a/timely/src/dataflow/channels/pushers/lazy_exchange.rs b/timely/src/dataflow/channels/pushers/lazy_exchange.rs deleted file mode 100644 index 080e92056..000000000 --- a/timely/src/dataflow/channels/pushers/lazy_exchange.rs +++ /dev/null @@ -1,75 +0,0 @@ -//! The exchange pattern distributes pushed data between many target pushees. - -use std::marker::PhantomData; - -use crate::ExchangeData; -use crate::communication::{Pull, Push}; -use crate::dataflow::channels::pact::{ParallelizationContract, LogPusher, LogPuller}; -use crate::dataflow::channels::pushers::exchange::{ExchangeBehavior, ExchangePusherGeneric}; -use crate::dataflow::channels::{Bundle, Message}; -use crate::logging::TimelyLogger as Logger; -use crate::worker::AsWorker; - -/// Distributes records among target pushees according to a distribution function. -/// -/// This implementation behaves similarly to [crate::dataflow::channels::pushers::Exchange], but -/// tries to leave less allocations around. It does not preallocate a buffer for each pushee, but -/// only allocates it once data is pushed. On flush, the allocation is passed to the pushee, and -/// only what it is passed back is retained. -pub struct LazyExchangeBehavior {} - -impl ExchangeBehavior for LazyExchangeBehavior { - fn allocate() -> Vec { - Vec::new() - } - - fn check(buffer: &mut Vec) { - if buffer.capacity() < Message::::default_length() { - let to_reserve = Message::::default_length() - buffer.capacity(); - buffer.reserve(to_reserve); - } - } - - fn flush>>(buffer: &mut Vec, time: T, pusher: &mut P) { - Message::push_at_no_allocation(buffer, time, pusher); - } - - fn finalize(_buffer: &mut Vec) { - // None - } -} - -/// Lazy exchange pusher definition -pub type LazyExchangePusher = ExchangePusherGeneric; - -/// An exchange between multiple observers by data, backed by [LazyExchangePusher]. -pub struct LazyExchange { hash_func: F, phantom: PhantomData } - -implu64+'static> LazyExchange { - /// Allocates a new `LeanExchange` pact from a distribution function. - pub fn new(func: F) -> Self { - Self { - hash_func: func, - phantom: PhantomData, - } - } -} - -// Exchange uses a `Box` because it cannot know what type of pushable will return from the allocator. -implu64+'static> ParallelizationContract for LazyExchange { - // TODO: The closure in the type prevents us from naming it. - // Could specialize `ExchangePusher` to a time-free version. - type Pusher = Box>>; - type Puller = Box>>; - fn connect(mut self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option) -> (Self::Pusher, Self::Puller) { - let (senders, receiver) = allocator.allocate::>(identifier, address); - let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::>(); - (Box::new(LazyExchangePusher::new(senders, move |_, d| (self.hash_func)(d))), Box::new(LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))) - } -} - -impl std::fmt::Debug for LazyExchange { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("LazyExchange").finish() - } -} diff --git a/timely/src/dataflow/channels/pushers/mod.rs b/timely/src/dataflow/channels/pushers/mod.rs index 16b08e65f..496a13b67 100644 --- a/timely/src/dataflow/channels/pushers/mod.rs +++ b/timely/src/dataflow/channels/pushers/mod.rs @@ -3,8 +3,7 @@ pub use self::exchange::Exchange; pub use self::counter::Counter; pub mod tee; -pub mod eager_exchange; +pub mod non_retaining_exchange; pub mod exchange; -pub mod lazy_exchange; pub mod counter; pub mod buffer; diff --git a/timely/src/dataflow/channels/pushers/eager_exchange.rs b/timely/src/dataflow/channels/pushers/non_retaining_exchange.rs similarity index 72% rename from timely/src/dataflow/channels/pushers/eager_exchange.rs rename to timely/src/dataflow/channels/pushers/non_retaining_exchange.rs index f178d6158..b53e12f06 100644 --- a/timely/src/dataflow/channels/pushers/eager_exchange.rs +++ b/timely/src/dataflow/channels/pushers/non_retaining_exchange.rs @@ -16,9 +16,9 @@ use crate::worker::AsWorker; /// leaves no allocations around. It does not preallocate a buffer for each pushee, but /// only allocates it once data is pushed. On flush, the allocation is passed to the pushee, and /// any returned allocation will be dropped. -pub struct EagerExchangeBehavior {} +pub struct NonRetainingExchangeBehavior {} -impl ExchangeBehavior for EagerExchangeBehavior { +impl ExchangeBehavior for NonRetainingExchangeBehavior { fn allocate() -> Vec { Vec::new() } @@ -39,14 +39,14 @@ impl ExchangeBehavior for EagerExchangeBehavior { } } -/// Eager exchange pusher definition -pub type EagerExchangePusher = ExchangePusherGeneric; +/// Non-retaining exchange pusher definition +pub type NonRetainingExchangePusher = ExchangePusherGeneric; -/// An exchange between multiple observers by data, backed by [EagerExchangePusher]. -pub struct EagerExchange { hash_func: F, phantom: PhantomData } +/// An exchange between multiple observers by data, backed by [NonRetainingExchangePusher]. +pub struct NonRetainingExchange { hash_func: F, phantom: PhantomData } -implu64+'static> EagerExchange { - /// Allocates a new `LazyExchange` pact from a distribution function. +implu64+'static> NonRetainingExchange { + /// Allocates a new exchange pact from a distribution function. pub fn new(func: F) -> Self { Self { hash_func: func, @@ -56,7 +56,7 @@ implu64+'static> EagerExchange { } // Exchange uses a `Box` because it cannot know what type of pushable will return from the allocator. -implu64+'static> ParallelizationContract for EagerExchange { +implu64+'static> ParallelizationContract for NonRetainingExchange { // TODO: The closure in the type prevents us from naming it. // Could specialize `ExchangePusher` to a time-free version. type Pusher = Box>>; @@ -64,12 +64,12 @@ implu64+'static> Paralleliza fn connect(mut self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option) -> (Self::Pusher, Self::Puller) { let (senders, receiver) = allocator.allocate::>(identifier, address); let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::>(); - (Box::new(EagerExchangePusher::new(senders, move |_, d| (self.hash_func)(d))), Box::new(LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))) + (Box::new(NonRetainingExchangePusher::new(senders, move |_, d| (self.hash_func)(d))), Box::new(LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))) } } -impl std::fmt::Debug for EagerExchange { +impl std::fmt::Debug for NonRetainingExchange { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("EagerExchange").finish() + f.debug_struct("NonRetainingExchange").finish() } } diff --git a/timely/src/dataflow/operators/exchange.rs b/timely/src/dataflow/operators/exchange.rs index 9880c474b..1f6438b67 100644 --- a/timely/src/dataflow/operators/exchange.rs +++ b/timely/src/dataflow/operators/exchange.rs @@ -29,11 +29,11 @@ pub trait Exchange { /// # Examples /// ``` /// use timely::dataflow::operators::{ToStream, Exchange, Inspect}; - /// use timely::dataflow::channels::pact::LazyExchange; + /// use timely::dataflow::channels::pact::NonRetainingExchange; /// /// timely::example(|scope| { /// (0..10).to_stream(scope) - /// .apply_pact(LazyExchange::new(|x| *x)) + /// .apply_pact(NonRetainingExchange::new(|x| *x)) /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ```