Skip to content

Commit

Permalink
pact: Remove LazyExhange, rename eager to NonRetainingExchange
Browse files Browse the repository at this point in the history
Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Oct 29, 2021
1 parent 8a9d7e0 commit 3477d4f
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 197 deletions.
135 changes: 36 additions & 99 deletions experiments/benches/exchange_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ use std::time::{Duration, Instant};
use criterion::black_box;
use criterion::*;

use timely::dataflow::channels::pact::{EagerExchange, LazyExchange};
use timely::dataflow::channels::pact::{
Exchange as ExchangePact, NonRetainingExchange, ParallelizationContract,
};
use timely::dataflow::operators::{Exchange, Input, Probe};
use timely::dataflow::InputHandle;
use timely::{CommunicationConfig, Config, WorkerConfig};
Expand Down Expand Up @@ -37,27 +39,27 @@ fn bench(c: &mut Criterion) {
move |b, params| {
b.iter_custom(|iters| {
let config = Config::process(params.threads);
black_box(experiment_exchange(config, params.batch, iters))
black_box(experiment_exchange(
config,
params.batch,
iters,
|| ExchangePact::new(|&x| x as u64),
))
})
},
);
group.bench_with_input(
BenchmarkId::new("Lazy", params.clone()),
BenchmarkId::new("NonRetaining", params.clone()),
&params,
move |b, params| {
b.iter_custom(|iters| {
let config = Config::process(params.threads);
black_box(experiment_lazy_exchange(config, params.batch, iters))
})
},
);
group.bench_with_input(
BenchmarkId::new("Eager", params.clone()),
&params,
move |b, params| {
b.iter_custom(|iters| {
let config = Config::process(params.threads);
black_box(experiment_eager_exchange(config, params.batch, iters))
black_box(experiment_exchange(
config,
params.batch,
iters,
|| NonRetainingExchange::new(|&x| x as u64),
))
})
},
);
Expand All @@ -70,111 +72,46 @@ fn bench(c: &mut Criterion) {
communication: CommunicationConfig::ProcessBinary(params.threads),
worker: WorkerConfig::default(),
};
black_box(experiment_exchange(config, params.batch, iters))
})
},
);
group.bench_with_input(
BenchmarkId::new("LazyZero", params.clone()),
&params,
move |b, params| {
b.iter_custom(|iters| {
let config = Config {
communication: CommunicationConfig::ProcessBinary(params.threads),
worker: WorkerConfig::default(),
};
black_box(experiment_lazy_exchange(config, params.batch, iters))
black_box(experiment_exchange(
config,
params.batch,
iters,
||ExchangePact::new(|&x| x as u64),
))
})
},
);
group.bench_with_input(
BenchmarkId::new("EagerZero", params.clone()),
BenchmarkId::new("NonRetainingZero", params.clone()),
&params,
move |b, params| {
b.iter_custom(|iters| {
let config = Config {
communication: CommunicationConfig::ProcessBinary(params.threads),
worker: WorkerConfig::default(),
};
black_box(experiment_eager_exchange(config, params.batch, iters))
black_box(experiment_exchange(
config,
params.batch,
iters,
||NonRetainingExchange::new(|&x| x as u64),
))
})
},
);
}
}
}

fn experiment_exchange(config: Config, batch: u64, rounds: u64) -> Duration {
timely::execute(config, move |worker| {
let mut input = InputHandle::new();
let probe =
worker.dataflow(|scope| scope.input_from(&mut input).exchange(|&x| x as u64).probe());

let mut time = 0;
let timer = Instant::now();

for _round in 0..rounds {
for i in 0..batch {
input.send(i);
}
time += 1;
input.advance_to(time);
while probe.less_than(input.time()) {
worker.step();
}
}
timer.elapsed()
})
.unwrap()
.join()
.into_iter()
.next()
.unwrap()
.unwrap()
}

