diff --git a/experiments/benches/exchange_bench.rs b/experiments/benches/exchange_bench.rs index 491c36585..981878ee0 100644 --- a/experiments/benches/exchange_bench.rs +++ b/experiments/benches/exchange_bench.rs @@ -6,9 +6,6 @@ use std::time::{Duration, Instant}; use criterion::black_box; use criterion::*; -use timely::dataflow::channels::pact::{ - Exchange as ExchangePact, NonRetainingExchange, ParallelizationContract, -}; use timely::dataflow::operators::{Exchange, Input, Probe}; use timely::dataflow::InputHandle; use timely::{CommunicationConfig, Config, WorkerConfig}; @@ -43,22 +40,6 @@ fn bench(c: &mut Criterion) { config, params.batch, iters, - || ExchangePact::new(|&x| x as u64), - )) - }) - }, - ); - group.bench_with_input( - BenchmarkId::new("NonRetaining", params.clone()), - ¶ms, - move |b, params| { - b.iter_custom(|iters| { - let config = Config::process(params.threads); - black_box(experiment_exchange( - config, - params.batch, - iters, - || NonRetainingExchange::new(|&x| x as u64), )) }) }, @@ -76,25 +57,6 @@ fn bench(c: &mut Criterion) { config, params.batch, iters, - ||ExchangePact::new(|&x| x as u64), - )) - }) - }, - ); - group.bench_with_input( - BenchmarkId::new("NonRetainingZero", params.clone()), - ¶ms, - move |b, params| { - b.iter_custom(|iters| { - let config = Config { - communication: CommunicationConfig::ProcessBinary(params.threads), - worker: WorkerConfig::default(), - }; - black_box(experiment_exchange( - config, - params.batch, - iters, - ||NonRetainingExchange::new(|&x| x as u64), )) }) }, @@ -103,15 +65,14 @@ fn bench(c: &mut Criterion) { } } -fn experiment_exchange P + Sync + Send + 'static, P: ParallelizationContract>( +fn experiment_exchange( config: Config, batch: u64, rounds: u64, - pact: F, ) -> Duration { timely::execute(config, move |worker| { let mut input = InputHandle::new(); - let probe = worker.dataflow(|scope| scope.input_from(&mut input).apply_pact(pact()).probe()); + let probe = worker.dataflow(|scope| scope.input_from(&mut input).exchange(|x| *x).probe()); let mut time = 0; let timer = Instant::now(); diff --git a/timely/src/dataflow/channels/pact.rs b/timely/src/dataflow/channels/pact.rs index 000f72b48..43b5e4279 100644 --- a/timely/src/dataflow/channels/pact.rs +++ b/timely/src/dataflow/channels/pact.rs @@ -9,8 +9,7 @@ use std::{fmt::{self, Debug}, marker::PhantomData}; -use crate::ExchangeData; -use crate::communication::{Push, Pull}; +use crate::communication::{Push, Pull, Data}; use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller}; use crate::worker::AsWorker; @@ -19,8 +18,6 @@ use super::{Bundle, Message}; use crate::logging::{TimelyLogger as Logger, MessagesEvent}; -pub use super::pushers::non_retaining_exchange::NonRetainingExchange; - /// A `ParallelizationContract` allocates paired `Push` and `Pull` implementors. pub trait ParallelizationContract { /// Type implementing `Push` produced by this pact. @@ -61,7 +58,7 @@ implu64+'static> Exchange { } // Exchange uses a `Box` because it cannot know what type of pushable will return from the allocator. -implu64+'static> ParallelizationContract for Exchange { +implu64+'static> ParallelizationContract for Exchange { // TODO: The closure in the type prevents us from naming it. // Could specialize `ExchangePusher` to a time-free version. type Pusher = Box>>; diff --git a/timely/src/dataflow/channels/pushers/exchange.rs b/timely/src/dataflow/channels/pushers/exchange.rs index bc4aa4fa8..fad8340c6 100644 --- a/timely/src/dataflow/channels/pushers/exchange.rs +++ b/timely/src/dataflow/channels/pushers/exchange.rs @@ -1,89 +1,43 @@ //! The exchange pattern distributes pushed data between many target pushees. -use std::marker::PhantomData; - use crate::Data; use crate::communication::Push; use crate::dataflow::channels::{Bundle, Message}; // TODO : Software write combining /// Distributes records among target pushees according to a distribution function. -pub struct ExchangePusherGeneric>, H: FnMut(&T, &D) -> u64, B: ExchangeBehavior> { +pub struct Exchange>, H: FnMut(&T, &D) -> u64> { pushers: Vec

