From 3afe4cc32ab982c88d7c1a514bc1e57eeb868f8c Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Wed, 21 Jul 2021 17:04:38 +0200 Subject: [PATCH 01/11] Add the LazyExchange type The LazyExchange PACT avoids allocations ahead of time while maintaining the performance of the eagerly-allocating Exchange PACT. Extend the Exchange operator with an `exchange_pact` function taking an explicit PACT. Signed-off-by: Moritz Hoffmann --- timely/src/dataflow/channels/mod.rs | 26 ++- timely/src/dataflow/channels/pact.rs | 7 +- .../channels/pushers/lazy_exchange.rs | 150 ++++++++++++++++++ timely/src/dataflow/channels/pushers/mod.rs | 1 + timely/src/dataflow/operators/exchange.rs | 24 ++- 5 files changed, 199 insertions(+), 9 deletions(-) create mode 100644 timely/src/dataflow/channels/pushers/lazy_exchange.rs diff --git a/timely/src/dataflow/channels/mod.rs b/timely/src/dataflow/channels/mod.rs index a380e0ac35..352adfff38 100644 --- a/timely/src/dataflow/channels/mod.rs +++ b/timely/src/dataflow/channels/mod.rs @@ -46,10 +46,27 @@ impl Message { Message { time, data, from, seq } } - /// Forms a message, and pushes contents at `pusher`. + /// Forms a message, and pushes contents at `pusher`. Replaces `buffer` with what the pusher + /// leaves in place, or a new `Vec`. Note that the returned vector is always initialized with + /// a capacity of [Self::default_length] elements. #[inline] pub fn push_at>>(buffer: &mut Vec, time: T, pusher: &mut P) { + Self::push_at_no_allocation(buffer, time, pusher); + + // TODO: Unclear we always want this here. + if buffer.capacity() != Self::default_length() { + *buffer = Vec::with_capacity(Self::default_length()); + } + } + + /// Forms a message, and pushes contents at `pusher`. Replaces `buffer` with what the pusher + /// leaves in place, or a new empty `Vec`. If the pusher leaves a vector with a capacity larger + /// than [Self::default_length], the vector is initialized with a new vector with + /// [Self::default_length] capacity. 
+ #[inline] + pub fn push_at_no_allocation>>(buffer: &mut Vec, time: T, pusher: &mut P) { + let data = ::std::mem::replace(buffer, Vec::new()); let message = Message::new(time, data, 0, 0); let mut bundle = Some(Bundle::from_typed(message)); @@ -63,8 +80,9 @@ impl Message { } } - // TODO: Unclear we always want this here. - if buffer.capacity() != Self::default_length() { + // Avoid memory leaks by buffers growing out of bounds + if buffer.capacity() > Self::default_length() { *buffer = Vec::with_capacity(Self::default_length()); } - }} + } +} diff --git a/timely/src/dataflow/channels/pact.rs b/timely/src/dataflow/channels/pact.rs index 43b5e42792..d5440aec71 100644 --- a/timely/src/dataflow/channels/pact.rs +++ b/timely/src/dataflow/channels/pact.rs @@ -9,7 +9,8 @@ use std::{fmt::{self, Debug}, marker::PhantomData}; -use crate::communication::{Push, Pull, Data}; +use crate::ExchangeData; +use crate::communication::{Push, Pull}; use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller}; use crate::worker::AsWorker; @@ -18,6 +19,8 @@ use super::{Bundle, Message}; use crate::logging::{TimelyLogger as Logger, MessagesEvent}; +pub use super::pushers::lazy_exchange::LazyExchange; + /// A `ParallelizationContract` allocates paired `Push` and `Pull` implementors. pub trait ParallelizationContract { /// Type implementing `Push` produced by this pact. @@ -58,7 +61,7 @@ implu64+'static> Exchange { } // Exchange uses a `Box` because it cannot know what type of pushable will return from the allocator. -implu64+'static> ParallelizationContract for Exchange { +implu64+'static> ParallelizationContract for Exchange { // TODO: The closure in the type prevents us from naming it. // Could specialize `ExchangePusher` to a time-free version. 
type Pusher = Box>>; diff --git a/timely/src/dataflow/channels/pushers/lazy_exchange.rs b/timely/src/dataflow/channels/pushers/lazy_exchange.rs new file mode 100644 index 0000000000..c75089e986 --- /dev/null +++ b/timely/src/dataflow/channels/pushers/lazy_exchange.rs @@ -0,0 +1,150 @@ +//! The exchange pattern distributes pushed data between many target pushees. + +use std::marker::PhantomData; + +use crate::{Data, ExchangeData}; +use crate::communication::{Pull, Push}; +use crate::dataflow::channels::pact::{ParallelizationContract, LogPusher, LogPuller}; +use crate::dataflow::channels::{Bundle, Message}; +use crate::logging::TimelyLogger as Logger; +use crate::worker::AsWorker; + +/// Distributes records among target pushees according to a distribution function. +/// +/// This implementation behaves similarly to [crate::dataflow::channels::pushers::Exchange], but +/// tries to leave less allocations around. It does not preallocate a buffer for each pushee, but +/// only allocates it once data is pushed. On flush, the allocation is passed to the pushee, and +/// only what it is passed back is retained. +pub struct LazyExchangePusher>, H: FnMut(&T, &D) -> u64> { + pushers: Vec

