Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exchange: bench, do not preallocate buffers #416

Merged
merged 11 commits into from
Nov 20, 2021
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ members = [
"kafkaesque",
"logging",
"timely",
"experiments"
]

[profile.release]
Expand Down
12 changes: 12 additions & 0 deletions experiments/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[package]
name = "experiments"
version = "0.1.0"
edition = "2018"

[dependencies]
criterion = "0.3.5"
timely = { path = "../timely" }

[[bench]]
name = "exchange_bench"
harness = false
104 changes: 104 additions & 0 deletions experiments/benches/exchange_bench.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
extern crate timely;

use std::fmt::{Display, Formatter};
use std::time::{Duration, Instant};

use criterion::black_box;
use criterion::*;

use timely::dataflow::operators::{Exchange, Input, Probe};
use timely::dataflow::InputHandle;
use timely::{CommunicationConfig, Config, WorkerConfig};

/// Parameters identifying a single benchmark run.
///
/// The `Display` implementation is used as the parameter label in the
/// Criterion benchmark id.
#[derive(Clone)]
struct ExperimentConfig {
    /// Number of worker threads handed to the timely configuration.
    threads: usize,
    /// Number of records sent per round (the benchmark sends `0..batch`).
    batch: u64,
}

impl Display for ExperimentConfig {
    /// Writes `threads=<n>,batch=<n>`, right-aligning the thread count to a
    /// minimum width of 2 and the batch size to a minimum width of 5.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let Self { threads, batch } = self;
        write!(f, "threads={:2},batch={:5}", threads, batch)
    }
}

/// Registers the `exchange` benchmark group with Criterion.
///
/// For every combination of thread count (1, 2, 4, 8, 16) and batch size
/// (`1 << shift` for shifts 0, 4, 8, 14) two benchmarks are registered:
///
/// * `Default` builds the configuration with `Config::process`.
/// * `DefaultZero` builds the configuration with
///   `CommunicationConfig::ProcessBinary` and a default `WorkerConfig`.
///
/// Both use `iter_custom`, so the reported time is the `Duration` returned by
/// `experiment_exchange` for the requested number of iterations rather than a
/// wall-clock measurement taken by Criterion itself.
fn bench(c: &mut Criterion) {
    let mut group = c.benchmark_group("exchange");
    for threads in [1, 2, 4, 8, 16] {
        for shift in [0, 4, 8, 14] {
            let params = ExperimentConfig {
                threads,
                batch: 1u64 << shift,
            };

            // NOTE(review): presumably timely's thread-based in-process
            // channels; confirm against `Config::process`.
            group.bench_with_input(
                BenchmarkId::new("Default", params.clone()),
                &params,
                |b, params| {
                    b.iter_custom(|iters| {
                        let config = Config::process(params.threads);
                        black_box(experiment_exchange(config, params.batch, iters))
                    })
                },
            );

            // NOTE(review): presumably the serializing ("binary") in-process
            // channels; confirm against `CommunicationConfig::ProcessBinary`.
            group.bench_with_input(
                BenchmarkId::new("DefaultZero", params.clone()),
                &params,
                |b, params| {
                    b.iter_custom(|iters| {
                        let config = Config {
                            communication: CommunicationConfig::ProcessBinary(params.threads),
                            worker: WorkerConfig::default(),
                        };
                        black_box(experiment_exchange(config, params.batch, iters))
                    })
                },
            );
        }
    }
    // Criterion recommends finishing a group explicitly instead of relying on
    // the implicit finish that happens when the group is dropped.
    group.finish();
}

/// Runs an input -> exchange -> probe dataflow for `rounds` rounds and returns
/// the time the rounds took.
///
/// Each worker builds the dataflow, then for every round sends the records
/// `0..batch`, advances the input by one timestamp and steps the worker until
/// the probe has caught up with the input time. The timer starts after the
/// dataflow has been constructed.
///
/// # Returns
///
/// The elapsed time reported by the first entry of the joined worker results;
/// the measurements of any other workers are discarded.
///
/// # Panics
///
/// Panics if `timely::execute` returns an error, if joining yields no worker
/// results, or if the first worker result is an error.
fn experiment_exchange(
    config: Config,
    batch: u64,
    rounds: u64,
) -> Duration {
    let guards = timely::execute(config, move |worker| {
        let mut input = InputHandle::new();
        let probe = worker.dataflow(|scope| {
            scope
                .input_from(&mut input)
                .exchange(|x| *x)
                .probe()
        });

        let mut epoch = 0;
        let started = Instant::now();

        // `template` holds the records of one round; `scratch` is the vector
        // handed to `send_batch` and is refilled from the template each round.
        let template = (0..batch).collect();
        let mut scratch = Vec::new();

        for _ in 0..rounds {
            scratch.clone_from(&template);
            input.send_batch(&mut scratch);
            scratch.clear();
            epoch += 1;
            input.advance_to(epoch);
            // Drive the worker until the probe is no longer behind the input.
            while probe.less_than(input.time()) {
                worker.step();
            }
        }
        started.elapsed()
    })
    .unwrap();

    // Only the first worker's measurement is reported.
    let mut results = guards.join().into_iter();
    results.next().unwrap().unwrap()
}

