Functional hashsets #922
Conversation
At the API level, functional hashsets (aka immutable hashsets) behave just like regular hashsets; however, their internal implementation supports cloning a hashset in time `O(1)` by sharing the entire internal state between the clone and the original. Modifying the clone updates only the affected state in a copy-on-write fashion, with the rest of the state still shared with the parent.

Example use case (added to `test-stream.sh`): computing the set of all unique ids that appear in a stream. At every iteration, we add all newly observed ids to the set of ids computed so far. This would normally amount to cloning and modifying a potentially large set in time `O(n)`, where `n` is the size of the set. With functional sets, the cost is `O(1)`.

Functional data types are generally a great match for working with immutable collections, e.g., collections stored in DDlog relations. We therefore plan to introduce more functional data types in the future, possibly even replacing the standard collections (`Set`, `Map`, `Vec`) with functional versions.

Implementation: we implement the library as a DDlog wrapper around the `im` crate. Unfortunately, the crate is no longer maintained, and in fact it had some correctness issues, described here: bodil/im-rs#175. I forked the crate and fixed the bugs in my fork: ddlog-dev/im-rs@46f13d8. We may need to switch to a different crate in the future, e.g., `rpds`, which is less popular but seems to be better maintained.

Performance considerations: while functional sets are faster to copy, they are still expensive to hash and compare (just like normal sets, but potentially even more so due to their more complex internal design). My initial implementation of the unique-ids use case stored aggregates in a relation. It was about as slow as the implementation using non-functional sets, with most of the time spent comparing sets as they were deleted from/inserted into relations. The stream-based implementation is >20x faster, as it does not compute deltas, and is 8x faster than an equivalent implementation using regular sets.
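To make the copy-on-write behavior concrete, here is a minimal sketch using the upstream `im` crate directly (illustrative only; the library wraps a fork of this crate):

```
// Minimal sketch of O(1) cloning with the `im` crate.
use im::HashSet;

fn main() {
    let mut seen: HashSet<u64> = (0..100_000).collect();

    // O(1): the clone shares the entire internal tree with `seen`.
    let snapshot = seen.clone();

    // Copy-on-write: inserting into `seen` copies only the affected path;
    // the rest of the structure remains shared with `snapshot`.
    seen.insert(100_000);

    assert!(seen.contains(&100_000));
    assert!(!snapshot.contains(&100_000));
}
```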
The `-1` operator can cause a form of divergent behavior where the program continues generating outputs forever, even after the inputs have stopped. Example:

```
stream R(x: usize)

R(x) :- S(x).   // Initialize R with values from input stream S.
R(x) :- R-1(x). // Forever re-inject changes to R for all future
                // timestamps.
```

A program like this will cause DD to never terminate: it cannot shut down while there is in-flight data in the pipeline, so the `stop` method does not return. One workaround is to require the programmer to always join delayed relations with another relation that must be emptied before calling `stop`, but what are the odds people will get this right? Instead, we construct such a relation automatically in the DDlog runtime. Specifically:

- We create an `Enabled` relation and populate it with an empty tuple on startup.
- On shutdown, we remove the tuple from the relation.
- We join each delayed relation with `Enabled`, effectively transforming each occurrence of `R-1(x)` into `R-1(x), Enabled()`. (Actually, the join is computed exactly once and reused everywhere.) A sketch of this gating trick appears below.
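Here is a minimal, self-contained sketch of the gating idea in terms of `timely` and `differential-dataflow` primitives; the names (`enabled`, `data`) and setup are illustrative, not the actual DDlog runtime code:

```
use differential_dataflow::input::Input;
use differential_dataflow::operators::Join;

fn main() {
    timely::execute_directly(move |worker| {
        let (mut enabled, mut data) = worker.dataflow::<u32, _, _>(|scope| {
            // `Enabled` holds a single empty tuple while the program runs.
            let (enabled_in, enabled) = scope.new_collection::<(), isize>();
            let (data_in, data) = scope.new_collection::<u64, isize>();

            // Gate records on `Enabled`: key each record by the unit value,
            // semijoin with `Enabled`, and drop the key again.
            data.map(|x| ((), x))
                .semijoin(&enabled)
                .map(|((), x)| x)
                .inspect(|x| println!("{:?}", x));

            (enabled_in, data_in)
        });

        enabled.insert(()); // startup: populate `Enabled`
        data.insert(42);

        enabled.advance_to(1);
        data.advance_to(1);
        enabled.flush();
        data.flush();
        worker.step(); // process pending work

        enabled.remove(()); // shutdown: retract the tuple
    });
}
```

Retracting the empty tuple makes the semijoin output empty at all subsequent timestamps, so no new records are injected and the dataflow can quiesce.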
lib/hashset.dl
```
    None
}

/* Returns a vector containing only those elements in `s` that satisfy predicate
```
Should be "hashset", not "vector".
```
 * Calls the closure on each element of the set. If the closure returns
 * `Some{element}`, then that element is returned. */
function filter_map(s: HashSet<'A>, f: function('A): Option<'B>): HashSet<'B> {
```
Could these be written using iterators?
Yes, but this is slightly more efficient, as creating a closure in DDlog involves a dynamic memory allocation.
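In Rust terms, the iterator-based version would look roughly like this (a sketch assuming `im::HashSet`, not the generated DDlog code):

```
// Sketch of the iterator-based alternative; the hand-written loop in the PR
// avoids allocating an intermediate closure value.
use im::HashSet;
use std::hash::Hash;

fn filter_map<A, B>(s: &HashSet<A>, f: impl Fn(&A) -> Option<B>) -> HashSet<B>
where
    A: Hash + Eq + Clone,
    B: Hash + Eq + Clone,
{
    // `im::HashSet` implements `FromIterator`, so we can collect directly.
    s.iter().filter_map(f).collect()
}
```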
```
{
    fn from_flatbuf(fb: &'a [T]) -> ::std::result::Result<Self, String> {
        let mut set = typedefs::hashset::HashSet::new();
        for x in fb.iter() {
```
Isn’t there a more efficient way to insert many?
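If bulk insertion turns out to matter, a hypothetical variant could build the set via `FromIterator` instead of repeated `insert` calls; whether that is actually faster depends on the crate's internals:

```
// Hypothetical bulk construction via FromIterator (illustrative only; the
// real from_flatbuf converts each element and can fail).
fn from_slice<T: std::hash::Hash + Eq + Clone>(items: &[T]) -> im::HashSet<T> {
    items.iter().cloned().collect()
}
```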
```
extern function hashset_unions(sets: Vec<HashSet<'X>>): HashSet<'X>
extern function group_hashset_unions(sets: Group<'K, HashSet<'X>>): HashSet<'X>
extern function hashset_intersection(s1: HashSet<'X>, s2: HashSet<'X>): HashSet<'X>
extern function hashset_difference(s1: HashSet<'X>, s2: HashSet<'X>): HashSet<'X>
```
No singleton?
```
}

pub fn hashset_union<T: Hash + Eq + Clone>(s1: &HashSet<T>, s2: &HashSet<T>) -> HashSet<T> {
    s1.clone().union(s2.clone())
```
Why clone?
The `union` function consumes both its arguments, so we cannot pass them by reference.
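For context, `im`'s `union` takes both operands by value, which is why the wrapper clones; the clones are cheap thanks to structural sharing. A sketch:

```
use im::HashSet;

fn main() {
    let a: HashSet<i32> = (0..3).collect();
    let b: HashSet<i32> = (2..5).collect();

    // `union` is `fn union(self, other: Self) -> Self`: it consumes both
    // operands, so a by-reference wrapper must clone. The clones are O(1)
    // because of structural sharing.
    let u = a.clone().union(b.clone());
    assert_eq!(u.len(), 5);

    // `a` and `b` are still usable here only because we cloned them.
    assert!(a.contains(&0) && b.contains(&4));
}
```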
```
// with `Enabled`. `Enabled` contains a single record (an empty tuple) as long as
// the program is running. We retract this record on shutdown, hopefully enforcing
// quiescing the dataflow.
let (enabled_session, enabled_collection) = outer.new_collection::<(),Weight>();
```
Does this add overhead?
Yep, but a small one, as `Enabled` contains exactly one record, and we only pay it if there are delayed relations in the program. For the one benchmark we have for this now, it does not seem to make any difference.
```
// arg_min.
SetTransforms(
    "arg_min(min)",
    hashset_singleton(test_set().arg_min(|x| x.1.arg_min(|x|x)).unwrap_or_default())).
```
I see the singleton
```
    done
) > unique_bench$1.dat
/usr/bin/time ./streams_ddlog/target/release/streams_cli -w 1 --no-store < unique_bench$1.dat > unique_bench$1.dump
diff -q unique_bench$1.dump unique_bench$1.dump.expected
```
How much faster is it?
The version using hashsets is 20x faster than the equivalent implementation using regular sets. The gap grows with the size of the set. I also had an optimized set-based implementation that was only 5 times slower.
lib/hashset.rs
```
if set.remove(&v).is_none() {
    set.insert(v);
```
I don't quite understand the rationale here, why is the element removed and inserted if there wasn't previously a value?
It's basically what the above comment says. `upd` is a set that describes a set of updates to apply to `set`. For each value in `upd`, if the value is in `set`, then it must be deleted, and if not, then it must be inserted. Actually, now I realize that this is just the symmetric difference of `upd` and `set`, and we have a function for it, so that's what I should be using instead.
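Spelling that observation out, using `std::collections::HashSet` for illustration (the PR uses `im`, whose `remove` returns an `Option` rather than a `bool`):

```
use std::collections::HashSet;

fn main() {
    let set: HashSet<u32> = [1, 2, 3].into_iter().collect();
    let upd: HashSet<u32> = [2, 3, 4].into_iter().collect();

    // The remove-or-insert loop from the diff above, adapted to std:
    let mut toggled = set.clone();
    for v in &upd {
        if !toggled.remove(v) {
            toggled.insert(*v);
        }
    }

    // Equivalent one-liner: `^` on `&HashSet` is the symmetric difference.
    assert_eq!(toggled, &set ^ &upd);
}
```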
```
    s: &HashSet<T>,
    f: &Box<dyn Closure<*const T, B>>,
) -> DDOption<T> {
    DDOption::from(s.iter().max_by_key(|x| f.call(*x)).map(|x| (*x).clone()))
```
By the looks of `Closure<*const T, _>`, `Closure::call()` should be an unsafe function.
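A hypothetical sketch of the point (this is not the actual DDlog `Closure` trait): the implementation dereferences a raw pointer whose validity the compiler cannot check, so the obligation should be surfaced to callers.

```
trait Closure<Args, Output> {
    /// Safety: `args` must point to a live, properly aligned value for the
    /// duration of the call.
    unsafe fn call(&self, args: Args) -> Output;
}

struct Deref;

impl Closure<*const u64, u64> for Deref {
    unsafe fn call(&self, args: *const u64) -> u64 {
        // UB if `args` is dangling; `unsafe fn` makes callers acknowledge this.
        *args
    }
}
```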
```
FnvHashMap<RelId, InputSession<TS, DDValue, Weight>>,
BTreeMap<
// Handles to objects involved in managing the progress of the dataflow.
struct SessionData {
```
❤️
hashset_test::SetTransforms{.description = "all(contains(3))", .s = []} | ||
hashset_test::SetTransforms{.description = "any(contains(3))", .s = [(2, [4, 5, 6]), (4, [8, 9, 10]), (3, [6, 7, 8]), (1, [2, 3, 4]), (0, [0, 1, 2])]} | ||
hashset_test::SetTransforms{.description = "arg_max(max)", .s = [(4, [8, 9, 10])]} | ||
hashset_test::SetTransforms{.description = "arg_min(min)", .s = [(0, [0, 1, 2])]} | ||
hashset_test::SetTransforms{.description = "filter(contains 2)", .s = [(1, [2, 3, 4]), (0, [0, 1, 2])]} | ||
hashset_test::SetTransforms{.description = "filter_map(==1, :=100)", .s = [(100, [2, 3, 4])]} | ||
hashset_test::SetTransforms{.description = "find(contains(3))", .s = [(1, [2, 3, 4])]} | ||
hashset_test::SetTransforms{.description = "map(push 100)", .s = [(1, [2, 3, 4, 100]), (3, [6, 7, 8, 100]), (2, [4, 5, 6, 100]), (0, [0, 1, 2, 100]), (4, [8, 9, 10, 100])]} | ||
hashset_test::SetFolds{.description = "fold(+)", .a = 10} | ||
hashset_test::SetFolds{.description = "fold(fold(+))", .a = 75} |
This makes me wonder if we shouldn't make `Collection::assert_eq()` a ddlog expression (similar to how `apply` works).
Can you elaborate? How would this work? This would have to be part of the command language, not DDlog itself, right?
Basically I just see it taking advantage of the fact that vectors implement `.to_stream()`. You'd make a vec of the contents you expected to see and then we'd call ddflow's `assert_eq()` on the collection of expected items and the relation you want to check. Admittedly this only has fairly limited use since the user would have to specify timestamps, but I think it could still be genuinely useful. In a similar vein, `.assert_empty()` could also be useful.
```
// Create an `Enabled` relation used to enforce the dataflow termination in the
// presence of delayed relations. A delayed relation can potentially generate an
// infinite sequence of outputs for all future timestamps, preventing the
// differential dataflow from terminating. DD can only terminate cleanly when there
// is no data in flight, and will keep spinning forever if new records keep getting
// injected in the dataflow. To prevent this scenario, we join all delayed relations
// with `Enabled`. `Enabled` contains a single record (an empty tuple) as long as
// the program is running. We retract this record on shutdown, hopefully enforcing
// quiescing the dataflow.
let (enabled_session, enabled_collection) = outer.new_collection::<(),Weight>();
```
I'd add a note pointing to timely-dataflow/#306 as the (hopefully) ultimate solution to this
Ack, although it doesn't look like it will be implemented any time soon...
Co-authored-by: Chase Wilson <[email protected]>
See individual commit messages.
@Kixiron, any chance you could have a look at the Rust code?