, + buffers: Vec>, + current: Option, + hash_func: H, +} + +impl>, H: FnMut(&T, &D)->u64> LazyExchangePusher { + /// Allocates a new `Exchange` from a supplied set of pushers and a distribution function. + pub fn new(pushers: Vec

, key: H) -> LazyExchangePusher { + let buffers = (0..pushers.len()).map(|_| vec![]).collect(); + LazyExchangePusher { + pushers, + hash_func: key, + buffers, + current: None, + } + } + #[inline] + fn flush(&mut self, index: usize) { + if !self.buffers[index].is_empty() { + if let Some(ref time) = self.current { + Message::push_at_no_allocation(&mut self.buffers[index], time.clone(), &mut self.pushers[index]); + } + } + } +} + +impl>, H: FnMut(&T, &D)->u64> Push> for LazyExchangePusher { + fn push(&mut self, message: &mut Option>) { + // if only one pusher, no exchange + if self.pushers.len() == 1 { + self.pushers[0].push(message); + } else if let Some(message) = message { + let message = message.as_mut(); + let time = &message.time; + let data = &mut message.data; + + // if the time isn't right, flush everything. + if self.current.as_ref().map_or(false, |x| x != time) { + for index in 0..self.pushers.len() { + self.flush(index); + } + } + self.current = Some(time.clone()); + + // if the number of pushers is a power of two, use a mask + if (self.pushers.len() & (self.pushers.len() - 1)) == 0 { + let mask = (self.pushers.len() - 1) as u64; + for datum in data.drain(..) { + let index = (((self.hash_func)(time, &datum)) & mask) as usize; + // Push at the target buffer, which might be without capacity, or preallocated + self.buffers[index].push(datum); + // We have reached the buffer's capacity + if self.buffers[index].len() == self.buffers[index].capacity() { + // If the buffer's capacity is below the default length, reallocate to match + // the default length + if self.buffers[index].capacity() < Message::::default_length() { + let to_reserve = Message::::default_length() - self.buffers[index].capacity(); + self.buffers[index].reserve(to_reserve); + } else { + // Buffer is at capacity, flush + self.flush(index); + // Explicitly allocate a new buffer under the assumption that more data + // will be sent to the pushee. 
+ if self.buffers[index].capacity() < Message::::default_length() { + let to_reserve = Message::::default_length() - self.buffers[index].capacity(); + self.buffers.reserve(to_reserve); + } + } + } + } + } else { + // as a last resort, use mod (%) + for datum in data.drain(..) { + let index = (((self.hash_func)(time, &datum)) % self.pushers.len() as u64) as usize; + self.buffers[index].push(datum); + // This code is duplicated from above, keep in sync! + if self.buffers[index].len() == self.buffers[index].capacity() { + if self.buffers[index].capacity() < Message::::default_length() { + let to_reserve = Message::::default_length() - self.buffers[index].capacity(); + self.buffers[index].reserve(to_reserve); + } else { + self.flush(index); + if self.buffers[index].capacity() < Message::::default_length() { + let to_reserve = Message::::default_length() - self.buffers[index].capacity(); + self.buffers.reserve(to_reserve); + } + } + } + } + } + } else { + // flush + for index in 0..self.pushers.len() { + self.flush(index); + self.pushers[index].push(&mut None); + } + } + } +} + +/// An exchange between multiple observers by data, backed by [LazyExchangePusher]. +pub struct LazyExchange { hash_func: F, phantom: PhantomData } + +implu64+'static> LazyExchange { + /// Allocates a new `LeanExchange` pact from a distribution function. + pub fn new(func: F) -> Self { + Self { + hash_func: func, + phantom: PhantomData, + } + } +} + +// Exchange uses a `Box` because it cannot know what type of pushable will return from the allocator. +implu64+'static> ParallelizationContract for LazyExchange { + // TODO: The closure in the type prevents us from naming it. + // Could specialize `ExchangePusher` to a time-free version. 
+ type Pusher = Box>>; + type Puller = Box>>; + fn connect(mut self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option) -> (Self::Pusher, Self::Puller) { + let (senders, receiver) = allocator.allocate::>(identifier, address); + let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::>(); + (Box::new(LazyExchangePusher::new(senders, move |_, d| (self.hash_func)(d))), Box::new(LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))) + } +} + +impl std::fmt::Debug for LazyExchange { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("LazyExchange").finish() + } +} diff --git a/timely/src/dataflow/channels/pushers/mod.rs b/timely/src/dataflow/channels/pushers/mod.rs index 295d033cab..0b5eca10ec 100644 --- a/timely/src/dataflow/channels/pushers/mod.rs +++ b/timely/src/dataflow/channels/pushers/mod.rs @@ -4,5 +4,6 @@ pub use self::counter::Counter; pub mod tee; pub mod exchange; +pub mod lazy_exchange; pub mod counter; pub mod buffer; diff --git a/timely/src/dataflow/operators/exchange.rs b/timely/src/dataflow/operators/exchange.rs index c106ce37f1..9880c474b5 100644 --- a/timely/src/dataflow/operators/exchange.rs +++ b/timely/src/dataflow/operators/exchange.rs @@ -1,7 +1,7 @@ //! Exchange records between workers. 
use crate::ExchangeData; -use crate::dataflow::channels::pact::Exchange as ExchangePact; +use crate::dataflow::channels::pact::{Exchange as ExchangePact, ParallelizationContract}; use crate::dataflow::{Stream, Scope}; use crate::dataflow::operators::generic::operator::Operator; @@ -23,13 +23,31 @@ pub trait Exchange { /// }); /// ``` fn exchange(&self, route: impl Fn(&D)->u64+'static) -> Self; + + /// Apply a parallelization contract on the data in a stream + /// + /// # Examples + /// ``` + /// use timely::dataflow::operators::{ToStream, Exchange, Inspect}; + /// use timely::dataflow::channels::pact::LazyExchange; + /// + /// timely::example(|scope| { + /// (0..10).to_stream(scope) + /// .apply_pact(LazyExchange::new(|x| *x)) + /// .inspect(|x| println!("seen: {:?}", x)); + /// }); + /// ``` + fn apply_pact>(&self, pact: P) -> Self where T: 'static; } -// impl, D: ExchangeData> Exchange for Stream { impl Exchange for Stream { fn exchange(&self, route: impl Fn(&D)->u64+'static) -> Stream { + self.apply_pact(ExchangePact::new(route)) + } + + fn apply_pact>(&self, pact: P) -> Stream { let mut vector = Vec::new(); - self.unary(ExchangePact::new(route), "Exchange", move |_,_| move |input, output| { + self.unary(pact, "Exchange", move |_,_| move |input, output| { input.for_each(|time, data| { data.swap(&mut vector); output.session(&time).give_vec(&mut vector); From b9e5d1698e7c7dc08ec97f75d324c2c89fa90e17 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 30 Jul 2021 11:23:00 +0200 Subject: [PATCH 02/11] experiments: Add exchange Signed-off-by: Moritz Hoffmann --- experiments/Cargo.toml | 9 ++++++ experiments/examples/exchange.rs | 52 ++++++++++++++++++++++++++++++++ experiments/examples/exchange.sh | 38 +++++++++++++++++++++++ 3 files changed, 99 insertions(+) create mode 100644 experiments/Cargo.toml create mode 100644 experiments/examples/exchange.rs create mode 100755 experiments/examples/exchange.sh diff --git a/experiments/Cargo.toml 
b/experiments/Cargo.toml new file mode 100644 index 0000000000..79f0c2fdac --- /dev/null +++ b/experiments/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "experiments" +version = "0.1.0" +edition = "2018" + +[workspace] + +[dependencies] +timely = { path = "../timely" } diff --git a/experiments/examples/exchange.rs b/experiments/examples/exchange.rs new file mode 100644 index 0000000000..74be4cafaa --- /dev/null +++ b/experiments/examples/exchange.rs @@ -0,0 +1,52 @@ +extern crate timely; + +use timely::dataflow::InputHandle; +use timely::dataflow::operators::{Input, Exchange, Probe}; + +fn main() { + // initializes and runs a timely dataflow. + timely::execute_from_args(std::env::args(), |worker| { + + let batch = std::env::args().nth(1).unwrap().parse::().unwrap(); + let rounds = std::env::args().nth(2).unwrap().parse::().unwrap(); + let mut input = InputHandle::new(); + + // create a new input, exchange data, and inspect its output + let probe = worker.dataflow(|scope| + scope + .input_from(&mut input) + .exchange(|&x| x as u64) + .probe() + ); + + let mut time = 0; + + let mut round_batch = 0; + while round_batch < batch { + let timer = std::time::Instant::now(); + + for round in 0..rounds { + for i in 0..round_batch { + input.send(i); + } + time += 1; + input.advance_to(time); + + while probe.less_than(input.time()) { + worker.step(); + } + } + + let volume = (rounds * batch) as f64; + let elapsed = timer.elapsed(); + let seconds = elapsed.as_secs() as f64 + (f64::from(elapsed.subsec_nanos()) / 1000000000.0); + + if worker.index() == 0 { + println!("{}\t{:?}\t{:?}", round_batch, timer.elapsed().as_secs_f64(), volume / seconds); + } + + round_batch += std::cmp::max(worker.peers(), 64); + } + + }).unwrap(); +} diff --git a/experiments/examples/exchange.sh b/experiments/examples/exchange.sh new file mode 100755 index 0000000000..604dfb5d7d --- /dev/null +++ b/experiments/examples/exchange.sh @@ -0,0 +1,38 @@ +#!/usr/bin/env bash +set -eou pipefail + +BRANCH="$(git 
symbolic-ref --short HEAD)" +REVISION="$(git rev-parse --short HEAD)" +if [ -z ${SUFFIX+x} ]; then + SUFFIX="" +else + SUFFIX="_$SUFFIX" +fi +GP="exchange_${BRANCH}_${REVISION}$SUFFIX.gp" +PLOT="exchange_${BRANCH}_${REVISION}$SUFFIX.svg" +cargo build --release --example exchange + +cat > "$GP" <> "$GP" + i=$((i * 2)) + if ((i <= NPROC)); then + echo ", \\" >> "$GP" + fi +done + +echo "# Now, run:" +echo " gnuplot '$GP'" From 85e07fb87c46ec467e2a9322a7cfec9308a3418c Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Mon, 16 Aug 2021 19:01:54 +0200 Subject: [PATCH 03/11] Add exchange bench on Criterion Signed-off-by: Moritz Hoffmann --- Cargo.toml | 1 + experiments/Cargo.toml | 7 +- experiments/benches/exchange_bench.rs | 148 ++++++++++++++++++++++++++ 3 files changed, 154 insertions(+), 2 deletions(-) create mode 100644 experiments/benches/exchange_bench.rs diff --git a/Cargo.toml b/Cargo.toml index e49f154056..a7feac1a1d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ members = [ "kafkaesque", "logging", "timely", + "experiments" ] [profile.release] diff --git a/experiments/Cargo.toml b/experiments/Cargo.toml index 79f0c2fdac..6a0453f187 100644 --- a/experiments/Cargo.toml +++ b/experiments/Cargo.toml @@ -3,7 +3,10 @@ name = "experiments" version = "0.1.0" edition = "2018" -[workspace] - [dependencies] +criterion = "0.3.5" timely = { path = "../timely" } + +[[bench]] +name = "exchange_bench" +harness = false diff --git a/experiments/benches/exchange_bench.rs b/experiments/benches/exchange_bench.rs new file mode 100644 index 0000000000..218b9f4e77 --- /dev/null +++ b/experiments/benches/exchange_bench.rs @@ -0,0 +1,148 @@ +extern crate timely; + +use std::fmt::{Display, Formatter}; +use std::iter::repeat; +use std::time::{Duration, Instant}; + +use criterion::black_box; +use criterion::*; + +use timely::dataflow::channels::pact::LazyExchange; +use timely::dataflow::operators::{Exchange, Input, Probe}; +use timely::dataflow::InputHandle; +use 
timely::{WorkerConfig, CommunicationConfig, Config}; + +#[derive(Clone)] +struct ExperimentConfig { + threads: usize, + batch: u64, +} + +impl Display for ExperimentConfig { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "threads={:2},batch={:5}", self.threads, self.batch) + } +} + +fn bench(c: &mut Criterion) { + let mut group = c.benchmark_group("exchange"); + for threads in [1, 2, 4, 8, 16] { + for shift in [0, 2, 4, 6, 8, 10, 12, 14, 16] { + let params = ExperimentConfig { + threads, + batch: 1u64 << shift, + }; + group.bench_with_input( + BenchmarkId::new("Default", params.clone()), + ¶ms, + move |b, params| { + b.iter_custom(|iters| { + let config = Config::process(params.threads); + black_box(experiment_exchange(config, params.batch, iters)) + }) + }, + ); + group.bench_with_input( + BenchmarkId::new("Lazy", params.clone()), + ¶ms, + move |b, params| { + b.iter_custom(|iters| { + let config = Config::process(params.threads); + black_box(experiment_lazy_exchange(config, params.batch, iters)) + }) + }, + ); + group.bench_with_input( + BenchmarkId::new("DefaultZero", params.clone()), + ¶ms, + move |b, params| { + b.iter_custom(|iters| { + let config = Config { + communication: CommunicationConfig::ProcessBinary(params.threads), + worker: WorkerConfig::default(), + }; + black_box(experiment_exchange(config, params.batch, iters)) + }) + }, + ); + group.bench_with_input( + BenchmarkId::new("LazyZero", params.clone()), + ¶ms, + move |b, params| { + b.iter_custom(|iters| { + let config = Config { + communication: CommunicationConfig::ProcessBinary(params.threads), + worker: WorkerConfig::default(), + }; + black_box(experiment_lazy_exchange(config, params.batch, iters)) + }) + }, + ); + } + } +} + +fn experiment_exchange(config: Config, batch: u64, rounds: u64) -> Duration { + timely::execute(config, move |worker| { + let mut input = InputHandle::new(); + let probe = + worker.dataflow(|scope| scope.input_from(&mut input).exchange(|&x| x as 
u64).probe()); + + let mut time = 0; + let timer = Instant::now(); + + for _round in 0..rounds { + for i in 0..batch { + input.send(i); + } + time += 1; + input.advance_to(time); + while probe.less_than(input.time()) { + worker.step(); + } + } + timer.elapsed() + }) + .unwrap() + .join() + .into_iter() + .next() + .unwrap() + .unwrap() +} + +fn experiment_lazy_exchange(config: Config, batch: u64, rounds: u64) -> Duration { + timely::execute(config, move |worker| { + let mut input = InputHandle::new(); + let probe = worker.dataflow(|scope| { + scope + .input_from(&mut input) + .apply_pact(LazyExchange::new(|&x| x as u64)) + .probe() + }); + + let mut time = 0; + let timer = Instant::now(); + + for _round in 0..rounds { + for i in 0..batch { + input.send(i); + } + time += 1; + input.advance_to(time); + while probe.less_than(input.time()) { + worker.step(); + } + } + timer.elapsed() + }) + .unwrap() + .join() + .into_iter() + .next() + .unwrap() + .unwrap() +} + +criterion_group!(benches, bench); +criterion_main!(benches); From 6648092271ea35f51b75ee6637541d228b815f4e Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Tue, 17 Aug 2021 09:37:43 +0200 Subject: [PATCH 04/11] Eagerly deallocating exchange pusher Signed-off-by: Moritz Hoffmann --- timely/src/dataflow/channels/pact.rs | 1 + .../channels/pushers/eager_exchange.rs | 135 ++++++++++++++++++ timely/src/dataflow/channels/pushers/mod.rs | 1 + 3 files changed, 137 insertions(+) create mode 100644 timely/src/dataflow/channels/pushers/eager_exchange.rs diff --git a/timely/src/dataflow/channels/pact.rs b/timely/src/dataflow/channels/pact.rs index d5440aec71..9ffea67162 100644 --- a/timely/src/dataflow/channels/pact.rs +++ b/timely/src/dataflow/channels/pact.rs @@ -20,6 +20,7 @@ use super::{Bundle, Message}; use crate::logging::{TimelyLogger as Logger, MessagesEvent}; pub use super::pushers::lazy_exchange::LazyExchange; +pub use super::pushers::eager_exchange::EagerExchange; /// A `ParallelizationContract` allocates 
paired `Push` and `Pull` implementors. pub trait ParallelizationContract { diff --git a/timely/src/dataflow/channels/pushers/eager_exchange.rs b/timely/src/dataflow/channels/pushers/eager_exchange.rs new file mode 100644 index 0000000000..d4011a8429 --- /dev/null +++ b/timely/src/dataflow/channels/pushers/eager_exchange.rs @@ -0,0 +1,135 @@ +//! The exchange pattern distributes pushed data between many target pushees. + +use std::marker::PhantomData; + +use crate::{Data, ExchangeData}; +use crate::communication::{Pull, Push}; +use crate::dataflow::channels::pact::{ParallelizationContract, LogPusher, LogPuller}; +use crate::dataflow::channels::{Bundle, Message}; +use crate::logging::TimelyLogger as Logger; +use crate::worker::AsWorker; + +/// Distributes records among target pushees according to a distribution function. +/// +/// This implementation behaves similarly to [crate::dataflow::channels::pushers::Exchange], but +/// leaves no allocations around. It does not preallocate a buffer for each pushee, but +/// only allocates it once data is pushed. On flush, the allocation is passed to the pushee, and +/// any returned allocation will be dropped. +pub struct EagerExchangePusher>, H: FnMut(&T, &D) -> u64> { + pushers: Vec

, + buffers: Vec>, + current: Option, + hash_func: H, +} + +impl>, H: FnMut(&T, &D)->u64> EagerExchangePusher { + /// Allocates a new `Exchange` from a supplied set of pushers and a distribution function. + pub fn new(pushers: Vec

, key: H) -> EagerExchangePusher { + let buffers = (0..pushers.len()).map(|_| vec![]).collect(); + EagerExchangePusher { + pushers, + hash_func: key, + buffers, + current: None, + } + } + #[inline] + fn flush(&mut self, index: usize) { + if !self.buffers[index].is_empty() { + if let Some(ref time) = self.current { + Message::push_at_no_allocation(&mut self.buffers[index], time.clone(), &mut self.pushers[index]); + } + } + } +} + +impl>, H: FnMut(&T, &D)->u64> Push> for EagerExchangePusher { + fn push(&mut self, message: &mut Option>) { + // if only one pusher, no exchange + if self.pushers.len() == 1 { + self.pushers[0].push(message); + } else if let Some(message) = message { + let message = message.as_mut(); + let time = &message.time; + let data = &mut message.data; + + // if the time isn't right, flush everything. + if self.current.as_ref().map_or(false, |x| x != time) { + for index in 0..self.pushers.len() { + self.flush(index); + } + } + self.current = Some(time.clone()); + + // if the number of pushers is a power of two, use a mask + if (self.pushers.len() & (self.pushers.len() - 1)) == 0 { + let mask = (self.pushers.len() - 1) as u64; + for datum in data.drain(..) { + let index = (((self.hash_func)(time, &datum)) & mask) as usize; + if self.buffers[index].capacity() < Message::::default_length() { + let to_reserve = Message::::default_length() - self.buffers[index].capacity(); + self.buffers[index].reserve(to_reserve); + } + self.buffers[index].push(datum); + // We have reached the buffer's capacity + if self.buffers[index].len() == self.buffers[index].capacity() { + self.flush(index); + } + } + } else { + // as a last resort, use mod (%) + for datum in data.drain(..) 
{ + let index = (((self.hash_func)(time, &datum)) % self.pushers.len() as u64) as usize; + if self.buffers[index].capacity() < Message::::default_length() { + let to_reserve = Message::::default_length() - self.buffers[index].capacity(); + self.buffers[index].reserve(to_reserve); + } + self.buffers[index].push(datum); + // We have reached the buffer's capacity + if self.buffers[index].len() == self.buffers[index].capacity() { + self.flush(index); + } + } + } + } else { + // flush + for index in 0..self.pushers.len() { + self.flush(index); + self.pushers[index].push(&mut None); + self.buffers[index] = Vec::new(); + } + } + } +} + +/// An exchange between multiple observers by data, backed by [EagerExchangePusher]. +pub struct EagerExchange { hash_func: F, phantom: PhantomData } + +implu64+'static> EagerExchange { + /// Allocates a new `LeanExchange` pact from a distribution function. + pub fn new(func: F) -> Self { + Self { + hash_func: func, + phantom: PhantomData, + } + } +} + +// Exchange uses a `Box` because it cannot know what type of pushable will return from the allocator. +implu64+'static> ParallelizationContract for EagerExchange { + // TODO: The closure in the type prevents us from naming it. + // Could specialize `ExchangePusher` to a time-free version. 
+ type Pusher = Box>>; + type Puller = Box>>; + fn connect(mut self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option) -> (Self::Pusher, Self::Puller) { + let (senders, receiver) = allocator.allocate::>(identifier, address); + let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::>(); + (Box::new(EagerExchangePusher::new(senders, move |_, d| (self.hash_func)(d))), Box::new(LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))) + } +} + +impl std::fmt::Debug for EagerExchange { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("EagerExchange").finish() + } +} diff --git a/timely/src/dataflow/channels/pushers/mod.rs b/timely/src/dataflow/channels/pushers/mod.rs index 0b5eca10ec..16b08e65f8 100644 --- a/timely/src/dataflow/channels/pushers/mod.rs +++ b/timely/src/dataflow/channels/pushers/mod.rs @@ -3,6 +3,7 @@ pub use self::exchange::Exchange; pub use self::counter::Counter; pub mod tee; +pub mod eager_exchange; pub mod exchange; pub mod lazy_exchange; pub mod counter; From 90702355940bc4620e9db4ef5616997db882bdf0 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Tue, 17 Aug 2021 09:37:59 +0200 Subject: [PATCH 05/11] Bench eagerly deallocating exchange pusher Signed-off-by: Moritz Hoffmann --- experiments/benches/exchange_bench.rs | 63 +++++++++++++++++++++++++-- 1 file changed, 59 insertions(+), 4 deletions(-) diff --git a/experiments/benches/exchange_bench.rs b/experiments/benches/exchange_bench.rs index 218b9f4e77..08a3ccf3f1 100644 --- a/experiments/benches/exchange_bench.rs +++ b/experiments/benches/exchange_bench.rs @@ -1,16 +1,15 @@ extern crate timely; use std::fmt::{Display, Formatter}; -use std::iter::repeat; use std::time::{Duration, Instant}; use criterion::black_box; use criterion::*; -use timely::dataflow::channels::pact::LazyExchange; +use 
timely::dataflow::channels::pact::{EagerExchange, LazyExchange}; use timely::dataflow::operators::{Exchange, Input, Probe}; use timely::dataflow::InputHandle; -use timely::{WorkerConfig, CommunicationConfig, Config}; +use timely::{CommunicationConfig, Config, WorkerConfig}; #[derive(Clone)] struct ExperimentConfig { @@ -27,7 +26,7 @@ impl Display for ExperimentConfig { fn bench(c: &mut Criterion) { let mut group = c.benchmark_group("exchange"); for threads in [1, 2, 4, 8, 16] { - for shift in [0, 2, 4, 6, 8, 10, 12, 14, 16] { + for shift in [0, 4, 8, 14] { let params = ExperimentConfig { threads, batch: 1u64 << shift, @@ -52,6 +51,16 @@ fn bench(c: &mut Criterion) { }) }, ); + group.bench_with_input( + BenchmarkId::new("Eager", params.clone()), + ¶ms, + move |b, params| { + b.iter_custom(|iters| { + let config = Config::process(params.threads); + black_box(experiment_eager_exchange(config, params.batch, iters)) + }) + }, + ); group.bench_with_input( BenchmarkId::new("DefaultZero", params.clone()), ¶ms, @@ -78,6 +87,19 @@ fn bench(c: &mut Criterion) { }) }, ); + group.bench_with_input( + BenchmarkId::new("EagerZero", params.clone()), + ¶ms, + move |b, params| { + b.iter_custom(|iters| { + let config = Config { + communication: CommunicationConfig::ProcessBinary(params.threads), + worker: WorkerConfig::default(), + }; + black_box(experiment_eager_exchange(config, params.batch, iters)) + }) + }, + ); } } } @@ -111,6 +133,39 @@ fn experiment_exchange(config: Config, batch: u64, rounds: u64) -> Duration { .unwrap() } +fn experiment_eager_exchange(config: Config, batch: u64, rounds: u64) -> Duration { + timely::execute(config, move |worker| { + let mut input = InputHandle::new(); + let probe = worker.dataflow(|scope| { + scope + .input_from(&mut input) + .apply_pact(EagerExchange::new(|&x| x as u64)) + .probe() + }); + + let mut time = 0; + let timer = Instant::now(); + + for _round in 0..rounds { + for i in 0..batch { + input.send(i); + } + time += 1; + 
input.advance_to(time); + while probe.less_than(input.time()) { + worker.step(); + } + } + timer.elapsed() + }) + .unwrap() + .join() + .into_iter() + .next() + .unwrap() + .unwrap() +} + fn experiment_lazy_exchange(config: Config, batch: u64, rounds: u64) -> Duration { timely::execute(config, move |worker| { let mut input = InputHandle::new(); From 699b90a4141fa797e256b6c34bcd28048b35162c Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Mon, 23 Aug 2021 13:47:45 +0200 Subject: [PATCH 06/11] experiments: cleanup unused exchange example Signed-off-by: Moritz Hoffmann --- experiments/examples/exchange.rs | 52 -------------------------------- experiments/examples/exchange.sh | 38 ----------------------- 2 files changed, 90 deletions(-) delete mode 100644 experiments/examples/exchange.rs delete mode 100755 experiments/examples/exchange.sh diff --git a/experiments/examples/exchange.rs b/experiments/examples/exchange.rs deleted file mode 100644 index 74be4cafaa..0000000000 --- a/experiments/examples/exchange.rs +++ /dev/null @@ -1,52 +0,0 @@ -extern crate timely; - -use timely::dataflow::InputHandle; -use timely::dataflow::operators::{Input, Exchange, Probe}; - -fn main() { - // initializes and runs a timely dataflow. 
- timely::execute_from_args(std::env::args(), |worker| { - - let batch = std::env::args().nth(1).unwrap().parse::().unwrap(); - let rounds = std::env::args().nth(2).unwrap().parse::().unwrap(); - let mut input = InputHandle::new(); - - // create a new input, exchange data, and inspect its output - let probe = worker.dataflow(|scope| - scope - .input_from(&mut input) - .exchange(|&x| x as u64) - .probe() - ); - - let mut time = 0; - - let mut round_batch = 0; - while round_batch < batch { - let timer = std::time::Instant::now(); - - for round in 0..rounds { - for i in 0..round_batch { - input.send(i); - } - time += 1; - input.advance_to(time); - - while probe.less_than(input.time()) { - worker.step(); - } - } - - let volume = (rounds * batch) as f64; - let elapsed = timer.elapsed(); - let seconds = elapsed.as_secs() as f64 + (f64::from(elapsed.subsec_nanos()) / 1000000000.0); - - if worker.index() == 0 { - println!("{}\t{:?}\t{:?}", round_batch, timer.elapsed().as_secs_f64(), volume / seconds); - } - - round_batch += std::cmp::max(worker.peers(), 64); - } - - }).unwrap(); -} diff --git a/experiments/examples/exchange.sh b/experiments/examples/exchange.sh deleted file mode 100755 index 604dfb5d7d..0000000000 --- a/experiments/examples/exchange.sh +++ /dev/null @@ -1,38 +0,0 @@ -#!/usr/bin/env bash -set -eou pipefail - -BRANCH="$(git symbolic-ref --short HEAD)" -REVISION="$(git rev-parse --short HEAD)" -if [ -z ${SUFFIX+x} ]; then - SUFFIX="" -else - SUFFIX="_$SUFFIX" -fi -GP="exchange_${BRANCH}_${REVISION}$SUFFIX.gp" -PLOT="exchange_${BRANCH}_${REVISION}$SUFFIX.svg" -cargo build --release --example exchange - -cat > "$GP" <> "$GP" - i=$((i * 2)) - if ((i <= NPROC)); then - echo ", \\" >> "$GP" - fi -done - -echo "# Now, run:" -echo " gnuplot '$GP'" From 8ecced5afe6bf53617faedfed0f703e9c7179ddf Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Mon, 23 Aug 2021 11:32:59 +0200 Subject: [PATCH 07/11] exchange: Unify all exchange variants to generic impl Instead of 
defining distinct exchange pushers, make the default generic and provide necessary hooks for specialized exchange pushers. Signed-off-by: Moritz Hoffmann --- .../channels/pushers/eager_exchange.rs | 100 ++++------------ .../src/dataflow/channels/pushers/exchange.rs | 66 ++++++++-- .../channels/pushers/lazy_exchange.rs | 113 +++--------------- 3 files changed, 96 insertions(+), 183 deletions(-) diff --git a/timely/src/dataflow/channels/pushers/eager_exchange.rs b/timely/src/dataflow/channels/pushers/eager_exchange.rs index d4011a8429..f178d6158d 100644 --- a/timely/src/dataflow/channels/pushers/eager_exchange.rs +++ b/timely/src/dataflow/channels/pushers/eager_exchange.rs @@ -2,9 +2,10 @@ use std::marker::PhantomData; -use crate::{Data, ExchangeData}; +use crate::ExchangeData; use crate::communication::{Pull, Push}; use crate::dataflow::channels::pact::{ParallelizationContract, LogPusher, LogPuller}; +use crate::dataflow::channels::pushers::exchange::{ExchangeBehavior, ExchangePusherGeneric}; use crate::dataflow::channels::{Bundle, Message}; use crate::logging::TimelyLogger as Logger; use crate::worker::AsWorker; @@ -15,98 +16,37 @@ use crate::worker::AsWorker; /// leaves no allocations around. It does not preallocate a buffer for each pushee, but /// only allocates it once data is pushed. On flush, the allocation is passed to the pushee, and /// any returned allocation will be dropped. -pub struct EagerExchangePusher>, H: FnMut(&T, &D) -> u64> { - pushers: Vec

, - buffers: Vec>, - current: Option, - hash_func: H, -} +pub struct EagerExchangeBehavior {} -impl>, H: FnMut(&T, &D)->u64> EagerExchangePusher { - /// Allocates a new `Exchange` from a supplied set of pushers and a distribution function. - pub fn new(pushers: Vec

, key: H) -> EagerExchangePusher { - let buffers = (0..pushers.len()).map(|_| vec![]).collect(); - EagerExchangePusher { - pushers, - hash_func: key, - buffers, - current: None, - } +impl ExchangeBehavior for EagerExchangeBehavior { + fn allocate() -> Vec { + Vec::new() } - #[inline] - fn flush(&mut self, index: usize) { - if !self.buffers[index].is_empty() { - if let Some(ref time) = self.current { - Message::push_at_no_allocation(&mut self.buffers[index], time.clone(), &mut self.pushers[index]); - } + + fn check(buffer: &mut Vec) { + if buffer.capacity() < Message::::default_length() { + let to_reserve = Message::::default_length() - buffer.capacity(); + buffer.reserve(to_reserve); } } -} -impl>, H: FnMut(&T, &D)->u64> Push> for EagerExchangePusher { - fn push(&mut self, message: &mut Option>) { - // if only one pusher, no exchange - if self.pushers.len() == 1 { - self.pushers[0].push(message); - } else if let Some(message) = message { - let message = message.as_mut(); - let time = &message.time; - let data = &mut message.data; - - // if the time isn't right, flush everything. - if self.current.as_ref().map_or(false, |x| x != time) { - for index in 0..self.pushers.len() { - self.flush(index); - } - } - self.current = Some(time.clone()); + fn flush>>(buffer: &mut Vec, time: T, pusher: &mut P) { + Message::push_at_no_allocation(buffer, time, pusher); + } - // if the number of pushers is a power of two, use a mask - if (self.pushers.len() & (self.pushers.len() - 1)) == 0 { - let mask = (self.pushers.len() - 1) as u64; - for datum in data.drain(..) 
{ - let index = (((self.hash_func)(time, &datum)) & mask) as usize; - if self.buffers[index].capacity() < Message::::default_length() { - let to_reserve = Message::::default_length() - self.buffers[index].capacity(); - self.buffers[index].reserve(to_reserve); - } - self.buffers[index].push(datum); - // We have reached the buffer's capacity - if self.buffers[index].len() == self.buffers[index].capacity() { - self.flush(index); - } - } - } else { - // as a last resort, use mod (%) - for datum in data.drain(..) { - let index = (((self.hash_func)(time, &datum)) % self.pushers.len() as u64) as usize; - if self.buffers[index].capacity() < Message::::default_length() { - let to_reserve = Message::::default_length() - self.buffers[index].capacity(); - self.buffers[index].reserve(to_reserve); - } - self.buffers[index].push(datum); - // We have reached the buffer's capacity - if self.buffers[index].len() == self.buffers[index].capacity() { - self.flush(index); - } - } - } - } else { - // flush - for index in 0..self.pushers.len() { - self.flush(index); - self.pushers[index].push(&mut None); - self.buffers[index] = Vec::new(); - } - } + fn finalize(buffer: &mut Vec) { + *buffer = Vec::new(); } } +/// Eager exchange pusher definition +pub type EagerExchangePusher = ExchangePusherGeneric; + /// An exchange between multiple observers by data, backed by [EagerExchangePusher]. pub struct EagerExchange { hash_func: F, phantom: PhantomData } implu64+'static> EagerExchange { - /// Allocates a new `LeanExchange` pact from a distribution function. + /// Allocates a new `LazyExchange` pact from a distribution function. pub fn new(func: F) -> Self { Self { hash_func: func, diff --git a/timely/src/dataflow/channels/pushers/exchange.rs b/timely/src/dataflow/channels/pushers/exchange.rs index 5b37a84386..8083bc9312 100644 --- a/timely/src/dataflow/channels/pushers/exchange.rs +++ b/timely/src/dataflow/channels/pushers/exchange.rs @@ -1,43 +1,89 @@ //! 
The exchange pattern distributes pushed data between many target pushees. +use std::marker::PhantomData; + use crate::Data; use crate::communication::Push; use crate::dataflow::channels::{Bundle, Message}; // TODO : Software write combining /// Distributes records among target pushees according to a distribution function. -pub struct Exchange>, H: FnMut(&T, &D) -> u64> { +pub struct ExchangePusherGeneric>, H: FnMut(&T, &D) -> u64, B: ExchangeBehavior> { pushers: Vec

, buffers: Vec>, current: Option, hash_func: H, + _phantom_data: PhantomData, +} + +/// The behavior of an exchange specialization +/// +/// This trait gives specialized exchange implementations the opportunity to hook into interesting +/// places for memory management. It exposes the lifecycle of each of the pushee's buffers, starting +/// from creation, ensuring allocations, flushing and finalizing. +pub trait ExchangeBehavior { + /// Allocate a new buffer, called while creating the exchange pusher. + fn allocate() -> Vec; + /// Check the buffer's capacity before pushing a single element. + fn check(buffer: &mut Vec); + /// Flush a buffer's contents, either when the buffer is at capacity or when no more data is + /// available. + fn flush>>(buffer: &mut Vec, time: T, pusher: &mut P); + /// Finalize a buffer after pushing `None`, i.e. no more data is available. + fn finalize(buffer: &mut Vec); } -impl>, H: FnMut(&T, &D)->u64> Exchange { - /// Allocates a new `Exchange` from a supplied set of pushers and a distribution function. - pub fn new(pushers: Vec

, key: H) -> Exchange { +/// Default exchange behavior +pub struct DefaultExchangeBehavior {} + +impl ExchangeBehavior for DefaultExchangeBehavior { + fn allocate() -> Vec { + Vec::with_capacity(Message::::default_length()) + } + + fn check(_buffer: &mut Vec) { + // Not needed, always allocated + } + + fn flush>>(buffer: &mut Vec, time: T, pusher: &mut P) { + // `push_at` ensures an allocation. + Message::push_at(buffer, time, pusher); + } + + fn finalize(_buffer: &mut Vec) { + // retain any allocation + } +} + +/// Default exchange type +pub type Exchange = ExchangePusherGeneric; + +impl>, H: FnMut(&T, &D)->u64, B: ExchangeBehavior> ExchangePusherGeneric { + /// Allocates a new `ExchangeGeneric` from a supplied set of pushers and a distribution function. + pub fn new(pushers: Vec

, key: H) -> ExchangePusherGeneric { let mut buffers = vec![]; for _ in 0..pushers.len() { - buffers.push(Vec::with_capacity(Message::::default_length())); + buffers.push(B::allocate()); } - Exchange { + ExchangePusherGeneric { pushers, hash_func: key, buffers, current: None, + _phantom_data: PhantomData, } } #[inline] fn flush(&mut self, index: usize) { if !self.buffers[index].is_empty() { if let Some(ref time) = self.current { - Message::push_at(&mut self.buffers[index], time.clone(), &mut self.pushers[index]); + B::flush(&mut self.buffers[index], time.clone(), &mut self.pushers[index]); } } } } -impl>, H: FnMut(&T, &D)->u64> Push> for Exchange { +impl>, H: FnMut(&T, &D)->u64, B: ExchangeBehavior> Push> for ExchangePusherGeneric { #[inline(never)] fn push(&mut self, message: &mut Option>) { // if only one pusher, no exchange @@ -63,7 +109,7 @@ impl>, H: FnMut(&T, &D)->u64> Push>, H: FnMut(&T, &D)->u64> Push>, H: FnMut(&T, &D)->u64> Push>, H: FnMut(&T, &D) -> u64> { - pushers: Vec

, - buffers: Vec>, - current: Option, - hash_func: H, -} +pub struct LazyExchangeBehavior {} -impl>, H: FnMut(&T, &D)->u64> LazyExchangePusher { - /// Allocates a new `Exchange` from a supplied set of pushers and a distribution function. - pub fn new(pushers: Vec

, key: H) -> LazyExchangePusher { - let buffers = (0..pushers.len()).map(|_| vec![]).collect(); - LazyExchangePusher { - pushers, - hash_func: key, - buffers, - current: None, - } +impl ExchangeBehavior for LazyExchangeBehavior { + fn allocate() -> Vec { + Vec::new() } - #[inline] - fn flush(&mut self, index: usize) { - if !self.buffers[index].is_empty() { - if let Some(ref time) = self.current { - Message::push_at_no_allocation(&mut self.buffers[index], time.clone(), &mut self.pushers[index]); - } + + fn check(buffer: &mut Vec) { + if buffer.capacity() < Message::::default_length() { + let to_reserve = Message::::default_length() - buffer.capacity(); + buffer.reserve(to_reserve); } } -} -impl>, H: FnMut(&T, &D)->u64> Push> for LazyExchangePusher { - fn push(&mut self, message: &mut Option>) { - // if only one pusher, no exchange - if self.pushers.len() == 1 { - self.pushers[0].push(message); - } else if let Some(message) = message { - let message = message.as_mut(); - let time = &message.time; - let data = &mut message.data; - - // if the time isn't right, flush everything. - if self.current.as_ref().map_or(false, |x| x != time) { - for index in 0..self.pushers.len() { - self.flush(index); - } - } - self.current = Some(time.clone()); + fn flush>>(buffer: &mut Vec, time: T, pusher: &mut P) { + Message::push_at_no_allocation(buffer, time, pusher); + } - // if the number of pushers is a power of two, use a mask - if (self.pushers.len() & (self.pushers.len() - 1)) == 0 { - let mask = (self.pushers.len() - 1) as u64; - for datum in data.drain(..) 
{ - let index = (((self.hash_func)(time, &datum)) & mask) as usize; - // Push at the target buffer, which might be without capacity, or preallocated - self.buffers[index].push(datum); - // We have reached the buffer's capacity - if self.buffers[index].len() == self.buffers[index].capacity() { - // If the buffer's capacity is below the default length, reallocate to match - // the default length - if self.buffers[index].capacity() < Message::::default_length() { - let to_reserve = Message::::default_length() - self.buffers[index].capacity(); - self.buffers[index].reserve(to_reserve); - } else { - // Buffer is at capacity, flush - self.flush(index); - // Explicitly allocate a new buffer under the assumption that more data - // will be sent to the pushee. - if self.buffers[index].capacity() < Message::::default_length() { - let to_reserve = Message::::default_length() - self.buffers[index].capacity(); - self.buffers.reserve(to_reserve); - } - } - } - } - } else { - // as a last resort, use mod (%) - for datum in data.drain(..) { - let index = (((self.hash_func)(time, &datum)) % self.pushers.len() as u64) as usize; - self.buffers[index].push(datum); - // This code is duplicated from above, keep in sync! 
- if self.buffers[index].len() == self.buffers[index].capacity() { - if self.buffers[index].capacity() < Message::::default_length() { - let to_reserve = Message::::default_length() - self.buffers[index].capacity(); - self.buffers[index].reserve(to_reserve); - } else { - self.flush(index); - if self.buffers[index].capacity() < Message::::default_length() { - let to_reserve = Message::::default_length() - self.buffers[index].capacity(); - self.buffers.reserve(to_reserve); - } - } - } - } - } - } else { - // flush - for index in 0..self.pushers.len() { - self.flush(index); - self.pushers[index].push(&mut None); - } - } + fn finalize(_buffer: &mut Vec) { + // None } } +/// Lazy exchange pusher definition +pub type LazyExchangePusher = ExchangePusherGeneric; + /// An exchange between multiple observers by data, backed by [LazyExchangePusher]. pub struct LazyExchange { hash_func: F, phantom: PhantomData } From 66c140132629593961d609aa64f854d672fd2cf4 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 10 Sep 2021 15:54:24 +0200 Subject: [PATCH 08/11] pact: Remove LazyExhange, rename eager to NonRetainingExchange Signed-off-by: Moritz Hoffmann --- experiments/benches/exchange_bench.rs | 135 +++++------------- timely/src/dataflow/channels/pact.rs | 3 +- .../src/dataflow/channels/pushers/exchange.rs | 10 +- .../channels/pushers/lazy_exchange.rs | 75 ---------- timely/src/dataflow/channels/pushers/mod.rs | 3 +- ..._exchange.rs => non_retaining_exchange.rs} | 24 ++-- timely/src/dataflow/operators/exchange.rs | 4 +- 7 files changed, 57 insertions(+), 197 deletions(-) delete mode 100644 timely/src/dataflow/channels/pushers/lazy_exchange.rs rename timely/src/dataflow/channels/pushers/{eager_exchange.rs => non_retaining_exchange.rs} (72%) diff --git a/experiments/benches/exchange_bench.rs b/experiments/benches/exchange_bench.rs index 08a3ccf3f1..491c36585b 100644 --- a/experiments/benches/exchange_bench.rs +++ b/experiments/benches/exchange_bench.rs @@ -6,7 +6,9 @@ use 
std::time::{Duration, Instant}; use criterion::black_box; use criterion::*; -use timely::dataflow::channels::pact::{EagerExchange, LazyExchange}; +use timely::dataflow::channels::pact::{ + Exchange as ExchangePact, NonRetainingExchange, ParallelizationContract, +}; use timely::dataflow::operators::{Exchange, Input, Probe}; use timely::dataflow::InputHandle; use timely::{CommunicationConfig, Config, WorkerConfig}; @@ -37,27 +39,27 @@ fn bench(c: &mut Criterion) { move |b, params| { b.iter_custom(|iters| { let config = Config::process(params.threads); - black_box(experiment_exchange(config, params.batch, iters)) + black_box(experiment_exchange( + config, + params.batch, + iters, + || ExchangePact::new(|&x| x as u64), + )) }) }, ); group.bench_with_input( - BenchmarkId::new("Lazy", params.clone()), + BenchmarkId::new("NonRetaining", params.clone()), ¶ms, move |b, params| { b.iter_custom(|iters| { let config = Config::process(params.threads); - black_box(experiment_lazy_exchange(config, params.batch, iters)) - }) - }, - ); - group.bench_with_input( - BenchmarkId::new("Eager", params.clone()), - ¶ms, - move |b, params| { - b.iter_custom(|iters| { - let config = Config::process(params.threads); - black_box(experiment_eager_exchange(config, params.batch, iters)) + black_box(experiment_exchange( + config, + params.batch, + iters, + || NonRetainingExchange::new(|&x| x as u64), + )) }) }, ); @@ -70,25 +72,17 @@ fn bench(c: &mut Criterion) { communication: CommunicationConfig::ProcessBinary(params.threads), worker: WorkerConfig::default(), }; - black_box(experiment_exchange(config, params.batch, iters)) - }) - }, - ); - group.bench_with_input( - BenchmarkId::new("LazyZero", params.clone()), - ¶ms, - move |b, params| { - b.iter_custom(|iters| { - let config = Config { - communication: CommunicationConfig::ProcessBinary(params.threads), - worker: WorkerConfig::default(), - }; - black_box(experiment_lazy_exchange(config, params.batch, iters)) + black_box(experiment_exchange( + 
config, + params.batch, + iters, + ||ExchangePact::new(|&x| x as u64), + )) }) }, ); group.bench_with_input( - BenchmarkId::new("EagerZero", params.clone()), + BenchmarkId::new("NonRetainingZero", params.clone()), ¶ms, move |b, params| { b.iter_custom(|iters| { @@ -96,7 +90,12 @@ fn bench(c: &mut Criterion) { communication: CommunicationConfig::ProcessBinary(params.threads), worker: WorkerConfig::default(), }; - black_box(experiment_eager_exchange(config, params.batch, iters)) + black_box(experiment_exchange( + config, + params.batch, + iters, + ||NonRetainingExchange::new(|&x| x as u64), + )) }) }, ); @@ -104,77 +103,15 @@ fn bench(c: &mut Criterion) { } } -fn experiment_exchange(config: Config, batch: u64, rounds: u64) -> Duration { - timely::execute(config, move |worker| { - let mut input = InputHandle::new(); - let probe = - worker.dataflow(|scope| scope.input_from(&mut input).exchange(|&x| x as u64).probe()); - - let mut time = 0; - let timer = Instant::now(); - - for _round in 0..rounds { - for i in 0..batch { - input.send(i); - } - time += 1; - input.advance_to(time); - while probe.less_than(input.time()) { - worker.step(); - } - } - timer.elapsed() - }) - .unwrap() - .join() - .into_iter() - .next() - .unwrap() - .unwrap() -} - -fn experiment_eager_exchange(config: Config, batch: u64, rounds: u64) -> Duration { - timely::execute(config, move |worker| { - let mut input = InputHandle::new(); - let probe = worker.dataflow(|scope| { - scope - .input_from(&mut input) - .apply_pact(EagerExchange::new(|&x| x as u64)) - .probe() - }); - - let mut time = 0; - let timer = Instant::now(); - - for _round in 0..rounds { - for i in 0..batch { - input.send(i); - } - time += 1; - input.advance_to(time); - while probe.less_than(input.time()) { - worker.step(); - } - } - timer.elapsed() - }) - .unwrap() - .join() - .into_iter() - .next() - .unwrap() - .unwrap() -} - -fn experiment_lazy_exchange(config: Config, batch: u64, rounds: u64) -> Duration { +fn experiment_exchange P 
+ Sync + Send + 'static, P: ParallelizationContract>( + config: Config, + batch: u64, + rounds: u64, + pact: F, +) -> Duration { timely::execute(config, move |worker| { let mut input = InputHandle::new(); - let probe = worker.dataflow(|scope| { - scope - .input_from(&mut input) - .apply_pact(LazyExchange::new(|&x| x as u64)) - .probe() - }); + let probe = worker.dataflow(|scope| scope.input_from(&mut input).apply_pact(pact()).probe()); let mut time = 0; let timer = Instant::now(); diff --git a/timely/src/dataflow/channels/pact.rs b/timely/src/dataflow/channels/pact.rs index 9ffea67162..000f72b48e 100644 --- a/timely/src/dataflow/channels/pact.rs +++ b/timely/src/dataflow/channels/pact.rs @@ -19,8 +19,7 @@ use super::{Bundle, Message}; use crate::logging::{TimelyLogger as Logger, MessagesEvent}; -pub use super::pushers::lazy_exchange::LazyExchange; -pub use super::pushers::eager_exchange::EagerExchange; +pub use super::pushers::non_retaining_exchange::NonRetainingExchange; /// A `ParallelizationContract` allocates paired `Push` and `Pull` implementors. pub trait ParallelizationContract { diff --git a/timely/src/dataflow/channels/pushers/exchange.rs b/timely/src/dataflow/channels/pushers/exchange.rs index 8083bc9312..bc4aa4fa8d 100644 --- a/timely/src/dataflow/channels/pushers/exchange.rs +++ b/timely/src/dataflow/channels/pushers/exchange.rs @@ -33,10 +33,10 @@ pub trait ExchangeBehavior { fn finalize(buffer: &mut Vec); } -/// Default exchange behavior -pub struct DefaultExchangeBehavior {} +/// Exchange behavior that always has push buffers fully allocated. 
+pub struct FullyAllocatedExchangeBehavior {} -impl ExchangeBehavior for DefaultExchangeBehavior { +impl ExchangeBehavior for FullyAllocatedExchangeBehavior { fn allocate() -> Vec { Vec::with_capacity(Message::::default_length()) } @@ -55,8 +55,8 @@ impl ExchangeBehavior for DefaultExchangeBehavior { } } -/// Default exchange type -pub type Exchange = ExchangePusherGeneric; +/// Default exchange type is to fully allocate exchange buffers +pub type Exchange = ExchangePusherGeneric; impl>, H: FnMut(&T, &D)->u64, B: ExchangeBehavior> ExchangePusherGeneric { /// Allocates a new `ExchangeGeneric` from a supplied set of pushers and a distribution function. diff --git a/timely/src/dataflow/channels/pushers/lazy_exchange.rs b/timely/src/dataflow/channels/pushers/lazy_exchange.rs deleted file mode 100644 index 080e920566..0000000000 --- a/timely/src/dataflow/channels/pushers/lazy_exchange.rs +++ /dev/null @@ -1,75 +0,0 @@ -//! The exchange pattern distributes pushed data between many target pushees. - -use std::marker::PhantomData; - -use crate::ExchangeData; -use crate::communication::{Pull, Push}; -use crate::dataflow::channels::pact::{ParallelizationContract, LogPusher, LogPuller}; -use crate::dataflow::channels::pushers::exchange::{ExchangeBehavior, ExchangePusherGeneric}; -use crate::dataflow::channels::{Bundle, Message}; -use crate::logging::TimelyLogger as Logger; -use crate::worker::AsWorker; - -/// Distributes records among target pushees according to a distribution function. -/// -/// This implementation behaves similarly to [crate::dataflow::channels::pushers::Exchange], but -/// tries to leave less allocations around. It does not preallocate a buffer for each pushee, but -/// only allocates it once data is pushed. On flush, the allocation is passed to the pushee, and -/// only what it is passed back is retained. 
-pub struct LazyExchangeBehavior {} - -impl ExchangeBehavior for LazyExchangeBehavior { - fn allocate() -> Vec { - Vec::new() - } - - fn check(buffer: &mut Vec) { - if buffer.capacity() < Message::::default_length() { - let to_reserve = Message::::default_length() - buffer.capacity(); - buffer.reserve(to_reserve); - } - } - - fn flush>>(buffer: &mut Vec, time: T, pusher: &mut P) { - Message::push_at_no_allocation(buffer, time, pusher); - } - - fn finalize(_buffer: &mut Vec) { - // None - } -} - -/// Lazy exchange pusher definition -pub type LazyExchangePusher = ExchangePusherGeneric; - -/// An exchange between multiple observers by data, backed by [LazyExchangePusher]. -pub struct LazyExchange { hash_func: F, phantom: PhantomData } - -implu64+'static> LazyExchange { - /// Allocates a new `LeanExchange` pact from a distribution function. - pub fn new(func: F) -> Self { - Self { - hash_func: func, - phantom: PhantomData, - } - } -} - -// Exchange uses a `Box` because it cannot know what type of pushable will return from the allocator. -implu64+'static> ParallelizationContract for LazyExchange { - // TODO: The closure in the type prevents us from naming it. - // Could specialize `ExchangePusher` to a time-free version. 
- type Pusher = Box>>; - type Puller = Box>>; - fn connect(mut self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option) -> (Self::Pusher, Self::Puller) { - let (senders, receiver) = allocator.allocate::>(identifier, address); - let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::>(); - (Box::new(LazyExchangePusher::new(senders, move |_, d| (self.hash_func)(d))), Box::new(LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))) - } -} - -impl std::fmt::Debug for LazyExchange { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("LazyExchange").finish() - } -} diff --git a/timely/src/dataflow/channels/pushers/mod.rs b/timely/src/dataflow/channels/pushers/mod.rs index 16b08e65f8..496a13b672 100644 --- a/timely/src/dataflow/channels/pushers/mod.rs +++ b/timely/src/dataflow/channels/pushers/mod.rs @@ -3,8 +3,7 @@ pub use self::exchange::Exchange; pub use self::counter::Counter; pub mod tee; -pub mod eager_exchange; +pub mod non_retaining_exchange; pub mod exchange; -pub mod lazy_exchange; pub mod counter; pub mod buffer; diff --git a/timely/src/dataflow/channels/pushers/eager_exchange.rs b/timely/src/dataflow/channels/pushers/non_retaining_exchange.rs similarity index 72% rename from timely/src/dataflow/channels/pushers/eager_exchange.rs rename to timely/src/dataflow/channels/pushers/non_retaining_exchange.rs index f178d6158d..b53e12f065 100644 --- a/timely/src/dataflow/channels/pushers/eager_exchange.rs +++ b/timely/src/dataflow/channels/pushers/non_retaining_exchange.rs @@ -16,9 +16,9 @@ use crate::worker::AsWorker; /// leaves no allocations around. It does not preallocate a buffer for each pushee, but /// only allocates it once data is pushed. On flush, the allocation is passed to the pushee, and /// any returned allocation will be dropped. 
-pub struct EagerExchangeBehavior {} +pub struct NonRetainingExchangeBehavior {} -impl ExchangeBehavior for EagerExchangeBehavior { +impl ExchangeBehavior for NonRetainingExchangeBehavior { fn allocate() -> Vec { Vec::new() } @@ -39,14 +39,14 @@ impl ExchangeBehavior for EagerExchangeBehavior { } } -/// Eager exchange pusher definition -pub type EagerExchangePusher = ExchangePusherGeneric; +/// Non-retaining exchange pusher definition +pub type NonRetainingExchangePusher = ExchangePusherGeneric; -/// An exchange between multiple observers by data, backed by [EagerExchangePusher]. -pub struct EagerExchange { hash_func: F, phantom: PhantomData } +/// An exchange between multiple observers by data, backed by [NonRetainingExchangePusher]. +pub struct NonRetainingExchange { hash_func: F, phantom: PhantomData } -implu64+'static> EagerExchange { - /// Allocates a new `LazyExchange` pact from a distribution function. +implu64+'static> NonRetainingExchange { + /// Allocates a new exchange pact from a distribution function. pub fn new(func: F) -> Self { Self { hash_func: func, @@ -56,7 +56,7 @@ implu64+'static> EagerExchange { } // Exchange uses a `Box` because it cannot know what type of pushable will return from the allocator. -implu64+'static> ParallelizationContract for EagerExchange { +implu64+'static> ParallelizationContract for NonRetainingExchange { // TODO: The closure in the type prevents us from naming it. // Could specialize `ExchangePusher` to a time-free version. 
type Pusher = Box>>; @@ -64,12 +64,12 @@ implu64+'static> Paralleliza fn connect(mut self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option) -> (Self::Pusher, Self::Puller) { let (senders, receiver) = allocator.allocate::>(identifier, address); let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::>(); - (Box::new(EagerExchangePusher::new(senders, move |_, d| (self.hash_func)(d))), Box::new(LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))) + (Box::new(NonRetainingExchangePusher::new(senders, move |_, d| (self.hash_func)(d))), Box::new(LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))) } } -impl std::fmt::Debug for EagerExchange { +impl std::fmt::Debug for NonRetainingExchange { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("EagerExchange").finish() + f.debug_struct("NonRetainingExchange").finish() } } diff --git a/timely/src/dataflow/operators/exchange.rs b/timely/src/dataflow/operators/exchange.rs index 9880c474b5..1f6438b67e 100644 --- a/timely/src/dataflow/operators/exchange.rs +++ b/timely/src/dataflow/operators/exchange.rs @@ -29,11 +29,11 @@ pub trait Exchange { /// # Examples /// ``` /// use timely::dataflow::operators::{ToStream, Exchange, Inspect}; - /// use timely::dataflow::channels::pact::LazyExchange; + /// use timely::dataflow::channels::pact::NonRetainingExchange; /// /// timely::example(|scope| { /// (0..10).to_stream(scope) - /// .apply_pact(LazyExchange::new(|x| *x)) + /// .apply_pact(NonRetainingExchange::new(|x| *x)) /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` From 915778aba88fa5eca1a667adeefad09dc74f54e5 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 29 Oct 2021 13:15:26 +0200 Subject: [PATCH 09/11] Promote non-preallocating exchange to default I did some measurements comparing the * current default, * a non-preallocating 
exchange * id. plus eagerly releasing exchange. The benchmark in the chain of commits does not report a significant change in performance for all variants. Erring on the side of caution, I'd propose to make the second variant, i.e. not pre-allocating, but retaining if push returned a buffer, the default exchange pact. Currently, all non-zerocopy allocators will not return an allocation on push, so in this case, the second and third variants are equivalent. # Why is there no change in performance? For single-record exchanges, this can be reasoned as follows. In the old approach, we'd have an allocated buffer, where we write a single element. On flush, the buffer would be sent to the pusher, which might or might not return a buffer. If it doesn't, we'd allocate a new one. In the new approach, we don't have a buffer to append to, so we'd first allocate one. On flush, we send the buffer downstream and maybe get one back. We don't eagerly allocate a new one if there is no buffer available. Both approaches allocate a new buffer if the pusher doesn't/didn't return one earlier. Only the order of allocations is changed. While in the old approach we send data and allocate, we allocate and send data in the new approach. On top of this, the new approach needs to pay a cost of checking the buffer's capacity for each record on the input, but I'd hope for the optimizer to sort this out. The same argument seems to hold for larger batch sizes. 
Signed-off-by: Moritz Hoffmann --- experiments/benches/exchange_bench.rs | 43 +---------- timely/src/dataflow/channels/pact.rs | 7 +- .../src/dataflow/channels/pushers/exchange.rs | 73 ++++-------------- timely/src/dataflow/channels/pushers/mod.rs | 1 - .../pushers/non_retaining_exchange.rs | 75 ------------------- timely/src/dataflow/operators/exchange.rs | 24 +----- 6 files changed, 23 insertions(+), 200 deletions(-) delete mode 100644 timely/src/dataflow/channels/pushers/non_retaining_exchange.rs diff --git a/experiments/benches/exchange_bench.rs b/experiments/benches/exchange_bench.rs index 491c36585b..981878ee07 100644 --- a/experiments/benches/exchange_bench.rs +++ b/experiments/benches/exchange_bench.rs @@ -6,9 +6,6 @@ use std::time::{Duration, Instant}; use criterion::black_box; use criterion::*; -use timely::dataflow::channels::pact::{ - Exchange as ExchangePact, NonRetainingExchange, ParallelizationContract, -}; use timely::dataflow::operators::{Exchange, Input, Probe}; use timely::dataflow::InputHandle; use timely::{CommunicationConfig, Config, WorkerConfig}; @@ -43,22 +40,6 @@ fn bench(c: &mut Criterion) { config, params.batch, iters, - || ExchangePact::new(|&x| x as u64), - )) - }) - }, - ); - group.bench_with_input( - BenchmarkId::new("NonRetaining", params.clone()), - ¶ms, - move |b, params| { - b.iter_custom(|iters| { - let config = Config::process(params.threads); - black_box(experiment_exchange( - config, - params.batch, - iters, - || NonRetainingExchange::new(|&x| x as u64), )) }) }, @@ -76,25 +57,6 @@ fn bench(c: &mut Criterion) { config, params.batch, iters, - ||ExchangePact::new(|&x| x as u64), - )) - }) - }, - ); - group.bench_with_input( - BenchmarkId::new("NonRetainingZero", params.clone()), - ¶ms, - move |b, params| { - b.iter_custom(|iters| { - let config = Config { - communication: CommunicationConfig::ProcessBinary(params.threads), - worker: WorkerConfig::default(), - }; - black_box(experiment_exchange( - config, - params.batch, - 
iters, - ||NonRetainingExchange::new(|&x| x as u64), )) }) }, @@ -103,15 +65,14 @@ fn bench(c: &mut Criterion) { } } -fn experiment_exchange P + Sync + Send + 'static, P: ParallelizationContract>( +fn experiment_exchange( config: Config, batch: u64, rounds: u64, - pact: F, ) -> Duration { timely::execute(config, move |worker| { let mut input = InputHandle::new(); - let probe = worker.dataflow(|scope| scope.input_from(&mut input).apply_pact(pact()).probe()); + let probe = worker.dataflow(|scope| scope.input_from(&mut input).exchange(|x| *x).probe()); let mut time = 0; let timer = Instant::now(); diff --git a/timely/src/dataflow/channels/pact.rs b/timely/src/dataflow/channels/pact.rs index 000f72b48e..43b5e42792 100644 --- a/timely/src/dataflow/channels/pact.rs +++ b/timely/src/dataflow/channels/pact.rs @@ -9,8 +9,7 @@ use std::{fmt::{self, Debug}, marker::PhantomData}; -use crate::ExchangeData; -use crate::communication::{Push, Pull}; +use crate::communication::{Push, Pull, Data}; use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller}; use crate::worker::AsWorker; @@ -19,8 +18,6 @@ use super::{Bundle, Message}; use crate::logging::{TimelyLogger as Logger, MessagesEvent}; -pub use super::pushers::non_retaining_exchange::NonRetainingExchange; - /// A `ParallelizationContract` allocates paired `Push` and `Pull` implementors. pub trait ParallelizationContract { /// Type implementing `Push` produced by this pact. @@ -61,7 +58,7 @@ implu64+'static> Exchange { } // Exchange uses a `Box` because it cannot know what type of pushable will return from the allocator. -implu64+'static> ParallelizationContract for Exchange { +implu64+'static> ParallelizationContract for Exchange { // TODO: The closure in the type prevents us from naming it. // Could specialize `ExchangePusher` to a time-free version. 
type Pusher = Box>>; diff --git a/timely/src/dataflow/channels/pushers/exchange.rs b/timely/src/dataflow/channels/pushers/exchange.rs index bc4aa4fa8d..fad8340c60 100644 --- a/timely/src/dataflow/channels/pushers/exchange.rs +++ b/timely/src/dataflow/channels/pushers/exchange.rs @@ -1,89 +1,43 @@ //! The exchange pattern distributes pushed data between many target pushees. -use std::marker::PhantomData; - use crate::Data; use crate::communication::Push; use crate::dataflow::channels::{Bundle, Message}; // TODO : Software write combining /// Distributes records among target pushees according to a distribution function. -pub struct ExchangePusherGeneric>, H: FnMut(&T, &D) -> u64, B: ExchangeBehavior> { +pub struct Exchange>, H: FnMut(&T, &D) -> u64> { pushers: Vec

