Skip to content

Commit

Permalink
exchange: Unify all exchange variants to generic impl
Browse files Browse the repository at this point in the history
Instead of defining distinct exchange pushers, make the default generic
and provide necessary hooks for specialized exchange pushers.

Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Oct 7, 2021
1 parent ff05574 commit 8717d6f
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 183 deletions.
100 changes: 20 additions & 80 deletions timely/src/dataflow/channels/pushers/eager_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
use std::marker::PhantomData;

use crate::{Data, ExchangeData};
use crate::ExchangeData;
use crate::communication::{Pull, Push};
use crate::dataflow::channels::pact::{ParallelizationContract, LogPusher, LogPuller};
use crate::dataflow::channels::pushers::exchange::{ExchangeBehavior, ExchangePusherGeneric};
use crate::dataflow::channels::{Bundle, Message};
use crate::logging::TimelyLogger as Logger;
use crate::worker::AsWorker;
Expand All @@ -15,98 +16,37 @@ use crate::worker::AsWorker;
/// leaves no allocations around. It does not preallocate a buffer for each pushee, but
/// only allocates it once data is pushed. On flush, the allocation is passed to the pushee, and
/// any returned allocation will be dropped.
pub struct EagerExchangePusher<T, D, P: Push<Bundle<T, D>>, H: FnMut(&T, &D) -> u64> {
pushers: Vec<P>,
buffers: Vec<Vec<D>>,
current: Option<T>,
hash_func: H,
}
pub struct EagerExchangeBehavior {}