criterion_group!(benches, bench);
criterion_main!(benches);
28 changes: 23 additions & 5 deletions timely/src/dataflow/channels/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,28 @@ impl<T, D> Message<T, D> {
Message { time, data, from, seq }
}

/// Forms a message, and pushes contents at `pusher`.
/// Forms a message, and pushes contents at `pusher`. Replaces `buffer` with what the pusher
/// leaves in place, or a new `Vec`. Note that on return, `buffer` always has a capacity of
/// [Self::default_length] elements.
#[inline]
pub fn push_at<P: Push<Bundle<T, D>>>(buffer: &mut Vec<D>, time: T, pusher: &mut P) {

let data = ::std::mem::replace(buffer, Vec::new());
Self::push_at_no_allocation(buffer, time, pusher);

// Allocate a default buffer to avoid oddly sized or empty buffers
if buffer.capacity() != Self::default_length() {
*buffer = Vec::with_capacity(Self::default_length());
}
}

/// Forms a message, and pushes contents at `pusher`. Replaces `buffer` with what the pusher
/// leaves in place, or a new empty `Vec`. If the pusher leaves a vector with a capacity larger
/// than [Self::default_length], it is replaced by a new vector allocated with a capacity of
/// [Self::default_length].
#[inline]
pub fn push_at_no_allocation<P: Push<Bundle<T, D>>>(buffer: &mut Vec<D>, time: T, pusher: &mut P) {

let data = ::std::mem::take(buffer);
let message = Message::new(time, data, 0, 0);
let mut bundle = Some(Bundle::from_typed(message));

Expand All @@ -63,8 +80,9 @@ impl<T, D> Message<T, D> {
}
}

// TODO: Unclear we always want this here.
if buffer.capacity() != Self::default_length() {
// Avoid memory leaks by buffers growing out of bounds
if buffer.capacity() > Self::default_length() {
*buffer = Vec::with_capacity(Self::default_length());
}
}}
}
}
16 changes: 14 additions & 2 deletions timely/src/dataflow/channels/pushers/exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ impl<T: Clone, D, P: Push<Bundle<T, D>>, H: FnMut(&T, &D)->u64> Exchange<T, D,
pub fn new(pushers: Vec<P>, key: H) -> Exchange<T, D, P, H> {
let mut buffers = vec![];
for _ in 0..pushers.len() {
buffers.push(Vec::with_capacity(Message::<T, D>::default_length()));
buffers.push(Vec::new());
}
Exchange {
pushers,
Expand All @@ -31,7 +31,7 @@ impl<T: Clone, D, P: Push<Bundle<T, D>>, H: FnMut(&T, &D)->u64> Exchange<T, D,
fn flush(&mut self, index: usize) {
if !self.buffers[index].is_empty() {
if let Some(ref time) = self.current {
Message::push_at(&mut self.buffers[index], time.clone(), &mut self.pushers[index]);
Message::push_at_no_allocation(&mut self.buffers[index], time.clone(), &mut self.pushers[index]);
}
}
}
Expand Down Expand Up @@ -64,6 +64,12 @@ impl<T: Eq+Data, D: Data, P: Push<Bundle<T, D>>, H: FnMut(&T, &D)->u64> Push<Bun
for datum in data.drain(..) {
let index = (((self.hash_func)(time, &datum)) & mask) as usize;

// Ensure allocated buffers: If the buffer's capacity is less than its default
// capacity, increase the capacity such that it matches the default.
if self.buffers[index].capacity() < Message::<T, D>::default_length() {
let to_reserve = Message::<T, D>::default_length() - self.buffers[index].capacity();
self.buffers[index].reserve(to_reserve);
}
self.buffers[index].push(datum);
if self.buffers[index].len() == self.buffers[index].capacity() {
self.flush(index);
Expand All @@ -82,6 +88,12 @@ impl<T: Eq+Data, D: Data, P: Push<Bundle<T, D>>, H: FnMut(&T, &D)->u64> Push<Bun
else {
for datum in data.drain(..) {
let index = (((self.hash_func)(time, &datum)) % self.pushers.len() as u64) as usize;
// Ensure allocated buffers: If the buffer's capacity is less than its default
// capacity, increase the capacity such that it matches the default.
if self.buffers[index].capacity() < Message::<T, D>::default_length() {
let to_reserve = Message::<T, D>::default_length() - self.buffers[index].capacity();
self.buffers[index].reserve(to_reserve);
}
self.buffers[index].push(datum);
if self.buffers[index].len() == self.buffers[index].capacity() {
self.flush(index);
Expand Down