, buffers: Vec>, current: Option, hash_func: H, - _phantom_data: PhantomData, -} - -/// The behavior of an exchange specialization -/// -/// This trait gives specialized exchange implementations the opportunity to hook into interesting -/// places for memory management. It exposes the lifecycle of each of the pushee's buffers, starting -/// from creation, ensuring allocations, flushing and finalizing. -pub trait ExchangeBehavior { - /// Allocate a new buffer, called while creating the exchange pusher. - fn allocate() -> Vec; - /// Check the buffer's capacity before pushing a single element. - fn check(buffer: &mut Vec); - /// Flush a buffer's contents, either when the buffer is at capacity or when no more data is - /// available. - fn flush>>(buffer: &mut Vec, time: T, pusher: &mut P); - /// Finalize a buffer after pushing `None`, i.e. no more data is available. - fn finalize(buffer: &mut Vec); -} - -/// Exchange behavior that always has push buffers fully allocated. -pub struct FullyAllocatedExchangeBehavior {} - -impl ExchangeBehavior for FullyAllocatedExchangeBehavior { - fn allocate() -> Vec { - Vec::with_capacity(Message::::default_length()) - } - - fn check(_buffer: &mut Vec) { - // Not needed, always allocated - } - - fn flush>>(buffer: &mut Vec, time: T, pusher: &mut P) { - // `push_at` ensures an allocation. - Message::push_at(buffer, time, pusher); - } - - fn finalize(_buffer: &mut Vec) { - // retain any allocation - } } -/// Default exchange type is to fully allocate exchange buffers -pub type Exchange = ExchangePusherGeneric; - -impl>, H: FnMut(&T, &D)->u64, B: ExchangeBehavior> ExchangePusherGeneric { - /// Allocates a new `ExchangeGeneric` from a supplied set of pushers and a distribution function. - pub fn new(pushers: Vec