, buffers: Vec>, current: Option, hash_func: H, - _phantom_data: PhantomData, -} - -/// The behavior of an exchange specialization -/// -/// This trait gives specialized exchange implementations the opportunity to hook into interesting -/// places for memory management. It exposes the lifecycle of each of the pushee's buffers, starting -/// from creation, ensuring allocations, flushing and finalizing. -pub trait ExchangeBehavior { - /// Allocate a new buffer, called while creating the exchange pusher. - fn allocate() -> Vec; - /// Check the buffer's capacity before pushing a single element. - fn check(buffer: &mut Vec); - /// Flush a buffer's contents, either when the buffer is at capacity or when no more data is - /// available. - fn flush>>(buffer: &mut Vec, time: T, pusher: &mut P); - /// Finalize a buffer after pushing `None`, i.e. no more data is available. - fn finalize(buffer: &mut Vec); -} - -/// Exchange behavior that always has push buffers fully allocated. -pub struct FullyAllocatedExchangeBehavior {} - -impl ExchangeBehavior for FullyAllocatedExchangeBehavior { - fn allocate() -> Vec { - Vec::with_capacity(Message::::default_length()) - } - - fn check(_buffer: &mut Vec) { - // Not needed, always allocated - } - - fn flush>>(buffer: &mut Vec, time: T, pusher: &mut P) { - // `push_at` ensures an allocation. - Message::push_at(buffer, time, pusher); - } - - fn finalize(_buffer: &mut Vec) { - // retain any allocation - } } -/// Default exchange type is to fully allocate exchange buffers -pub type Exchange = ExchangePusherGeneric; - -impl>, H: FnMut(&T, &D)->u64, B: ExchangeBehavior> ExchangePusherGeneric { - /// Allocates a new `ExchangeGeneric` from a supplied set of pushers and a distribution function. - pub fn new(pushers: Vec

