Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge batcher input generic over containers #494

Merged
merged 14 commits into from
Jun 13, 2024
4 changes: 2 additions & 2 deletions examples/spines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ fn main() {
},
"flat" => {
use differential_dataflow::trace::implementations::ord_neu::FlatKeySpine;
let data = data.arrange::<FlatKeySpine<_,_,_>>();
let keys = keys.arrange::<FlatKeySpine<_,_,_>>();
let data = data.arrange::<FlatKeySpine<String,_,isize,_>>();
let keys = keys.arrange::<FlatKeySpine<String,_,isize,_>>();
keys.join_core(&data, |_k, (), ()| Option::<()>::None)
.probe_with(&mut probe);
}
Expand Down
210 changes: 209 additions & 1 deletion src/consolidation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,15 @@
//! you need specific behavior, it may be best to defensively copy, paste, and maintain the
//! specific behavior you require.

use std::cmp::Ordering;
use std::collections::VecDeque;
use timely::Container;
use timely::container::{ContainerBuilder, PushInto, SizableContainer};
use timely::container::flatcontainer::{FlatStack, Push, Region};
use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion};
use crate::Data;
use crate::difference::Semigroup;
use crate::difference::{IsZero, Semigroup};
use crate::trace::cursor::IntoOwned;

/// Sorts and consolidates `vec`.
///
Expand Down Expand Up @@ -218,6 +223,151 @@ where
}
}

