Promote non-preallocating exchange to default
I did some measurements comparing
* the current default,
* a non-preallocating exchange, and
* a non-preallocating and eagerly releasing exchange.

The benchmark in the chain of commits does not report a significant change
in performance for any of the variants. Erring on the side of caution, I'd
propose to make the second variant, i.e. not pre-allocating, but retaining
a buffer if push returned one, the default exchange pact.

Currently, no non-zerocopy allocator returns an allocation on push, so in
this case the second and third variants are equivalent.

# Why is there no change in performance?

For single-record exchanges, this can be reasoned as follows. In the old
approach, we'd have an allocated buffer, into which we write a single
element.
On flush, the buffer would be sent to the pusher, which might or might not
return a buffer. If it doesn't, we'd allocate a new one.

In the new approach, we don't have a buffer to append to, so we'd first
allocate one. On flush, we send the buffer downstream and maybe get one
back. We don't eagerly allocate a new one if there is no buffer available.

Both approaches allocate a new buffer if the pusher doesn't/didn't return
one earlier. Only the order of allocations is changed. While in the old
approach we send data and allocate, we allocate and send data in the new
approach. On top of this, the new approach needs to pay a cost of checking
the buffer's capacity for each record on the input, but I'd hope for the
optimizer to sort this out.

The same argument seems to hold for larger batch sizes.
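
The argument above can be illustrated with a small stand-alone model. This
is only a sketch, not the timely code: `Stats`, `send`, `allocate`,
`preallocating` and `lazy` are hypothetical names, `DEFAULT_LENGTH` is an
assumed buffer size, and the pusher is assumed never to return a buffer.
Each round writes one record and flushes.

```rust
/// Assumed buffer size; a stand-in for `Message::default_length()`.
const DEFAULT_LENGTH: usize = 1024;

/// Bookkeeping for one exchange buffer in this model.
#[derive(Default)]
struct Stats {
    allocations: usize,       // buffers allocated over the whole run
    sent: usize,              // records handed to the pusher
    retained_capacity: usize, // capacity still held after the last flush
}

/// Hypothetical pusher: takes the buffer and never hands one back.
fn send(buffer: &mut Vec<u64>, stats: &mut Stats) {
    let taken = std::mem::take(buffer); // leaves an unallocated Vec behind
    stats.sent += taken.len();
}

/// Allocates a full-size buffer and counts the allocation.
fn allocate(stats: &mut Stats) -> Vec<u64> {
    stats.allocations += 1;
    Vec::with_capacity(DEFAULT_LENGTH)
}

/// Old approach: start with a buffer, send it, then allocate a replacement.
fn preallocating(rounds: usize) -> Stats {
    let mut stats = Stats::default();
    let mut buffer = allocate(&mut stats);
    for record in 0..rounds {
        buffer.push(record as u64);
        send(&mut buffer, &mut stats);
        buffer = allocate(&mut stats); // send first, allocate afterwards
    }
    stats.retained_capacity = buffer.capacity();
    stats
}

/// New approach: check capacity per record, allocate only when needed.
fn lazy(rounds: usize) -> Stats {
    let mut stats = Stats::default();
    let mut buffer: Vec<u64> = Vec::new();
    for record in 0..rounds {
        if buffer.capacity() < DEFAULT_LENGTH {
            buffer = allocate(&mut stats); // allocate first, send afterwards
        }
        buffer.push(record as u64);
        send(&mut buffer, &mut stats);
    }
    stats.retained_capacity = buffer.capacity();
    stats
}
```

In this model, calling `preallocating(n)` and `lazy(n)` sends the same
number of records. The old approach performs one more allocation in total
and keeps an idle buffer at the end, while the new one retains nothing.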

Signed-off-by: Moritz Hoffmann <[email protected]>
antiguru committed Oct 29, 2021
1 parent 3477d4f commit e04a831
Showing 6 changed files with 23 additions and 200 deletions.
43 changes: 2 additions & 41 deletions experiments/benches/exchange_bench.rs
@@ -6,9 +6,6 @@ use std::time::{Duration, Instant};
use criterion::black_box;
use criterion::*;

use timely::dataflow::channels::pact::{
Exchange as ExchangePact, NonRetainingExchange, ParallelizationContract,
};
use timely::dataflow::operators::{Exchange, Input, Probe};
use timely::dataflow::InputHandle;
use timely::{CommunicationConfig, Config, WorkerConfig};
@@ -43,22 +40,6 @@ fn bench(c: &mut Criterion) {
config,
params.batch,
iters,
|| ExchangePact::new(|&x| x as u64),
))
})
},
);
group.bench_with_input(
BenchmarkId::new("NonRetaining", params.clone()),
&params,
move |b, params| {
b.iter_custom(|iters| {
let config = Config::process(params.threads);
black_box(experiment_exchange(
config,
params.batch,
iters,
|| NonRetainingExchange::new(|&x| x as u64),
))
})
},
@@ -76,25 +57,6 @@ fn bench(c: &mut Criterion) {
config,
params.batch,
iters,
||ExchangePact::new(|&x| x as u64),
))
})
},
);
group.bench_with_input(
BenchmarkId::new("NonRetainingZero", params.clone()),
&params,
move |b, params| {
b.iter_custom(|iters| {
let config = Config {
communication: CommunicationConfig::ProcessBinary(params.threads),
worker: WorkerConfig::default(),
};
black_box(experiment_exchange(
config,
params.batch,
iters,
||NonRetainingExchange::new(|&x| x as u64),
))
})
},
@@ -103,15 +65,14 @@ fn bench(c: &mut Criterion) {
}
}

fn experiment_exchange<F: Fn() -> P + Sync + Send + 'static, P: ParallelizationContract<i32, u64>>(
fn experiment_exchange(
config: Config,
batch: u64,
rounds: u64,
pact: F,
) -> Duration {
timely::execute(config, move |worker| {
let mut input = InputHandle::new();
let probe = worker.dataflow(|scope| scope.input_from(&mut input).apply_pact(pact()).probe());
let probe = worker.dataflow(|scope| scope.input_from(&mut input).exchange(|x| *x).probe());

let mut time = 0;
let timer = Instant::now();
7 changes: 2 additions & 5 deletions timely/src/dataflow/channels/pact.rs
@@ -9,8 +9,7 @@
use std::{fmt::{self, Debug}, marker::PhantomData};

use crate::ExchangeData;
use crate::communication::{Push, Pull};
use crate::communication::{Push, Pull, Data};
use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};

use crate::worker::AsWorker;
@@ -19,8 +18,6 @@ use super::{Bundle, Message};

use crate::logging::{TimelyLogger as Logger, MessagesEvent};

pub use super::pushers::non_retaining_exchange::NonRetainingExchange;

/// A `ParallelizationContract` allocates paired `Push` and `Pull` implementors.
pub trait ParallelizationContract<T: 'static, D: 'static> {
/// Type implementing `Push` produced by this pact.
@@ -61,7 +58,7 @@ impl<D, F: FnMut(&D)->u64+'static> Exchange<D, F> {
}

// Exchange uses a `Box<Pushable>` because it cannot know what type of pushable will return from the allocator.
impl<T: Eq+ExchangeData, D: ExchangeData, F: FnMut(&D)->u64+'static> ParallelizationContract<T, D> for Exchange<D, F> {
impl<T: Eq+Data+Clone, D: Data+Clone, F: FnMut(&D)->u64+'static> ParallelizationContract<T, D> for Exchange<D, F> {
// TODO: The closure in the type prevents us from naming it.
// Could specialize `ExchangePusher` to a time-free version.
type Pusher = Box<dyn Push<Bundle<T, D>>>;
73 changes: 16 additions & 57 deletions timely/src/dataflow/channels/pushers/exchange.rs
@@ -1,89 +1,43 @@
//! The exchange pattern distributes pushed data between many target pushees.
use std::marker::PhantomData;

use crate::Data;
use crate::communication::Push;
use crate::dataflow::channels::{Bundle, Message};

// TODO : Software write combining
/// Distributes records among target pushees according to a distribution function.
pub struct ExchangePusherGeneric<T, D, P: Push<Bundle<T, D>>, H: FnMut(&T, &D) -> u64, B: ExchangeBehavior<T, D>> {
pub struct Exchange<T, D, P: Push<Bundle<T, D>>, H: FnMut(&T, &D) -> u64> {
pushers: Vec<P>,
buffers: Vec<Vec<D>>,
current: Option<T>,
hash_func: H,
_phantom_data: PhantomData<B>,
}

/// The behavior of an exchange specialization
///
/// This trait gives specialized exchange implementations the opportunity to hook into interesting
/// places for memory management. It exposes the lifecycle of each of the pushee's buffers, starting
/// from creation, ensuring allocations, flushing and finalizing.
pub trait ExchangeBehavior<T, D> {
/// Allocate a new buffer, called while creating the exchange pusher.
fn allocate() -> Vec<D>;
/// Check the buffer's capacity before pushing a single element.
fn check(buffer: &mut Vec<D>);
/// Flush a buffer's contents, either when the buffer is at capacity or when no more data is
/// available.
fn flush<P: Push<Bundle<T, D>>>(buffer: &mut Vec<D>, time: T, pusher: &mut P);
/// Finalize a buffer after pushing `None`, i.e. no more data is available.
fn finalize(buffer: &mut Vec<D>);
}

/// Exchange behavior that always has push buffers fully allocated.
pub struct FullyAllocatedExchangeBehavior {}

impl<T, D> ExchangeBehavior<T, D> for FullyAllocatedExchangeBehavior {
fn allocate() -> Vec<D> {
Vec::with_capacity(Message::<T, D>::default_length())
}

fn check(_buffer: &mut Vec<D>) {
// Not needed, always allocated
}

fn flush<P: Push<Bundle<T, D>>>(buffer: &mut Vec<D>, time: T, pusher: &mut P) {
// `push_at` ensures an allocation.
Message::push_at(buffer, time, pusher);
}

fn finalize(_buffer: &mut Vec<D>) {
// retain any allocation
}
}

/// Default exchange type is to fully allocate exchange buffers
pub type Exchange<T, D, P, H> = ExchangePusherGeneric<T, D, P, H, FullyAllocatedExchangeBehavior>;

impl<T: Clone, D, P: Push<Bundle<T, D>>, H: FnMut(&T, &D)->u64, B: ExchangeBehavior<T, D>> ExchangePusherGeneric<T, D, P, H, B> {
/// Allocates a new `ExchangeGeneric` from a supplied set of pushers and a distribution function.
pub fn new(pushers: Vec<P>, key: H) -> ExchangePusherGeneric<T, D, P, H, B> {
impl<T: Clone, D, P: Push<Bundle<T, D>>, H: FnMut(&T, &D)->u64> Exchange<T, D, P, H> {
/// Allocates a new `Exchange` from a supplied set of pushers and a distribution function.
pub fn new(pushers: Vec<P>, key: H) -> Exchange<T, D, P, H> {
let mut buffers = vec![];
for _ in 0..pushers.len() {
buffers.push(B::allocate());
buffers.push(Vec::new());
}
ExchangePusherGeneric {
Exchange {
pushers,
hash_func: key,
buffers,
current: None,
_phantom_data: PhantomData,
}
}
#[inline]
fn flush(&mut self, index: usize) {
if !self.buffers[index].is_empty() {
if let Some(ref time) = self.current {
B::flush(&mut self.buffers[index], time.clone(), &mut self.pushers[index]);
Message::push_at_no_allocation(&mut self.buffers[index], time.clone(), &mut self.pushers[index]);
}
}
}
}

impl<T: Eq+Data, D: Data, P: Push<Bundle<T, D>>, H: FnMut(&T, &D)->u64, B: ExchangeBehavior<T, D>> Push<Bundle<T, D>> for ExchangePusherGeneric<T, D, P, H, B> {
impl<T: Eq+Data, D: Data, P: Push<Bundle<T, D>>, H: FnMut(&T, &D)->u64> Push<Bundle<T, D>> for Exchange<T, D, P, H> {
#[inline(never)]
fn push(&mut self, message: &mut Option<Bundle<T, D>>) {
// if only one pusher, no exchange
@@ -109,7 +63,10 @@ impl<T: Eq+Data, D: Data, P: Push<Bundle<T, D>>, H: FnMut(&T, &D)->u64, B: Excha
let mask = (self.pushers.len() - 1) as u64;
for datum in data.drain(..) {
let index = (((self.hash_func)(time, &datum)) & mask) as usize;
B::check(&mut self.buffers[index]);
if self.buffers[index].capacity() < Message::<T, D>::default_length() {
let to_reserve = Message::<T, D>::default_length() - self.buffers[index].capacity();
self.buffers[index].reserve(to_reserve);
}
self.buffers[index].push(datum);
if self.buffers[index].len() == self.buffers[index].capacity() {
self.flush(index);
@@ -128,7 +85,10 @@ impl<T: Eq+Data, D: Data, P: Push<Bundle<T, D>>, H: FnMut(&T, &D)->u64, B: Excha
else {
for datum in data.drain(..) {
let index = (((self.hash_func)(time, &datum)) % self.pushers.len() as u64) as usize;
B::check(&mut self.buffers[index]);
if self.buffers[index].capacity() < Message::<T, D>::default_length() {
let to_reserve = Message::<T, D>::default_length() - self.buffers[index].capacity();
self.buffers[index].reserve(to_reserve);
}
self.buffers[index].push(datum);
if self.buffers[index].len() == self.buffers[index].capacity() {
self.flush(index);
@@ -142,7 +102,6 @@ impl<T: Eq+Data, D: Data, P: Push<Bundle<T, D>>, H: FnMut(&T, &D)->u64, B: Excha
for index in 0..self.pushers.len() {
self.flush(index);
self.pushers[index].push(&mut None);
B::finalize(&mut self.buffers[index]);
}
}
}
1 change: 0 additions & 1 deletion timely/src/dataflow/channels/pushers/mod.rs
@@ -3,7 +3,6 @@ pub use self::exchange::Exchange;
pub use self::counter::Counter;

pub mod tee;
pub mod non_retaining_exchange;
pub mod exchange;
pub mod counter;
pub mod buffer;
75 changes: 0 additions & 75 deletions timely/src/dataflow/channels/pushers/non_retaining_exchange.rs

This file was deleted.

24 changes: 3 additions & 21 deletions timely/src/dataflow/operators/exchange.rs
@@ -1,7 +1,7 @@
//! Exchange records between workers.
use crate::ExchangeData;
use crate::dataflow::channels::pact::{Exchange as ExchangePact, ParallelizationContract};
use crate::dataflow::channels::pact::Exchange as ExchangePact;
use crate::dataflow::{Stream, Scope};
use crate::dataflow::operators::generic::operator::Operator;

@@ -23,31 +23,13 @@ pub trait Exchange<T, D: ExchangeData> {
/// });
/// ```
fn exchange(&self, route: impl Fn(&D)->u64+'static) -> Self;

/// Apply a parallelization contract on the data in a stream
///
/// # Examples
/// ```
/// use timely::dataflow::operators::{ToStream, Exchange, Inspect};
/// use timely::dataflow::channels::pact::NonRetainingExchange;
///
/// timely::example(|scope| {
/// (0..10).to_stream(scope)
/// .apply_pact(NonRetainingExchange::new(|x| *x))
/// .inspect(|x| println!("seen: {:?}", x));
/// });
/// ```
fn apply_pact<P: ParallelizationContract<T, D>>(&self, pact: P) -> Self where T: 'static;
}

// impl<T: Timestamp, G: Scope<Timestamp=T>, D: ExchangeData> Exchange<T, D> for Stream<G, D> {
impl<G: Scope, D: ExchangeData> Exchange<G::Timestamp, D> for Stream<G, D> {
fn exchange(&self, route: impl Fn(&D)->u64+'static) -> Stream<G, D> {
self.apply_pact(ExchangePact::new(route))
}

fn apply_pact<P: ParallelizationContract<G::Timestamp, D>>(&self, pact: P) -> Stream<G, D> {
let mut vector = Vec::new();
self.unary(pact, "Exchange", move |_,_| move |input, output| {
self.unary(ExchangePact::new(route), "Exchange", move |_,_| move |input, output| {
input.for_each(|time, data| {
data.swap(&mut vector);
output.session(&time).give_vec(&mut vector);