impl<T: Clone, D, P: Push<Bundle<T, D>>, H: FnMut(&T, &D)->u64> EagerExchangePusher<T, D, P, H> {
/// Allocates a new `Exchange` from a supplied set of pushers and a distribution function.
pub fn new(pushers: Vec<P>, key: H) -> EagerExchangePusher<T, D, P, H> {
let buffers = (0..pushers.len()).map(|_| vec![]).collect();
EagerExchangePusher {
pushers,
hash_func: key,
buffers,
current: None,
}
impl<T, D> ExchangeBehavior<T, D> for EagerExchangeBehavior {
fn allocate() -> Vec<D> {
Vec::new()
}
#[inline]
fn flush(&mut self, index: usize) {
if !self.buffers[index].is_empty() {
if let Some(ref time) = self.current {
Message::push_at_no_allocation(&mut self.buffers[index], time.clone(), &mut self.pushers[index]);
}

fn check(buffer: &mut Vec<D>) {
if buffer.capacity() < Message::<T, D>::default_length() {
let to_reserve = Message::<T, D>::default_length() - buffer.capacity();
buffer.reserve(to_reserve);
}
}
}

impl<T: Eq+Data, D: Data, P: Push<Bundle<T, D>>, H: FnMut(&T, &D)->u64> Push<Bundle<T, D>> for EagerExchangePusher<T, D, P, H> {
fn push(&mut self, message: &mut Option<Bundle<T, D>>) {
// if only one pusher, no exchange
if self.pushers.len() == 1 {
self.pushers[0].push(message);
} else if let Some(message) = message {
let message = message.as_mut();
let time = &message.time;
let data = &mut message.data;

// if the time isn't right, flush everything.
if self.current.as_ref().map_or(false, |x| x != time) {
for index in 0..self.pushers.len() {
self.flush(index);
}
}
self.current = Some(time.clone());
fn flush<P: Push<Bundle<T, D>>>(buffer: &mut Vec<D>, time: T, pusher: &mut P) {
Message::push_at_no_allocation(buffer, time, pusher);
}

// if the number of pushers is a power of two, use a mask
if (self.pushers.len() & (self.pushers.len() - 1)) == 0 {
let mask = (self.pushers.len() - 1) as u64;
for datum in data.drain(..) {
let index = (((self.hash_func)(time, &datum)) & mask) as usize;
if self.buffers[index].capacity() < Message::<T, D>::default_length() {
let to_reserve = Message::<T, D>::default_length() - self.buffers[index].capacity();
self.buffers[index].reserve(to_reserve);
}
self.buffers[index].push(datum);
// We have reached the buffer's capacity
if self.buffers[index].len() == self.buffers[index].capacity() {
self.flush(index);
}
}
} else {
// as a last resort, use mod (%)
for datum in data.drain(..) {
let index = (((self.hash_func)(time, &datum)) % self.pushers.len() as u64) as usize;
if self.buffers[index].capacity() < Message::<T, D>::default_length() {
let to_reserve = Message::<T, D>::default_length() - self.buffers[index].capacity();
self.buffers[index].reserve(to_reserve);
}
self.buffers[index].push(datum);
// We have reached the buffer's capacity
if self.buffers[index].len() == self.buffers[index].capacity() {
self.flush(index);
}
}
}
} else {
// flush
for index in 0..self.pushers.len() {
self.flush(index);
self.pushers[index].push(&mut None);
self.buffers[index] = Vec::new();
}
}
fn finalize(buffer: &mut Vec<D>) {
*buffer = Vec::new();
}
}

/// Eager exchange pusher definition
///
/// This is [`ExchangePusherGeneric`] specialized with [`EagerExchangeBehavior`]: buffers start out
/// as empty `Vec`s, are grown to the default message length before an element is pushed, are
/// flushed with `Message::push_at_no_allocation`, and are replaced by a fresh empty `Vec` once
/// `None` has been pushed.
pub type EagerExchangePusher<T, D, P, H> = ExchangePusherGeneric<T, D, P, H, EagerExchangeBehavior>;

/// An exchange between multiple observers by data, backed by [EagerExchangePusher].
///
/// `hash_func` is the distribution function handed to `new`; `phantom` only records the data
/// type `D` and stores no value.
pub struct EagerExchange<D, F> { hash_func: F, phantom: PhantomData<D> }

impl<D, F: FnMut(&D)->u64+'static> EagerExchange<D, F> {
/// Allocates a new `LeanExchange` pact from a distribution function.
/// Allocates a new `EagerExchange` pact from a distribution function.
pub fn new(func: F) -> Self {
Self {
hash_func: func,
Expand Down
66 changes: 57 additions & 9 deletions timely/src/dataflow/channels/pushers/exchange.rs
Original file line number Diff line number Diff line change
@@ -1,43 +1,89 @@
//! The exchange pattern distributes pushed data between many target pushees.
use std::marker::PhantomData;

use crate::Data;
use crate::communication::Push;
use crate::dataflow::channels::{Bundle, Message};

// TODO : Software write combining
/// Distributes records among target pushees according to a distribution function.
pub struct Exchange<T, D, P: Push<Bundle<T, D>>, H: FnMut(&T, &D) -> u64> {
pub struct ExchangePusherGeneric<T, D, P: Push<Bundle<T, D>>, H: FnMut(&T, &D) -> u64, B: ExchangeBehavior<T, D>> {
pushers: Vec<P>,
buffers: Vec<Vec<D>>,
current: Option<T>,
hash_func: H,
_phantom_data: PhantomData<B>,
}

/// The behavior of an exchange specialization
///
/// This trait gives specialized exchange implementations the opportunity to hook into interesting
/// places for memory management. It exposes the lifecycle of each of the pushee's buffers, starting
/// from creation, ensuring allocations, flushing and finalizing.
///
/// All hooks are associated functions without a `self` receiver: an implementation carries no
/// state of its own and only acts on the buffer it is handed. `T` is the timestamp type and `D`
/// the record type of the exchanged bundles.
pub trait ExchangeBehavior<T, D> {
/// Allocate a new buffer, called while creating the exchange pusher.
///
/// `ExchangePusherGeneric::new` calls this once per target pusher.
fn allocate() -> Vec<D>;
/// Check the buffer's capacity before pushing a single element.
///
/// The exchange pusher calls this immediately before it pushes a record into `buffer`, so an
/// implementation may reserve capacity here or leave the buffer untouched.
fn check(buffer: &mut Vec<D>);
/// Flush a buffer's contents, either when the buffer is at capacity or when no more data is
/// available.
///
/// The exchange pusher only calls this for a non-empty `buffer`, passing a clone of its
/// current time as `time` and the pusher that belongs to the buffer as `pusher`.
fn flush<P: Push<Bundle<T, D>>>(buffer: &mut Vec<D>, time: T, pusher: &mut P);
/// Finalize a buffer after pushing `None`, i.e. no more data is available.
///
/// Called once per buffer, after the buffer has been flushed and `None` has been pushed to
/// its pusher.
fn finalize(buffer: &mut Vec<D>);
}

impl<T: Clone, D, P: Push<Bundle<T, D>>, H: FnMut(&T, &D)->u64> Exchange<T, D, P, H> {
/// Allocates a new `Exchange` from a supplied set of pushers and a distribution function.
pub fn new(pushers: Vec<P>, key: H) -> Exchange<T, D, P, H> {
/// The stock buffer-management strategy, selected by the plain `Exchange` pusher.
///
/// Buffers are created at the default message length up front, so no capacity check is needed
/// before a push, and nothing is released when the input is exhausted.
pub struct DefaultExchangeBehavior {}

impl<T, D> ExchangeBehavior<T, D> for DefaultExchangeBehavior {
    /// Hands out a buffer that is already sized to the default message length.
    fn allocate() -> Vec<D> {
        let capacity = Message::<T, D>::default_length();
        Vec::with_capacity(capacity)
    }

    /// No-op: the buffer is always allocated, so there is nothing to ensure here.
    fn check(_buffer: &mut Vec<D>) {}

    /// Ships the buffered records to `pusher` at `time`.
    fn flush<P>(buffer: &mut Vec<D>, time: T, pusher: &mut P)
    where
        P: Push<Bundle<T, D>>,
    {
        // `push_at` ensures an allocation.
        Message::push_at(buffer, time, pusher);
    }

    /// No-op: whatever allocation the buffer holds is retained.
    fn finalize(_buffer: &mut Vec<D>) {}
}

/// Default exchange type
///
/// This is [`ExchangePusherGeneric`] specialized with [`DefaultExchangeBehavior`], which
/// preallocates every buffer at the default message length and retains the allocation when no
/// more data is available.
pub type Exchange<T, D, P, H> = ExchangePusherGeneric<T, D, P, H, DefaultExchangeBehavior>;

impl<T: Clone, D, P: Push<Bundle<T, D>>, H: FnMut(&T, &D)->u64, B: ExchangeBehavior<T, D>> ExchangePusherGeneric<T, D, P, H, B> {
/// Allocates a new `ExchangePusherGeneric` from a supplied set of pushers and a distribution function.
pub fn new(pushers: Vec<P>, key: H) -> ExchangePusherGeneric<T, D, P, H, B> {
let mut buffers = vec![];
for _ in 0..pushers.len() {
buffers.push(Vec::with_capacity(Message::<T, D>::default_length()));
buffers.push(B::allocate());
}
Exchange {
ExchangePusherGeneric {
pushers,
hash_func: key,
buffers,
current: None,
_phantom_data: PhantomData,
}
}
#[inline]
fn flush(&mut self, index: usize) {
if !self.buffers[index].is_empty() {
if let Some(ref time) = self.current {
Message::push_at(&mut self.buffers[index], time.clone(), &mut self.pushers[index]);
B::flush(&mut self.buffers[index], time.clone(), &mut self.pushers[index]);
}
}
}
}

impl<T: Eq+Data, D: Data, P: Push<Bundle<T, D>>, H: FnMut(&T, &D)->u64> Push<Bundle<T, D>> for Exchange<T, D, P, H> {
impl<T: Eq+Data, D: Data, P: Push<Bundle<T, D>>, H: FnMut(&T, &D)->u64, B: ExchangeBehavior<T, D>> Push<Bundle<T, D>> for ExchangePusherGeneric<T, D, P, H, B> {
#[inline(never)]
fn push(&mut self, message: &mut Option<Bundle<T, D>>) {
// if only one pusher, no exchange
Expand All @@ -63,7 +109,7 @@ impl<T: Eq+Data, D: Data, P: Push<Bundle<T, D>>, H: FnMut(&T, &D)->u64> Push<Bun
let mask = (self.pushers.len() - 1) as u64;
for datum in data.drain(..) {
let index = (((self.hash_func)(time, &datum)) & mask) as usize;

B::check(&mut self.buffers[index]);
self.buffers[index].push(datum);
if self.buffers[index].len() == self.buffers[index].capacity() {
self.flush(index);
Expand All @@ -82,6 +128,7 @@ impl<T: Eq+Data, D: Data, P: Push<Bundle<T, D>>, H: FnMut(&T, &D)->u64> Push<Bun
else {
for datum in data.drain(..) {
let index = (((self.hash_func)(time, &datum)) % self.pushers.len() as u64) as usize;
B::check(&mut self.buffers[index]);
self.buffers[index].push(datum);
if self.buffers[index].len() == self.buffers[index].capacity() {
self.flush(index);
Expand All @@ -95,6 +142,7 @@ impl<T: Eq+Data, D: Data, P: Push<Bundle<T, D>>, H: FnMut(&T, &D)->u64> Push<Bun
for index in 0..self.pushers.len() {
self.flush(index);
self.pushers[index].push(&mut None);
B::finalize(&mut self.buffers[index]);
}
}
}
Expand Down
113 changes: 19 additions & 94 deletions timely/src/dataflow/channels/pushers/lazy_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
use std::marker::PhantomData;

use crate::{Data, ExchangeData};
use crate::ExchangeData;
use crate::communication::{Pull, Push};
use crate::dataflow::channels::pact::{ParallelizationContract, LogPusher, LogPuller};
use crate::dataflow::channels::pushers::exchange::{ExchangeBehavior, ExchangePusherGeneric};
use crate::dataflow::channels::{Bundle, Message};
use crate::logging::TimelyLogger as Logger;
use crate::worker::AsWorker;
Expand All @@ -15,108 +16,32 @@ use crate::worker::AsWorker;
/// tries to leave fewer allocations around. It does not preallocate a buffer for each pushee, but
/// only allocates it once data is pushed. On flush, the allocation is passed to the pushee, and
/// only what is passed back is retained.
pub struct LazyExchangePusher<T, D, P: Push<Bundle<T, D>>, H: FnMut(&T, &D) -> u64> {
pushers: Vec<P>,
buffers: Vec<Vec<D>>,
current: Option<T>,
hash_func: H,
}
pub struct LazyExchangeBehavior {}

impl<T: Clone, D, P: Push<Bundle<T, D>>, H: FnMut(&T, &D)->u64> LazyExchangePusher<T, D, P, H> {
/// Allocates a new `Exchange` from a supplied set of pushers and a distribution function.
pub fn new(pushers: Vec<P>, key: H) -> LazyExchangePusher<T, D, P, H> {
let buffers = (0..pushers.len()).map(|_| vec![]).collect();
LazyExchangePusher {
pushers,
hash_func: key,
buffers,
current: None,
}
impl<T, D> ExchangeBehavior<T, D> for LazyExchangeBehavior {
fn allocate() -> Vec<D> {
Vec::new()
}
#[inline]
fn flush(&mut self, index: usize) {
if !self.buffers[index].is_empty() {
if let Some(ref time) = self.current {
Message::push_at_no_allocation(&mut self.buffers[index], time.clone(), &mut self.pushers[index]);
}

fn check(buffer: &mut Vec<D>) {
if buffer.capacity() < Message::<T, D>::default_length() {
let to_reserve = Message::<T, D>::default_length() - buffer.capacity();
buffer.reserve(to_reserve);
}
}
}

impl<T: Eq+Data, D: Data, P: Push<Bundle<T, D>>, H: FnMut(&T, &D)->u64> Push<Bundle<T, D>> for LazyExchangePusher<T, D, P, H> {
fn push(&mut self, message: &mut Option<Bundle<T, D>>) {
// if only one pusher, no exchange
if self.pushers.len() == 1 {
self.pushers[0].push(message);
} else if let Some(message) = message {
let message = message.as_mut();
let time = &message.time;
let data = &mut message.data;

// if the time isn't right, flush everything.
if self.current.as_ref().map_or(false, |x| x != time) {
for index in 0..self.pushers.len() {
self.flush(index);
}
}
self.current = Some(time.clone());
fn flush<P: Push<Bundle<T, D>>>(buffer: &mut Vec<D>, time: T, pusher: &mut P) {
Message::push_at_no_allocation(buffer, time, pusher);
}

// if the number of pushers is a power of two, use a mask
if (self.pushers.len() & (self.pushers.len() - 1)) == 0 {
let mask = (self.pushers.len() - 1) as u64;
for datum in data.drain(..) {
let index = (((self.hash_func)(time, &datum)) & mask) as usize;
// Push at the target buffer, which might be without capacity, or preallocated
self.buffers[index].push(datum);
// We have reached the buffer's capacity
if self.buffers[index].len() == self.buffers[index].capacity() {
// If the buffer's capacity is below the default length, reallocate to match
// the default length
if self.buffers[index].capacity() < Message::<T, D>::default_length() {
let to_reserve = Message::<T, D>::default_length() - self.buffers[index].capacity();
self.buffers[index].reserve(to_reserve);
} else {
// Buffer is at capacity, flush
self.flush(index);
// Explicitly allocate a new buffer under the assumption that more data
// will be sent to the pushee.
if self.buffers[index].capacity() < Message::<T, D>::default_length() {
let to_reserve = Message::<T, D>::default_length() - self.buffers[index].capacity();
self.buffers.reserve(to_reserve);
}
}
}
}
} else {
// as a last resort, use mod (%)
for datum in data.drain(..) {
let index = (((self.hash_func)(time, &datum)) % self.pushers.len() as u64) as usize;
self.buffers[index].push(datum);
// This code is duplicated from above, keep in sync!
if self.buffers[index].len() == self.buffers[index].capacity() {
if self.buffers[index].capacity() < Message::<T, D>::default_length() {
let to_reserve = Message::<T, D>::default_length() - self.buffers[index].capacity();
self.buffers[index].reserve(to_reserve);
} else {
self.flush(index);
if self.buffers[index].capacity() < Message::<T, D>::default_length() {
let to_reserve = Message::<T, D>::default_length() - self.buffers[index].capacity();
self.buffers.reserve(to_reserve);
}
}
}
}
}
} else {
// flush
for index in 0..self.pushers.len() {
self.flush(index);
self.pushers[index].push(&mut None);
}
}
fn finalize(_buffer: &mut Vec<D>) {
// None
}
}

/// Lazy exchange pusher definition
///
/// This is [`ExchangePusherGeneric`] specialized with [`LazyExchangeBehavior`]: buffers start out
/// as empty `Vec`s, are grown to the default message length before an element is pushed, and are
/// flushed with `Message::push_at_no_allocation`. Finalizing leaves the buffer as it is.
pub type LazyExchangePusher<T, D, P, H> = ExchangePusherGeneric<T, D, P, H, LazyExchangeBehavior>;

/// An exchange between multiple observers by data, backed by [LazyExchangePusher].
///
/// `hash_func` holds a value of type `F`; `phantom` only records the data type `D` and stores
/// no value.
pub struct LazyExchange<D, F> { hash_func: F, phantom: PhantomData<D> }

Expand Down

0 comments on commit 8717d6f

Please sign in to comment.