/// Layout of containers and their read items to be consolidated.
///
/// This trait specifies behavior to extract keys and diffs from a container's read
/// items. Consolidation accumulates the diffs per key.
///
/// The trait requires `Container` to have access to its `Item` GAT.
pub trait ConsolidateLayout: Container {
/// Key portion of data, essentially everything minus the diff.
///
/// Keys are compared with `==` to decide whether two items' diffs are accumulated.
type Key<'a>: Eq where Self: 'a;

/// GAT diff type, as read from the container. Convertible into [`Self::DiffOwned`].
type Diff<'a>: IntoOwned<'a, Owned = Self::DiffOwned> where Self: 'a;

/// Owned diff type, into which diffs read from the container are accumulated.
type DiffOwned: for<'a> Semigroup<Self::Diff<'a>>;

/// Deconstruct an item into key and diff. Must be cheap.
fn into_parts(item: Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>);

/// Push an element to a compatible container.
///
/// This function is odd to have, so let's explain why it exists. Ideally, the container
/// would accept a `(key, diff)` pair and we wouldn't need this function. However, we
/// might never be in a position where this is true: Vectors can push any `T`, which would
/// collide with a specific implementation for pushing tuples of mixed GATs and owned types.
///
/// For this reason, we expose a function here that takes a GAT key and an owned diff, and
/// leave it to the implementation to "patch" a suitable item that can be pushed into `self`.
fn push_with_diff(&mut self, key: Self::Key<'_>, diff: Self::DiffOwned);

/// Compare two items by key to sort containers.
///
/// [`ContainerConsolidator`] sorts items with this function and then accumulates the
/// diffs of adjacent items whose keys are equal, so items with equal keys should
/// compare as [`Ordering::Equal`].
fn cmp(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering;
}

/// Consolidation layout for vectors of owned `(data, time, diff)` triples.
///
/// The key is the owned `(data, time)` pair; the diff is the owned `R` itself.
impl<D, T, R> ConsolidateLayout for Vec<(D, T, R)>
where
    D: Ord + Clone + 'static,
    T: Ord + Clone + 'static,
    for<'a> R: Semigroup + IntoOwned<'a, Owned = R> + Clone + 'static,
{
    type Key<'a> = (D, T) where Self: 'a;
    type Diff<'a> = R where Self: 'a;
    type DiffOwned = R;

    /// Splits an update triple into its `(data, time)` key and its diff by moving the parts.
    fn into_parts((data, time, diff): Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>) {
        ((data, time), diff)
    }

    /// Reassembles a `(data, time, diff)` triple from key and diff and appends it to `self`.
    fn push_with_diff(&mut self, (data, time): Self::Key<'_>, diff: Self::DiffOwned) {
        self.push((data, time, diff));
    }

    /// Orders two update triples by `(data, time)`, ignoring the diff.
    fn cmp(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering {
        // Compare tuples of references to avoid cloning data or time.
        (&item1.0, &item1.1).cmp(&(&item2.0, &item2.1))
    }
}

/// Consolidation layout for flat stacks of `((key, val), time, diff)` updates.
///
/// The consolidation key is the `(key, val, time)` triple of read items; the diff is
/// read as `R::ReadItem` and accumulated as `R::Owned`.
impl<K, V, T, R> ConsolidateLayout for FlatStack<TupleABCRegion<TupleABRegion<K, V>, T, R>>
where
    for<'a> K: Region + Push<<K as Region>::ReadItem<'a>> + Clone + 'static,
    for<'a> K::ReadItem<'a>: Ord + Copy,
    for<'a> V: Region + Push<<V as Region>::ReadItem<'a>> + Clone + 'static,
    for<'a> V::ReadItem<'a>: Ord + Copy,
    for<'a> T: Region + Push<<T as Region>::ReadItem<'a>> + Clone + 'static,
    for<'a> T::ReadItem<'a>: Ord + Copy,
    R: Region + Push<<R as Region>::Owned> + Clone + 'static,
    for<'a> R::Owned: Semigroup<R::ReadItem<'a>>,
{
    type Key<'a> = (K::ReadItem<'a>, V::ReadItem<'a>, T::ReadItem<'a>) where Self: 'a;
    type Diff<'a> = R::ReadItem<'a> where Self: 'a;
    type DiffOwned = R::Owned;

    /// Flattens the nested `((key, val), time, diff)` item into a `(key, val, time)` key
    /// and the diff.
    fn into_parts(((key, val), time, diff): Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>) {
        ((key, val, time), diff)
    }

    /// Re-nests the key into `((key, value), time, diff)` and copies it into `self`.
    fn push_with_diff(&mut self, (key, value, time): Self::Key<'_>, diff: Self::DiffOwned) {
        self.copy(((key, value), time, diff));
    }

    /// Orders two items by `(key, val, time)`, ignoring the diff.
    fn cmp(
        ((key1, val1), time1, _diff1): &Self::Item<'_>,
        ((key2, val2), time2, _diff2): &Self::Item<'_>,
    ) -> Ordering {
        // Reborrow each read item so both sides have the same type for the comparison.
        let lhs = (K::reborrow(*key1), V::reborrow(*val1), T::reborrow(*time1));
        let rhs = (K::reborrow(*key2), V::reborrow(*val2), T::reborrow(*time2));
        lhs.cmp(&rhs)
    }
}

/// Behavior for copying consolidation.
///
/// "Copying" here means the consolidated result is written to a separate `target`
/// container of the same type `C`, rather than being returned or left in `container`.
pub trait ConsolidateContainer<C> {
/// Consolidate the contents of `container` and write the result to `target`.
///
/// Both containers are passed by mutable reference, so an implementation may
/// modify `container` as well as `target`.
fn consolidate(&mut self, container: &mut C, target: &mut C);
}
antiguru marked this conversation as resolved.
Show resolved Hide resolved

/// Container consolidator that requires the container to implement [`ConsolidateLayout`].
///
/// This is a unit struct without fields; obtain a value via `Default` or by name.
#[derive(Default, Debug)]
pub struct ContainerConsolidator;

impl<C> ConsolidateContainer<C> for ContainerConsolidator
where
    C: ConsolidateLayout,
{
    /// Consolidate the supplied container.
    ///
    /// Drains `container`, sorts the drained items with [`ConsolidateLayout::cmp`], and
    /// accumulates the diffs of adjacent items with equal keys. Each key whose accumulated
    /// diff is non-zero is pushed to `target` via [`ConsolidateLayout::push_with_diff`];
    /// keys whose diffs cancel out are dropped. Existing contents of `target` are not
    /// cleared by this function.
    fn consolidate(&mut self, container: &mut C, target: &mut C) {
        // Sort input data so that items with equal keys become adjacent.
        let mut permutation: Vec<_> = container.drain().collect();
        permutation.sort_by(|a, b| C::cmp(a, b));

        // Consolidate sorted data. `previous` holds the key currently being accumulated
        // together with the sum of its diffs so far.
        let mut previous: Option<(C::Key<'_>, C::DiffOwned)> = None;
        // TODO: We should ensure that `target` has sufficient capacity, but `Container` doesn't
        // offer a suitable API.
        for item in permutation {
            let (key, diff) = C::into_parts(item);
            match &mut previous {
                // Same key as the previous item: keep accumulating.
                Some((prevkey, d)) if key == *prevkey => d.plus_equals(&diff),
                // First item, or the key changed.
                _ => {
                    // Write down the finished key if its accumulated diff is non-zero.
                    if let Some((prevkey, d)) = previous.take() {
                        if !d.is_zero() {
                            target.push_with_diff(prevkey, d);
                        }
                    }
                    // Remember current key and diff as `previous`.
                    // TODO: Opportunity for GatCow for diff.
                    previous = Some((key, diff.into_owned()));
                }
            }
        }

        // Write any residual data, if non-zero.
        if let Some((prevkey, d)) = previous {
            if !d.is_zero() {
                target.push_with_diff(prevkey, d);
            }
        }
    }
}



#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -308,4 +458,62 @@ mod tests {
assert_eq!((i, 0, 1), collected[i]);
}
}

/// The two `(1, 1)` updates cancel each other out, leaving only the `(2, 1)` update.
#[test]
fn test_consolidate_container() {
    let mut updates = vec![(1, 1, 1), (2, 1, 1), (1, 1, -1)];
    updates.sort();

    let mut consolidated = Vec::default();
    let mut consolidator = ContainerConsolidator::default();
    consolidator.consolidate(&mut updates, &mut consolidated);

    assert_eq!(consolidated, [(2, 1, 1)]);
}

// Input length and repetition count for the duration tests below.
// Builds without debug assertions use the larger values.
#[cfg(not(debug_assertions))]
const LEN: usize = 256 << 10;
#[cfg(not(debug_assertions))]
const REPS: usize = 10 << 10;

// Builds with debug assertions use much smaller values
// (presumably to keep unoptimized test runs short).
#[cfg(debug_assertions)]
const LEN: usize = 256 << 1;
#[cfg(debug_assertions)]
const REPS: usize = 10 << 1;

/// Times `ContainerConsolidator` over `REPS` rounds of `LEN` updates and checks each
/// round's output against `consolidate_updates` run on identical input.
#[test]
fn test_consolidator_duration() {
    // Four updates per data value with diffs -2, -1, 0 and 1.
    let updates = || (0..LEN).map(|i| (i / 4, 1, -2isize + ((i % 4) as isize)));

    let mut consolidator = ContainerConsolidator::default();
    let mut duration = std::time::Duration::default();
    let mut input = Vec::with_capacity(LEN);
    let mut expected = Vec::with_capacity(LEN);
    let mut output = Vec::new();

    for _ in 0..REPS {
        input.clear();
        expected.clear();
        output.clear();
        input.extend(updates());
        expected.extend(updates());
        input.sort_by(|x, y| x.0.cmp(&y.0));

        // Only the consolidator call itself is timed.
        let timer = std::time::Instant::now();
        consolidator.consolidate(&mut input, &mut output);
        duration += timer.elapsed();

        consolidate_updates(&mut expected);
        assert_eq!(output, expected);
    }
    println!("elapsed consolidator {duration:?}");
}

/// Times `consolidate_updates` over `REPS` rounds of `LEN` updates, as a baseline
/// for the consolidator timing.
#[test]
fn test_consolidator_duration_vec() {
    let mut duration = std::time::Duration::default();
    let mut updates = Vec::with_capacity(LEN);

    for _ in 0..REPS {
        updates.clear();
        // Four updates per data value with diffs -2, -1, 0 and 1.
        updates.extend((0..LEN).map(|i| (i / 4, 1, -2isize + ((i % 4) as isize))));
        updates.sort_by(|x, y| x.0.cmp(&y.0));

        // Only the consolidation call itself is timed.
        let timer = std::time::Instant::now();
        consolidate_updates(&mut updates);
        duration += timer.elapsed();
    }
    println!("elapsed vec {duration:?}");
}
}
Loading