Functional hashsets #922

Merged
merged 4 commits into vmware:master from the im branch on Feb 28, 2021

Conversation

@ryzhyk (Contributor) commented on Feb 27, 2021

See individual commit messages.

@Kixiron, any chance you could have a look at the Rust code?

At the API level, functional hashsets (aka immutable hashsets) behave
just like regular hashsets; however, their internal implementation
supports cloning a hashset in O(1) time by sharing the entire internal
state between the clone and the original.  Modifying the clone updates
only the affected state in a copy-on-write fashion, with the rest of the
state still shared with the parent.
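
For illustration, a minimal Rust sketch of this behavior using the
underlying `im` crate directly (the DDlog-level API differs):

```rust
use im::HashSet;

fn main() {
    // Build a large set once.
    let original: HashSet<u64> = (0..100_000).collect();

    // Cloning is O(1): only a reference to the internal tree is copied,
    // so all nodes are shared between `original` and `clone`.
    let mut clone = original.clone();

    // Modifying the clone copies only the affected path in the tree
    // (copy-on-write); the rest stays shared with `original`.
    clone.insert(100_000);

    assert!(clone.contains(&100_000));
    assert!(!original.contains(&100_000));
}
```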

Example use case (added to `test-stream.sh`): computing the set of all
unique ids that appear in a stream.  At every iteration, we add all
newly observed ids to the set of ids computed so far.  This would
normally amount to cloning and modifying a potentially large set in time
`O(n)`, where `n` is the size of the set.  With functional sets, the
cost is `O(1)`.
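
A sketch of the per-iteration pattern in Rust (illustrative, not the
DDlog code from the test; `im` stands in for the functional set):

```rust
use im::HashSet as FunSet;
use std::collections::HashSet as StdSet;

// One iteration with a standard set: the clone deep-copies all n elements.
fn step_std(so_far: &StdSet<u64>, new_ids: &[u64]) -> StdSet<u64> {
    let mut next = so_far.clone(); // O(n)
    next.extend(new_ids.iter().copied());
    next
}

// One iteration with a functional set: the clone is O(1), and each insert
// copies only the path it touches.
fn step_fun(so_far: &FunSet<u64>, new_ids: &[u64]) -> FunSet<u64> {
    let mut next = so_far.clone(); // O(1)
    for id in new_ids {
        next.insert(*id);
    }
    next
}
```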

Functional data types are generally a great match for working with immutable
collections, e.g., collections stored in DDlog relations.  We therefore plan
to introduce more functional data types in the future, possibly even
replacing the standard collections (`Set`, `Map`, `Vec`) with functional
versions.

Implementation: we implement the library as a DDlog wrapper around the
`im` crate.  Unfortunately, the crate is no longer maintained and in
fact it had some correctness issues described here:
bodil/im-rs#175.  I forked the crate and fixed
the bugs in my fork:
ddlog-dev/im-rs@46f13d8.

We may need to switch to a different crate in the future, e.g., `rpds`,
which is less popular but seems to be better maintained.

Performance considerations: while functional sets are faster to copy,
they are still expensive to hash and compare (just like normal sets, but
potentially even more so due to their more complex internal design).  My
initial implementation of the unique-ids use case stored aggregates in
a relation.  It was about as slow as the implementation using
non-functional sets, with most of the time spent comparing sets as
they were deleted from/inserted into relations.  The stream-based
implementation is >20x faster, as it does not compute deltas, and is 8x
faster than an equivalent implementation using regular sets.

The `-1` operator can cause a form of divergent behavior, where the program
continues generating outputs forever even after the inputs have stopped.

Example:

```
stream R(x: usize)

R(x) :- S(x).   // Initialize R with value from input stream S.
R(x) :- R-1(x). // Forever re-inject changes to R for all future
                // timestamps
```

A program like this will cause DD to never terminate: it cannot shut
down while there is in-flight data in the pipeline, so the `stop`
method never returns.

One workaround is to require the programmer to always join delayed
relations with another relation that must be emptied before calling
`stop`.  But what are the odds that people will get this right?

Instead, we construct such a relation automatically in the DDlog runtime.
Specifically:

- We create an `Enabled` relation and populate it with an empty tuple on
  startup.
- On shutdown, we remove the tuple from the relation.
- We join each delayed relation with `Enabled`, effectively transforming
  each occurrence of `R-1(x)` into `R-1(x), Enabled()` (the join is
  computed exactly once and reused everywhere); see the sketch below.
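
Conceptually, the rewrite applied to the example above (the `Enabled`
relation itself is created by the runtime, not written by the user):

```
R(x) :- S(x).               // Initialize R with values from input stream S.
R(x) :- R-1(x), Enabled().  // Fires only while Enabled holds its tuple;
                            // retracting the tuple on shutdown lets the
                            // dataflow quiesce.
```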
@ryzhyk requested a review from @mihaibudiu on February 27, 2021 07:08
lib/hashset.dl (Outdated)

```
    None
}

/* Returns a vector containing only those elements in `s` that satisfy predicate
 *
 * Calls the closure on each element of the set. If the closure returns
 * `Some{element}`, then that element is returned. */
function filter_map(s: HashSet<'A>, f: function('A): Option<'B>): HashSet<'B> {
```

Reviewer: Hashset [the doc comment says "vector", but this is a Hashset]

Reviewer: Could these be written using iterators?

@ryzhyk (Author): Yes, but this is slightly more efficient, as creating a closure in DDlog involves a dynamic memory allocation.

```rust
{
    fn from_flatbuf(fb: &'a [T]) -> ::std::result::Result<Self, String> {
        let mut set = typedefs::hashset::HashSet::new();
        for x in fb.iter() {
```

Reviewer: Isn't there a more efficient way to insert many?

lib/hashset.dl

```
extern function hashset_unions(sets: Vec<HashSet<'X>>): HashSet<'X>
extern function group_hashset_unions(sets: Group<'K, HashSet<'X>>): HashSet<'X>
extern function hashset_intersection(s1: HashSet<'X>, s2: HashSet<'X>): HashSet<'X>
extern function hashset_difference(s1: HashSet<'X>, s2: HashSet<'X>): HashSet<'X>
```

Reviewer: No singleton?

lib/hashset.rs

```rust
}

pub fn hashset_union<T: Hash + Eq + Clone>(s1: &HashSet<T>, s2: &HashSet<T>) -> HashSet<T> {
    s1.clone().union(s2.clone())
}
```

Reviewer: Why clone?

@ryzhyk (Author): The union function consumes both its arguments, so we cannot pass them by reference.
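
Context for the exchange above: in the `im` crate, `union` takes its
operands by value (approximately `fn union(self, other: Self) -> Self`),
and the clones it forces are O(1) structure-sharing copies, so they are
cheap. A small usage sketch:

```rust
use im::HashSet;

fn main() {
    let s1: HashSet<u32> = (0..5).collect();
    let s2: HashSet<u32> = (3..8).collect();

    // `union` consumes both operands, so we pass O(1) clones.
    let u = s1.clone().union(s2.clone());

    assert_eq!(u.len(), 8);
    // The originals remain usable after the clones.
    assert_eq!(s1.len(), 5);
    assert_eq!(s2.len(), 5);
}
```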

```rust
// with `Enabled`. `Enabled` contains a single record (an empty tuple) as long as
// the program is running. We retract this record on shutdown, hopefully enforcing
// quiescing the dataflow.
let (enabled_session, enabled_collection) = outer.new_collection::<(),Weight>();
```

Reviewer: Is this overhead?

@ryzhyk (Author): Yep, but a small one, as `Enabled` contains exactly one record, and we only pay it if there are delayed relations in the program. For the one benchmark we have for this now, it does not seem to make any difference.

```
// arg_min.
SetTransforms(
    "arg_min(min)",
    hashset_singleton(test_set().arg_min(|x| x.1.arg_min(|x|x)).unwrap_or_default())).
```

Reviewer: I see the singleton

test-stream.sh:

```
done
) > unique_bench$1.dat
/usr/bin/time ./streams_ddlog/target/release/streams_cli -w 1 --no-store < unique_bench$1.dat > unique_bench$1.dump
diff -q unique_bench$1.dump unique_bench$1.dump.expected
```

Reviewer: How much faster is it?

@ryzhyk (Author): The version using hashsets is 20x faster than the equivalent implementation using regular sets, and the gap grows with the size of the set. I also had an optimized set-based implementation that was only 5 times slower.

Resolved (outdated) review threads on:
- rust/template/cmd_parser/lib.rs
- rust/template/differential_datalog/src/program/worker.rs
- lib/hashset.rs (3 threads)
lib/hashset.rs (Outdated), comment on lines 293 to 294:

```rust
if set.remove(&v).is_none() {
    set.insert(v);
```

Reviewer: I don't quite understand the rationale here. Why is the element removed, and inserted if there wasn't previously a value?

@ryzhyk (Author): It's basically what the above comment says: `upd` is a set that describes a set of updates to apply to `set`. For each value in `upd`, if the value is in `set`, then it must be deleted, and if not, then it must be inserted. Actually, now I realize that this is just the symmetric difference of `upd` and `set`, and we have a function for it, so that's what I should be using instead.
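
A minimal sketch of that equivalence (assuming `im::HashSet`; `apply_updates`
is an illustrative name, not the library function):

```rust
use im::HashSet;

// Applying `upd` with the remove-or-insert rule computes the symmetric
// difference: an element survives iff it is in exactly one of the two sets.
fn apply_updates(mut set: HashSet<u64>, upd: HashSet<u64>) -> HashSet<u64> {
    for v in upd {
        // If `v` was present, remove it; otherwise insert it.
        if set.remove(&v).is_none() {
            set.insert(v);
        }
    }
    set
}

fn main() {
    let set: HashSet<u64> = vec![1, 2, 3].into_iter().collect();
    let upd: HashSet<u64> = vec![2, 3, 4].into_iter().collect();
    // {1, 2, 3} Δ {2, 3, 4} = {1, 4}
    let expected: HashSet<u64> = vec![1, 4].into_iter().collect();
    assert_eq!(apply_updates(set, upd), expected);
}
```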

lib/hashset.rs

```rust
    s: &HashSet<T>,
    f: &Box<dyn Closure<*const T, B>>,
) -> DDOption<T> {
    DDOption::from(s.iter().max_by_key(|x| f.call(*x)).map(|x| (*x).clone()))
```

Reviewer: By the looks of `Closure<*const T, _>`, `Closure::call()` should be an unsafe function.

rust/template/differential_datalog/src/program/worker.rs

```rust
FnvHashMap<RelId, InputSession<TS, DDValue, Weight>>,
BTreeMap<
// Handles to objects involved in managing the progress of the dataflow.
struct SessionData {
```

Reviewer: ❤️

Comment on lines +1 to +10 (expected test output):

```
hashset_test::SetTransforms{.description = "all(contains(3))", .s = []}
hashset_test::SetTransforms{.description = "any(contains(3))", .s = [(2, [4, 5, 6]), (4, [8, 9, 10]), (3, [6, 7, 8]), (1, [2, 3, 4]), (0, [0, 1, 2])]}
hashset_test::SetTransforms{.description = "arg_max(max)", .s = [(4, [8, 9, 10])]}
hashset_test::SetTransforms{.description = "arg_min(min)", .s = [(0, [0, 1, 2])]}
hashset_test::SetTransforms{.description = "filter(contains 2)", .s = [(1, [2, 3, 4]), (0, [0, 1, 2])]}
hashset_test::SetTransforms{.description = "filter_map(==1, :=100)", .s = [(100, [2, 3, 4])]}
hashset_test::SetTransforms{.description = "find(contains(3))", .s = [(1, [2, 3, 4])]}
hashset_test::SetTransforms{.description = "map(push 100)", .s = [(1, [2, 3, 4, 100]), (3, [6, 7, 8, 100]), (2, [4, 5, 6, 100]), (0, [0, 1, 2, 100]), (4, [8, 9, 10, 100])]}
hashset_test::SetFolds{.description = "fold(+)", .a = 10}
hashset_test::SetFolds{.description = "fold(fold(+))", .a = 75}
```

Reviewer: This makes me wonder if we shouldn't make `Collection::assert_eq()` a DDlog expression (similar to how `apply` works).

@ryzhyk (Author): Can you elaborate? How would this work? This would have to be part of the command language, not DDlog itself, right?

Reviewer: Basically, I just see it taking advantage of the fact that vectors implement `.to_stream()`. You'd make a vec of the contents you expect to see, and then we'd call ddflow's `assert_eq()` on the collection of expected items and the relation you want to check. Admittedly, this only has fairly limited use, since the user would have to specify timestamps, but I think it could still be genuinely useful. In a similar vein, `.assert_empty()` could also be useful.
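
A rough sketch of the suggestion in plain Rust (`new_collection_from` is
used here for brevity and puts everything at the initial timestamp; the
comment above envisions building the expected collection via `.to_stream()`
with explicit timestamps):

```rust
use differential_dataflow::input::Input;

fn main() {
    timely::example(|scope| {
        // Stand-ins: `relation` is the collection under test; `expected`
        // is the hand-written contents we expect it to hold.
        let (_h1, relation) = scope.new_collection_from(1..5u32);
        let (_h2, expected) = scope.new_collection_from(1..5u32);

        // Panics at runtime if the consolidated contents differ.
        relation.assert_eq(&expected);
    });
}
```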

Comment on lines 445 to 454:

```rust
// Create an `Enabled` relation used to enforce the dataflow termination in the
// presence of delayed relations. A delayed relation can potentially generate an
// infinite sequence of outputs for all future timestamps, preventing the
// differential dataflow from terminating. DD can only terminate cleanly when there
// is no data in flight, and will keep spinning forever if new records keep getting
// injected in the dataflow. To prevent this scenario, we join all delayed relations
// with `Enabled`. `Enabled` contains a single record (an empty tuple) as long as
// the program is running. We retract this record on shutdown, hopefully enforcing
// quiescing the dataflow.
let (enabled_session, enabled_collection) = outer.new_collection::<(),Weight>();
```

Reviewer: I'd add a note pointing to timely-dataflow/#306 as the (hopefully) ultimate solution to this.

@ryzhyk (Author): Ack, although it doesn't look like it will be implemented any time soon...

@ryzhyk merged commit e06d676 into vmware:master on Feb 28, 2021
@ryzhyk deleted the im branch on February 28, 2021 18:52