, key: H) -> ExchangePusherGeneric { +impl>, H: FnMut(&T, &D)->u64> Exchange { + /// Allocates a new `Exchange` from a supplied set of pushers and a distribution function. + pub fn new(pushers: Vec

, key: H) -> Exchange { let mut buffers = vec![]; for _ in 0..pushers.len() { - buffers.push(B::allocate()); + buffers.push(Vec::new()); } - ExchangePusherGeneric { + Exchange { pushers, hash_func: key, buffers, current: None, - _phantom_data: PhantomData, } } #[inline] fn flush(&mut self, index: usize) { if !self.buffers[index].is_empty() { if let Some(ref time) = self.current { - B::flush(&mut self.buffers[index], time.clone(), &mut self.pushers[index]); + Message::push_at_no_allocation(&mut self.buffers[index], time.clone(), &mut self.pushers[index]); } } } } -impl>, H: FnMut(&T, &D)->u64, B: ExchangeBehavior> Push> for ExchangePusherGeneric { +impl>, H: FnMut(&T, &D)->u64> Push> for Exchange { #[inline(never)] fn push(&mut self, message: &mut Option>) { // if only one pusher, no exchange @@ -109,7 +63,10 @@ impl>, H: FnMut(&T, &D)->u64, B: Excha let mask = (self.pushers.len() - 1) as u64; for datum in data.drain(..) { let index = (((self.hash_func)(time, &datum)) & mask) as usize; - B::check(&mut self.buffers[index]); + if self.buffers[index].capacity() < Message::::default_length() { + let to_reserve = Message::::default_length() - self.buffers[index].capacity(); + self.buffers[index].reserve(to_reserve); + } self.buffers[index].push(datum); if self.buffers[index].len() == self.buffers[index].capacity() { self.flush(index); @@ -128,7 +85,10 @@ impl>, H: FnMut(&T, &D)->u64, B: Excha else { for datum in data.drain(..) { let index = (((self.hash_func)(time, &datum)) % self.pushers.len() as u64) as usize; - B::check(&mut self.buffers[index]); + if self.buffers[index].capacity() < Message::::default_length() { + let to_reserve = Message::::default_length() - self.buffers[index].capacity(); + self.buffers[index].reserve(to_reserve); + } self.buffers[index].push(datum); if self.buffers[index].len() == self.buffers[index].capacity() { self.flush(index); @@ -142,7 +102,6 @@ impl>, H: FnMut(&T, &D)->u64, B: Excha for index in 0..self.pushers.len() { self.flush(index); self.pushers[index].push(&mut None); - B::finalize(&mut self.buffers[index]); } } } diff --git a/timely/src/dataflow/channels/pushers/mod.rs b/timely/src/dataflow/channels/pushers/mod.rs index 496a13b67..295d033ca 100644 --- a/timely/src/dataflow/channels/pushers/mod.rs +++ b/timely/src/dataflow/channels/pushers/mod.rs @@ -3,7 +3,6 @@ pub use self::exchange::Exchange; pub use self::counter::Counter; pub mod tee; -pub mod non_retaining_exchange; pub mod exchange; pub mod counter; pub mod buffer; diff --git a/timely/src/dataflow/channels/pushers/non_retaining_exchange.rs b/timely/src/dataflow/channels/pushers/non_retaining_exchange.rs deleted file mode 100644 index b53e12f06..000000000 --- a/timely/src/dataflow/channels/pushers/non_retaining_exchange.rs +++ /dev/null @@ -1,75 +0,0 @@ -//! The exchange pattern distributes pushed data between many target pushees. - -use std::marker::PhantomData; - -use crate::ExchangeData; -use crate::communication::{Pull, Push}; -use crate::dataflow::channels::pact::{ParallelizationContract, LogPusher, LogPuller}; -use crate::dataflow::channels::pushers::exchange::{ExchangeBehavior, ExchangePusherGeneric}; -use crate::dataflow::channels::{Bundle, Message}; -use crate::logging::TimelyLogger as Logger; -use crate::worker::AsWorker; - -/// Distributes records among target pushees according to a distribution function. -/// -/// This implementation behaves similarly to [crate::dataflow::channels::pushers::Exchange], but -/// leaves no allocations around. It does not preallocate a buffer for each pushee, but -/// only allocates it once data is pushed. On flush, the allocation is passed to the pushee, and -/// any returned allocation will be dropped. -pub struct NonRetainingExchangeBehavior {} - -impl ExchangeBehavior for NonRetainingExchangeBehavior { - fn allocate() -> Vec { - Vec::new() - } - - fn check(buffer: &mut Vec) { - if buffer.capacity() < Message::::default_length() { - let to_reserve = Message::::default_length() - buffer.capacity(); - buffer.reserve(to_reserve); - } - } - - fn flush>>(buffer: &mut Vec, time: T, pusher: &mut P) { - Message::push_at_no_allocation(buffer, time, pusher); - } - - fn finalize(buffer: &mut Vec) { - *buffer = Vec::new(); - } -} - -/// Non-retaining exchange pusher definition -pub type NonRetainingExchangePusher = ExchangePusherGeneric; - -/// An exchange between multiple observers by data, backed by [NonRetainingExchangePusher]. -pub struct NonRetainingExchange { hash_func: F, phantom: PhantomData } - -implu64+'static> NonRetainingExchange { - /// Allocates a new exchange pact from a distribution function. - pub fn new(func: F) -> Self { - Self { - hash_func: func, - phantom: PhantomData, - } - } -} - -// Exchange uses a `Box` because it cannot know what type of pushable will return from the allocator. -implu64+'static> ParallelizationContract for NonRetainingExchange { - // TODO: The closure in the type prevents us from naming it. - // Could specialize `ExchangePusher` to a time-free version. - type Pusher = Box>>; - type Puller = Box>>; - fn connect(mut self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option) -> (Self::Pusher, Self::Puller) { - let (senders, receiver) = allocator.allocate::>(identifier, address); - let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::>(); - (Box::new(NonRetainingExchangePusher::new(senders, move |_, d| (self.hash_func)(d))), Box::new(LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))) - } -} - -impl std::fmt::Debug for NonRetainingExchange { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("NonRetainingExchange").finish() - } -} diff --git a/timely/src/dataflow/operators/exchange.rs b/timely/src/dataflow/operators/exchange.rs index 1f6438b67..c106ce37f 100644 --- a/timely/src/dataflow/operators/exchange.rs +++ b/timely/src/dataflow/operators/exchange.rs @@ -1,7 +1,7 @@ //! Exchange records between workers. use crate::ExchangeData; -use crate::dataflow::channels::pact::{Exchange as ExchangePact, ParallelizationContract}; +use crate::dataflow::channels::pact::Exchange as ExchangePact; use crate::dataflow::{Stream, Scope}; use crate::dataflow::operators::generic::operator::Operator; @@ -23,31 +23,13 @@ pub trait Exchange { /// }); /// ``` fn exchange(&self, route: impl Fn(&D)->u64+'static) -> Self; - - /// Apply a parallelization contract on the data in a stream - /// - /// # Examples - /// ``` - /// use timely::dataflow::operators::{ToStream, Exchange, Inspect}; - /// use timely::dataflow::channels::pact::NonRetainingExchange; - /// - /// timely::example(|scope| { - /// (0..10).to_stream(scope) - /// .apply_pact(NonRetainingExchange::new(|x| *x)) - /// .inspect(|x| println!("seen: {:?}", x)); - /// }); - /// ``` - fn apply_pact>(&self, pact: P) -> Self where T: 'static; } +// impl, D: ExchangeData> Exchange for Stream { impl Exchange for Stream { fn exchange(&self, route: impl Fn(&D)->u64+'static) -> Stream { - self.apply_pact(ExchangePact::new(route)) - } - - fn apply_pact>(&self, pact: P) -> Stream { let mut vector = Vec::new(); - self.unary(pact, "Exchange", move |_,_| move |input, output| { + self.unary(ExchangePact::new(route), "Exchange", move |_,_| move |input, output| { input.for_each(|time, data| { data.swap(&mut vector); output.session(&time).give_vec(&mut vector);