fn experiment_eager_exchange(config: Config, batch: u64, rounds: u64) -> Duration {
timely::execute(config, move |worker| {
let mut input = InputHandle::new();
let probe = worker.dataflow(|scope| {
scope
.input_from(&mut input)
.apply_pact(EagerExchange::new(|&x| x as u64))
.probe()
});

let mut time = 0;
let timer = Instant::now();

for _round in 0..rounds {
for i in 0..batch {
input.send(i);
}
time += 1;
input.advance_to(time);
while probe.less_than(input.time()) {
worker.step();
}
}
timer.elapsed()
})
.unwrap()
.join()
.into_iter()
.next()
.unwrap()
.unwrap()
}

fn experiment_lazy_exchange(config: Config, batch: u64, rounds: u64) -> Duration {
fn experiment_exchange<F: Fn() -> P + Sync + Send + 'static, P: ParallelizationContract<i32, u64>>(
config: Config,
batch: u64,
rounds: u64,
pact: F,
) -> Duration {
timely::execute(config, move |worker| {
let mut input = InputHandle::new();
let probe = worker.dataflow(|scope| {
scope
.input_from(&mut input)
.apply_pact(LazyExchange::new(|&x| x as u64))
.probe()
});
let probe = worker.dataflow(|scope| scope.input_from(&mut input).apply_pact(pact()).probe());

let mut time = 0;
let timer = Instant::now();
Expand Down
3 changes: 1 addition & 2 deletions timely/src/dataflow/channels/pact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ use super::{Bundle, Message};

use crate::logging::{TimelyLogger as Logger, MessagesEvent};

pub use super::pushers::lazy_exchange::LazyExchange;
pub use super::pushers::eager_exchange::EagerExchange;
pub use super::pushers::non_retaining_exchange::NonRetainingExchange;

/// A `ParallelizationContract` allocates paired `Push` and `Pull` implementors.
pub trait ParallelizationContract<T: 'static, D: 'static> {
Expand Down
10 changes: 5 additions & 5 deletions timely/src/dataflow/channels/pushers/exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ pub trait ExchangeBehavior<T, D> {
fn finalize(buffer: &mut Vec<D>);
}

/// Default exchange behavior
pub struct DefaultExchangeBehavior {}
/// Exchange behavior that always has push buffers fully allocated.
pub struct FullyAllocatedExchangeBehavior {}

impl<T, D> ExchangeBehavior<T, D> for DefaultExchangeBehavior {
impl<T, D> ExchangeBehavior<T, D> for FullyAllocatedExchangeBehavior {
fn allocate() -> Vec<D> {
Vec::with_capacity(Message::<T, D>::default_length())
}
Expand All @@ -55,8 +55,8 @@ impl<T, D> ExchangeBehavior<T, D> for DefaultExchangeBehavior {
}
}

/// Default exchange type
pub type Exchange<T, D, P, H> = ExchangePusherGeneric<T, D, P, H, DefaultExchangeBehavior>;
/// Default exchange type is to fully allocate exchange buffers
pub type Exchange<T, D, P, H> = ExchangePusherGeneric<T, D, P, H, FullyAllocatedExchangeBehavior>;

impl<T: Clone, D, P: Push<Bundle<T, D>>, H: FnMut(&T, &D)->u64, B: ExchangeBehavior<T, D>> ExchangePusherGeneric<T, D, P, H, B> {
/// Allocates a new `ExchangeGeneric` from a supplied set of pushers and a distribution function.
Expand Down
75 changes: 0 additions & 75 deletions timely/src/dataflow/channels/pushers/lazy_exchange.rs

This file was deleted.

3 changes: 1 addition & 2 deletions timely/src/dataflow/channels/pushers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ pub use self::exchange::Exchange;
pub use self::counter::Counter;

pub mod tee;
pub mod eager_exchange;
pub mod non_retaining_exchange;
pub mod exchange;
pub mod lazy_exchange;
pub mod counter;
pub mod buffer;
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ use crate::worker::AsWorker;
/// leaves no allocations around. It does not preallocate a buffer for each pushee, but
/// only allocates it once data is pushed. On flush, the allocation is passed to the pushee, and
/// any returned allocation will be dropped.
pub struct EagerExchangeBehavior {}
pub struct NonRetainingExchangeBehavior {}

impl<T, D> ExchangeBehavior<T, D> for EagerExchangeBehavior {
impl<T, D> ExchangeBehavior<T, D> for NonRetainingExchangeBehavior {
fn allocate() -> Vec<D> {
Vec::new()
}
Expand All @@ -39,14 +39,14 @@ impl<T, D> ExchangeBehavior<T, D> for EagerExchangeBehavior {
}
}

/// Eager exchange pusher definition
pub type EagerExchangePusher<T, D, P, H> = ExchangePusherGeneric<T, D, P, H, EagerExchangeBehavior>;
/// Non-retaining exchange pusher definition
pub type NonRetainingExchangePusher<T, D, P, H> = ExchangePusherGeneric<T, D, P, H, NonRetainingExchangeBehavior>;

/// An exchange between multiple observers by data, backed by [EagerExchangePusher].
pub struct EagerExchange<D, F> { hash_func: F, phantom: PhantomData<D> }
/// An exchange between multiple observers by data, backed by [NonRetainingExchangePusher].
pub struct NonRetainingExchange<D, F> { hash_func: F, phantom: PhantomData<D> }

impl<D, F: FnMut(&D)->u64+'static> EagerExchange<D, F> {
/// Allocates a new `LazyExchange` pact from a distribution function.
impl<D, F: FnMut(&D)->u64+'static> NonRetainingExchange<D, F> {
/// Allocates a new exchange pact from a distribution function.
pub fn new(func: F) -> Self {
Self {
hash_func: func,
Expand All @@ -56,20 +56,20 @@ impl<D, F: FnMut(&D)->u64+'static> EagerExchange<D, F> {
}

// Exchange uses a `Box<Pushable>` because it cannot know what type of pushable will return from the allocator.
impl<T: Eq+ExchangeData, D: ExchangeData, F: FnMut(&D)->u64+'static> ParallelizationContract<T, D> for EagerExchange<D, F> {
impl<T: Eq+ExchangeData, D: ExchangeData, F: FnMut(&D)->u64+'static> ParallelizationContract<T, D> for NonRetainingExchange<D, F> {
// TODO: The closure in the type prevents us from naming it.
// Could specialize `ExchangePusher` to a time-free version.
type Pusher = Box<dyn Push<Bundle<T, D>>>;
type Puller = Box<dyn Pull<Bundle<T, D>>>;
fn connect<A: AsWorker>(mut self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
let (senders, receiver) = allocator.allocate::<Message<T, D>>(identifier, address);
let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::<Vec<_>>();
(Box::new(EagerExchangePusher::new(senders, move |_, d| (self.hash_func)(d))), Box::new(LogPuller::new(receiver, allocator.index(), identifier, logging.clone())))
(Box::new(NonRetainingExchangePusher::new(senders, move |_, d| (self.hash_func)(d))), Box::new(LogPuller::new(receiver, allocator.index(), identifier, logging.clone())))
}
}

impl<D, F> std::fmt::Debug for EagerExchange<D, F> {
impl<D, F> std::fmt::Debug for NonRetainingExchange<D, F> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("EagerExchange").finish()
f.debug_struct("NonRetainingExchange").finish()
}
}
4 changes: 2 additions & 2 deletions timely/src/dataflow/operators/exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ pub trait Exchange<T, D: ExchangeData> {
/// # Examples
/// ```
/// use timely::dataflow::operators::{ToStream, Exchange, Inspect};
/// use timely::dataflow::channels::pact::LazyExchange;
/// use timely::dataflow::channels::pact::NonRetainingExchange;
///
/// timely::example(|scope| {
/// (0..10).to_stream(scope)
/// .apply_pact(LazyExchange::new(|x| *x))
/// .apply_pact(NonRetainingExchange::new(|x| *x))
/// .inspect(|x| println!("seen: {:?}", x));
/// });
/// ```
Expand Down

0 comments on commit 3477d4f

Please sign in to comment.