Skip to content

Commit

Permalink
Merge batcher input generic over containers (#494)
Browse files Browse the repository at this point in the history
Enable flatcontainers in merge batchers.

---------

Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru authored Jun 13, 2024
1 parent 7c4eec9 commit d0a1cab
Show file tree
Hide file tree
Showing 10 changed files with 1,157 additions and 314 deletions.
4 changes: 2 additions & 2 deletions examples/spines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ fn main() {
},
"flat" => {
use differential_dataflow::trace::implementations::ord_neu::FlatKeySpine;
let data = data.arrange::<FlatKeySpine<_,_,_>>();
let keys = keys.arrange::<FlatKeySpine<_,_,_>>();
let data = data.arrange::<FlatKeySpine<String,_,isize,_>>();
let keys = keys.arrange::<FlatKeySpine<String,_,isize,_>>();
keys.join_core(&data, |_k, (), ()| Option::<()>::None)
.probe_with(&mut probe);
}
Expand Down
194 changes: 193 additions & 1 deletion src/consolidation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,15 @@
//! you need specific behavior, it may be best to defensively copy, paste, and maintain the
//! specific behavior you require.
use std::cmp::Ordering;
use std::collections::VecDeque;
use timely::Container;
use timely::container::{ContainerBuilder, PushInto, SizableContainer};
use timely::container::flatcontainer::{FlatStack, Push, Region};
use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion};
use crate::Data;
use crate::difference::Semigroup;
use crate::difference::{IsZero, Semigroup};
use crate::trace::cursor::IntoOwned;

/// Sorts and consolidates `vec`.
///
Expand Down Expand Up @@ -218,6 +223,136 @@ where
}
}

/// Layout of containers and their read items to be consolidated.
///
/// This trait specifies behavior to extract keys and diffs from container's read
/// items. Consolidation accumulates the diffs per key.
///
/// The trait requires `Container` to have access to its `Item` GAT.
pub trait ConsolidateLayout: Container {
/// Key portion of data, essentially everything minus the diff
type Key<'a>: Eq where Self: 'a;

/// GAT diff type.
type Diff<'a>: IntoOwned<'a, Owned = Self::DiffOwned> where Self: 'a;

/// Owned diff type.
type DiffOwned: for<'a> Semigroup<Self::Diff<'a>>;

/// Deconstruct an item into key and diff. Must be cheap.
fn into_parts(item: Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>);

/// Push an element to a compatible container.
///
/// This function is odd to have, so let's explain why it exists. Ideally, the container
/// would accept a `(key, diff)` pair and we wouldn't need this function. However, we
/// might never be in a position where this is true: Vectors can push any `T`, which would
/// collide with a specific implementation for pushing tuples of mixes GATs and owned types.
///
/// For this reason, we expose a function here that takes a GAT key and an owned diff, and
/// leave it to the implementation to "patch" a suitable item that can be pushed into `self`.
fn push_with_diff(&mut self, key: Self::Key<'_>, diff: Self::DiffOwned);

/// Compare two items by key to sort containers.
fn cmp(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering;
}

impl<D, T, R> ConsolidateLayout for Vec<(D, T, R)>
where
D: Ord + Clone + 'static,
T: Ord + Clone + 'static,
for<'a> R: Semigroup + IntoOwned<'a, Owned = R> + Clone + 'static,
{
type Key<'a> = (D, T) where Self: 'a;
type Diff<'a> = R where Self: 'a;
type DiffOwned = R;

fn into_parts((data, time, diff): Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>) {
((data, time), diff)
}

fn cmp<'a>(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering {
(&item1.0, &item1.1).cmp(&(&item2.0, &item2.1))
}

fn push_with_diff(&mut self, (data, time): Self::Key<'_>, diff: Self::DiffOwned) {
self.push((data, time, diff));
}
}

impl<K, V, T, R> ConsolidateLayout for FlatStack<TupleABCRegion<TupleABRegion<K, V>, T, R>>
where
for<'a> K: Region + Push<<K as Region>::ReadItem<'a>> + Clone + 'static,
for<'a> K::ReadItem<'a>: Ord + Copy,
for<'a> V: Region + Push<<V as Region>::ReadItem<'a>> + Clone + 'static,
for<'a> V::ReadItem<'a>: Ord + Copy,
for<'a> T: Region + Push<<T as Region>::ReadItem<'a>> + Clone + 'static,
for<'a> T::ReadItem<'a>: Ord + Copy,
R: Region + Push<<R as Region>::Owned> + Clone + 'static,
for<'a> R::Owned: Semigroup<R::ReadItem<'a>>,
{
type Key<'a> = (K::ReadItem<'a>, V::ReadItem<'a>, T::ReadItem<'a>) where Self: 'a;
type Diff<'a> = R::ReadItem<'a> where Self: 'a;
type DiffOwned = R::Owned;

fn into_parts(((key, val), time, diff): Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>) {
((key, val, time), diff)
}

fn cmp<'a>(((key1, val1), time1, _diff1): &Self::Item<'_>, ((key2, val2), time2, _diff2): &Self::Item<'_>) -> Ordering {
(K::reborrow(*key1), V::reborrow(*val1), T::reborrow(*time1)).cmp(&(K::reborrow(*key2), V::reborrow(*val2), T::reborrow(*time2)))
}