, key: H) -> ExchangePusherGeneric { +impl>, H: FnMut(&T, &D)->u64> Exchange { + /// Allocates a new `Exchange` from a supplied set of pushers and a distribution function. + pub fn new(pushers: Vec

, key: H) -> Exchange { let mut buffers = vec![]; for _ in 0..pushers.len() { - buffers.push(B::allocate()); + buffers.push(Vec::new()); } - ExchangePusherGeneric { + Exchange { pushers, hash_func: key, buffers, current: None, - _phantom_data: PhantomData, } } #[inline] fn flush(&mut self, index: usize) { if !self.buffers[index].is_empty() { if let Some(ref time) = self.current { - B::flush(&mut self.buffers[index], time.clone(), &mut self.pushers[index]); + Message::push_at_no_allocation(&mut self.buffers[index], time.clone(), &mut self.pushers[index]); } } } } -impl>, H: FnMut(&T, &D)->u64, B: ExchangeBehavior> Push> for ExchangePusherGeneric { +impl>, H: FnMut(&T, &D)->u64> Push> for Exchange { #[inline(never)] fn push(&mut self, message: &mut Option>) { // if only one pusher, no exchange @@ -109,7 +63,10 @@ impl>, H: FnMut(&T, &D)->u64, B: Excha let mask = (self.pushers.len() - 1) as u64; for datum in data.drain(..) { let index = (((self.hash_func)(time, &datum)) & mask) as usize; - B::check(&mut self.buffers[index]); + if self.buffers[index].capacity() < Message::::default_length() { + let to_reserve = Message::::default_length() - self.buffers[index].capacity(); + self.buffers[index].reserve(to_reserve); + } self.buffers[index].push(datum); if self.buffers[index].len() == self.buffers[index].capacity() { self.flush(index); @@ -128,7 +85,10 @@ impl>, H: FnMut(&T, &D)->u64, B: Excha else { for datum in data.drain(..) 
{ let index = (((self.hash_func)(time, &datum)) % self.pushers.len() as u64) as usize; - B::check(&mut self.buffers[index]); + if self.buffers[index].capacity() < Message::::default_length() { + let to_reserve = Message::::default_length() - self.buffers[index].capacity(); + self.buffers[index].reserve(to_reserve); + } self.buffers[index].push(datum); if self.buffers[index].len() == self.buffers[index].capacity() { self.flush(index); @@ -142,7 +102,6 @@ impl>, H: FnMut(&T, &D)->u64, B: Excha for index in 0..self.pushers.len() { self.flush(index); self.pushers[index].push(&mut None); - B::finalize(&mut self.buffers[index]); } } } diff --git a/timely/src/dataflow/channels/pushers/mod.rs b/timely/src/dataflow/channels/pushers/mod.rs index 496a13b672..295d033cab 100644 --- a/timely/src/dataflow/channels/pushers/mod.rs +++ b/timely/src/dataflow/channels/pushers/mod.rs @@ -3,7 +3,6 @@ pub use self::exchange::Exchange; pub use self::counter::Counter; pub mod tee; -pub mod non_retaining_exchange; pub mod exchange; pub mod counter; pub mod buffer; diff --git a/timely/src/dataflow/channels/pushers/non_retaining_exchange.rs b/timely/src/dataflow/channels/pushers/non_retaining_exchange.rs deleted file mode 100644 index b53e12f065..0000000000 --- a/timely/src/dataflow/channels/pushers/non_retaining_exchange.rs +++ /dev/null @@ -1,75 +0,0 @@ -//! The exchange pattern distributes pushed data between many target pushees. - -use std::marker::PhantomData; - -use crate::ExchangeData; -use crate::communication::{Pull, Push}; -use crate::dataflow::channels::pact::{ParallelizationContract, LogPusher, LogPuller}; -use crate::dataflow::channels::pushers::exchange::{ExchangeBehavior, ExchangePusherGeneric}; -use crate::dataflow::channels::{Bundle, Message}; -use crate::logging::TimelyLogger as Logger; -use crate::worker::AsWorker; - -/// Distributes records among target pushees according to a distribution function. 
-/// -/// This implementation behaves similarly to [crate::dataflow::channels::pushers::Exchange], but -/// leaves no allocations around. It does not preallocate a buffer for each pushee, but -/// only allocates it once data is pushed. On flush, the allocation is passed to the pushee, and -/// any returned allocation will be dropped. -pub struct NonRetainingExchangeBehavior {} - -impl ExchangeBehavior for NonRetainingExchangeBehavior { - fn allocate() -> Vec { - Vec::new() - } - - fn check(buffer: &mut Vec) { - if buffer.capacity() < Message::::default_length() { - let to_reserve = Message::::default_length() - buffer.capacity(); - buffer.reserve(to_reserve); - } - } - - fn flush>>(buffer: &mut Vec, time: T, pusher: &mut P) { - Message::push_at_no_allocation(buffer, time, pusher); - } - - fn finalize(buffer: &mut Vec) { - *buffer = Vec::new(); - } -} - -/// Non-retaining exchange pusher definition -pub type NonRetainingExchangePusher = ExchangePusherGeneric; - -/// An exchange between multiple observers by data, backed by [NonRetainingExchangePusher]. -pub struct NonRetainingExchange { hash_func: F, phantom: PhantomData } - -implu64+'static> NonRetainingExchange { - /// Allocates a new exchange pact from a distribution function. - pub fn new(func: F) -> Self { - Self { - hash_func: func, - phantom: PhantomData, - } - } -} - -// Exchange uses a `Box` because it cannot know what type of pushable will return from the allocator. -implu64+'static> ParallelizationContract for NonRetainingExchange { - // TODO: The closure in the type prevents us from naming it. - // Could specialize `ExchangePusher` to a time-free version. 
- type Pusher = Box>>; - type Puller = Box>>; - fn connect(mut self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option) -> (Self::Pusher, Self::Puller) { - let (senders, receiver) = allocator.allocate::>(identifier, address); - let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::>(); - (Box::new(NonRetainingExchangePusher::new(senders, move |_, d| (self.hash_func)(d))), Box::new(LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))) - } -} - -impl std::fmt::Debug for NonRetainingExchange { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("NonRetainingExchange").finish() - } -} diff --git a/timely/src/dataflow/operators/exchange.rs b/timely/src/dataflow/operators/exchange.rs index 1f6438b67e..c106ce37f1 100644 --- a/timely/src/dataflow/operators/exchange.rs +++ b/timely/src/dataflow/operators/exchange.rs @@ -1,7 +1,7 @@ //! Exchange records between workers. 
use crate::ExchangeData; -use crate::dataflow::channels::pact::{Exchange as ExchangePact, ParallelizationContract}; +use crate::dataflow::channels::pact::Exchange as ExchangePact; use crate::dataflow::{Stream, Scope}; use crate::dataflow::operators::generic::operator::Operator; @@ -23,31 +23,13 @@ pub trait Exchange { /// }); /// ``` fn exchange(&self, route: impl Fn(&D)->u64+'static) -> Self; - - /// Apply a parallelization contract on the data in a stream - /// - /// # Examples - /// ``` - /// use timely::dataflow::operators::{ToStream, Exchange, Inspect}; - /// use timely::dataflow::channels::pact::NonRetainingExchange; - /// - /// timely::example(|scope| { - /// (0..10).to_stream(scope) - /// .apply_pact(NonRetainingExchange::new(|x| *x)) - /// .inspect(|x| println!("seen: {:?}", x)); - /// }); - /// ``` - fn apply_pact>(&self, pact: P) -> Self where T: 'static; } +// impl, D: ExchangeData> Exchange for Stream { impl Exchange for Stream { fn exchange(&self, route: impl Fn(&D)->u64+'static) -> Stream { - self.apply_pact(ExchangePact::new(route)) - } - - fn apply_pact>(&self, pact: P) -> Stream { let mut vector = Vec::new(); - self.unary(pact, "Exchange", move |_,_| move |input, output| { + self.unary(ExchangePact::new(route), "Exchange", move |_,_| move |input, output| { input.for_each(|time, data| { data.swap(&mut vector); output.session(&time).give_vec(&mut vector); From 090e5e0301e5d1dd7979938f573ceff8c08e15ac Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 19 Nov 2021 20:12:24 -0500 Subject: [PATCH 10/11] exchange: Improve comments Signed-off-by: Moritz Hoffmann --- timely/src/dataflow/channels/mod.rs | 4 ++-- timely/src/dataflow/channels/pushers/exchange.rs | 5 +++++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/timely/src/dataflow/channels/mod.rs b/timely/src/dataflow/channels/mod.rs index 352adfff38..7d174f0839 100644 --- a/timely/src/dataflow/channels/mod.rs +++ b/timely/src/dataflow/channels/mod.rs @@ -54,7 +54,7 @@ impl 
Message { Self::push_at_no_allocation(buffer, time, pusher); - // TODO: Unclear we always want this here. + // Allocate a default buffer to avoid oddly sized or empty buffers if buffer.capacity() != Self::default_length() { *buffer = Vec::with_capacity(Self::default_length()); } @@ -67,7 +67,7 @@ impl Message { #[inline] pub fn push_at_no_allocation>>(buffer: &mut Vec, time: T, pusher: &mut P) { - let data = ::std::mem::replace(buffer, Vec::new()); + let data = ::std::mem::take(buffer); let message = Message::new(time, data, 0, 0); let mut bundle = Some(Bundle::from_typed(message)); diff --git a/timely/src/dataflow/channels/pushers/exchange.rs b/timely/src/dataflow/channels/pushers/exchange.rs index fad8340c60..21c781cffd 100644 --- a/timely/src/dataflow/channels/pushers/exchange.rs +++ b/timely/src/dataflow/channels/pushers/exchange.rs @@ -63,6 +63,9 @@ impl>, H: FnMut(&T, &D)->u64> Push::default_length() { let to_reserve = Message::::default_length() - self.buffers[index].capacity(); self.buffers[index].reserve(to_reserve); @@ -85,6 +88,8 @@ impl>, H: FnMut(&T, &D)->u64> Push::default_length() { let to_reserve = Message::::default_length() - self.buffers[index].capacity(); self.buffers[index].reserve(to_reserve); From cb02537259c849cd525c86053c51d6bb4408b727 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 19 Nov 2021 20:37:53 -0500 Subject: [PATCH 11/11] exchange_bench: Input data as batch Signed-off-by: Moritz Hoffmann --- experiments/benches/exchange_bench.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/experiments/benches/exchange_bench.rs b/experiments/benches/exchange_bench.rs index 981878ee07..cf9337ca41 100644 --- a/experiments/benches/exchange_bench.rs +++ b/experiments/benches/exchange_bench.rs @@ -77,10 +77,13 @@ fn experiment_exchange( let mut time = 0; let timer = Instant::now(); + let buffer = (0..batch).collect(); + let mut copy = Vec::new(); + for _round in 0..rounds { - for i in 0..batch { - input.send(i); - 
} + copy.clone_from(&buffer); + input.send_batch(&mut copy); + copy.clear(); time += 1; input.advance_to(time); while probe.less_than(input.time()) {