fn push_with_diff(&mut self, (key, value, time): Self::Key<'_>, diff: Self::DiffOwned) {
self.copy(((key, value), time, diff));
}
}

/// Consolidate the supplied container.
pub fn consolidate_container<C: ConsolidateLayout>(container: &mut C, target: &mut C) {
// Sort input data
let mut permutation = Vec::new();
permutation.extend(container.drain());
permutation.sort_by(|a, b| C::cmp(a, b));

// Consolidate sorted data.
let mut previous: Option<(C::Key<'_>, C::DiffOwned)> = None;
// TODO: We should ensure that `target` has sufficient capacity, but `Container` doesn't
// offer a suitable API.
for item in permutation.drain(..) {
let (key, diff) = C::into_parts(item);
match &mut previous {
// Initial iteration, remeber key and diff.
// TODO: Opportunity for GatCow for diff.
None => previous = Some((key, diff.into_owned())),
Some((prevkey, d)) => {
// Second and following iteration, compare and accumulate or emit.
if key == *prevkey {
// Keys match, keep accumulating.
d.plus_equals(&diff);
} else {
// Keys don't match, write down result if non-zero.
if !d.is_zero() {
// Unwrap because we checked for `Some` above.
let (prevkey, diff) = previous.take().unwrap();
target.push_with_diff(prevkey, diff);
}
// Remember current key and diff as `previous`
previous = Some((key, diff.into_owned()));
}
}
}
}
// Write any residual data, if non-zero.
if let Some((previtem, d)) = previous {
if !d.is_zero() {
target.push_with_diff(previtem, d);
}
}
}



#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -308,4 +443,61 @@ mod tests {
assert_eq!((i, 0, 1), collected[i]);
}
}

#[test]
fn test_consolidate_container() {
let mut data = vec![(1, 1, 1), (2, 1, 1), (1, 1, -1)];
let mut target = Vec::default();
data.sort();
consolidate_container(&mut data, &mut target);
assert_eq!(target, [(2, 1, 1)]);
}

#[cfg(not(debug_assertions))]
const LEN: usize = 256 << 10;
#[cfg(not(debug_assertions))]
const REPS: usize = 10 << 10;

#[cfg(debug_assertions)]
const LEN: usize = 256 << 1;
#[cfg(debug_assertions)]
const REPS: usize = 10 << 1;

#[test]
fn test_consolidator_duration() {
let mut data = Vec::with_capacity(LEN);
let mut data2 = Vec::with_capacity(LEN);
let mut target = Vec::new();
let mut duration = std::time::Duration::default();
for _ in 0..REPS {
data.clear();
data2.clear();
target.clear();
data.extend((0..LEN).map(|i| (i/4, 1, -2isize + ((i % 4) as isize))));
data2.extend((0..LEN).map(|i| (i/4, 1, -2isize + ((i % 4) as isize))));
data.sort_by(|x,y| x.0.cmp(&y.0));
let start = std::time::Instant::now();
consolidate_container(&mut data, &mut target);
duration += start.elapsed();

consolidate_updates(&mut data2);
assert_eq!(target, data2);
}
println!("elapsed consolidator {duration:?}");
}

#[test]
fn test_consolidator_duration_vec() {
let mut data = Vec::with_capacity(LEN);
let mut duration = std::time::Duration::default();
for _ in 0..REPS {
data.clear();
data.extend((0..LEN).map(|i| (i/4, 1, -2isize + ((i % 4) as isize))));
data.sort_by(|x,y| x.0.cmp(&y.0));
let start = std::time::Instant::now();
consolidate_updates(&mut data);
duration += start.elapsed();
}
println!("elapsed vec {duration:?}");
}
}
Loading

0 comments on commit d0a1cab

Please sign in to comment.