Skip to content

Commit

Permalink
feat(hydroflow_lang): add (inefficient) lattice_bimorphism operator,
Browse files Browse the repository at this point in the history
…fix #1073 (#1061)
  • Loading branch information
MingweiSamuel committed Mar 1, 2024
1 parent ff158db commit 09f0c57
Show file tree
Hide file tree
Showing 10 changed files with 463 additions and 200 deletions.
4 changes: 2 additions & 2 deletions hydroflow/src/scheduled/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl Context {
}

/// Returns a shared reference to the state.
pub fn state_ref<T>(&self, handle: StateHandle<T>) -> &T
pub fn state_ref<T>(&self, handle: StateHandle<T>) -> &'_ T
where
T: Any,
{
Expand All @@ -116,7 +116,7 @@ impl Context {
}

/// Returns an exclusive reference to the state.
pub fn state_mut<T>(&mut self, handle: StateHandle<T>) -> &mut T
pub fn state_mut<T>(&mut self, handle: StateHandle<T>) -> &'_ mut T
where
T: Any,
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
---
source: hydroflow/tests/surface_lattice_bimorphism.rs
expression: "df.meta_graph().unwrap().to_dot(&Default::default())"
---
digraph {
node [fontname="Monaco,Menlo,Consolas,&quot;Droid Sans Mono&quot;,Inconsolata,&quot;Courier New&quot;,monospace", style=filled];
edge [fontname="Monaco,Menlo,Consolas,&quot;Droid Sans Mono&quot;,Inconsolata,&quot;Courier New&quot;,monospace"];
n1v1 [label="(n1v1) source_iter_delta(0..3)", shape=invhouse, fillcolor="#88aaff"]
n2v1 [label="(n2v1) map(SetUnionSingletonSet::new_from)", shape=invhouse, fillcolor="#88aaff"]
n3v1 [label="(n3v1) state::<SetUnionHashSet<usize>>()", shape=invhouse, fillcolor="#88aaff"]
n4v1 [label="(n4v1) source_iter_delta(3..5)", shape=invhouse, fillcolor="#88aaff"]
n5v1 [label="(n5v1) map(SetUnionSingletonSet::new_from)", shape=invhouse, fillcolor="#88aaff"]
n6v1 [label="(n6v1) state::<SetUnionHashSet<usize>>()", shape=invhouse, fillcolor="#88aaff"]
n7v1 [label="(n7v1) lattice_bimorphism(CartesianProductBimorphism::<HashSet<_>>::default())", shape=invhouse, fillcolor="#88aaff"]
n8v1 [label="(n8v1) lattice_reduce()", shape=invhouse, fillcolor="#88aaff"]
n9v1 [label="(n9v1) for_each(|x| out_send.send(x).unwrap())", shape=house, fillcolor="#ffff88"]
n10v1 [label="(n10v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n2v1 -> n3v1 [color=darkgreen, style=dashed]
n1v1 -> n2v1 [color=darkgreen, style=dashed]
n5v1 -> n6v1 [color=darkgreen, style=dashed]
n4v1 -> n5v1 [color=darkgreen, style=dashed]
n3v1 -> n7v1 [label="items\nitems_0", color=darkgreen, style=bold]
n6v1 -> n7v1 [label="items\nitems_1", color=darkgreen, style=bold]
n3v1 -> n7v1 [label="state\nstate_0", color=darkgreen, style=bold]
n6v1 -> n7v1 [label="state\nstate_1", color=darkgreen, style=bold]
n8v1 -> n9v1 [color=darkgreen, style=bold]
n7v1 -> n10v1 [color=darkgreen, style=bold]
n10v1 -> n8v1 [color=red, style=bold]
subgraph "cluster n1v1" {
fillcolor="#dddddd"
style=filled
label = "sg_1v1\nstratum 0"
n1v1
n2v1
n3v1
n4v1
n5v1
n6v1
n7v1
subgraph "cluster_sg_1v1_var_lhs" {
label="var lhs"
n1v1
n2v1
n3v1
}
subgraph "cluster_sg_1v1_var_my_join" {
label="var my_join"
n7v1
}
subgraph "cluster_sg_1v1_var_rhs" {
label="var rhs"
n4v1
n5v1
n6v1
}
}
subgraph "cluster n2v1" {
fillcolor="#dddddd"
style=filled
label = "sg_2v1\nstratum 0"
n8v1
n9v1
subgraph "cluster_sg_2v1_var_my_join" {
label="var my_join"
n8v1
n9v1
}
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
---
source: hydroflow/tests/surface_lattice_bimorphism.rs
expression: "df.meta_graph().unwrap().to_mermaid(&Default::default())"
---
%%{init:{'theme':'base','themeVariables':{'clusterBkg':'#ddd','clusterBorder':'#888'}}}%%
flowchart TD
classDef pullClass fill:#8af,stroke:#000,text-align:left,white-space:pre
classDef pushClass fill:#ff8,stroke:#000,text-align:left,white-space:pre
classDef otherClass fill:#fdc,stroke:#000,text-align:left,white-space:pre
linkStyle default stroke:#aaa
1v1[\"(1v1) <code>source_iter_delta(0..3)</code>"/]:::pullClass
2v1[\"(2v1) <code>map(SetUnionSingletonSet::new_from)</code>"/]:::pullClass
3v1[\"(3v1) <code>state::&lt;SetUnionHashSet&lt;usize&gt;&gt;()</code>"/]:::pullClass
4v1[\"(4v1) <code>source_iter_delta(3..5)</code>"/]:::pullClass
5v1[\"(5v1) <code>map(SetUnionSingletonSet::new_from)</code>"/]:::pullClass
6v1[\"(6v1) <code>state::&lt;SetUnionHashSet&lt;usize&gt;&gt;()</code>"/]:::pullClass
7v1[\"(7v1) <code>lattice_bimorphism(CartesianProductBimorphism::&lt;HashSet&lt;_&gt;&gt;::default())</code>"/]:::pullClass
8v1[\"(8v1) <code>lattice_reduce()</code>"/]:::pullClass
9v1[/"(9v1) <code>for_each(|x| out_send.send(x).unwrap())</code>"\]:::pushClass
10v1["(10v1) <code>handoff</code>"]:::otherClass
2v1-.->3v1; linkStyle 0 stroke:#060
1v1-.->2v1; linkStyle 1 stroke:#060
5v1-.->6v1; linkStyle 2 stroke:#060
4v1-.->5v1; linkStyle 3 stroke:#060
3v1==>|items<br>items_0|7v1; linkStyle 4 stroke:#060
6v1==>|items<br>items_1|7v1; linkStyle 5 stroke:#060
3v1==>|state<br>state_0|7v1; linkStyle 6 stroke:#060
6v1==>|state<br>state_1|7v1; linkStyle 7 stroke:#060
8v1==>9v1; linkStyle 8 stroke:#060
7v1==>10v1; linkStyle 9 stroke:#060
10v1==>8v1; linkStyle 10 stroke:#060
subgraph sg_1v1 ["sg_1v1 stratum 0"]
1v1
2v1
3v1
4v1
5v1
6v1
7v1
subgraph sg_1v1_var_lhs ["var <tt>lhs</tt>"]
1v1
2v1
3v1
end
subgraph sg_1v1_var_my_join ["var <tt>my_join</tt>"]
7v1
end
subgraph sg_1v1_var_rhs ["var <tt>rhs</tt>"]
4v1
5v1
6v1
end
end
subgraph sg_2v1 ["sg_2v1 stratum 0"]
8v1
9v1
subgraph sg_2v1_var_my_join ["var <tt>my_join</tt>"]
8v1
9v1
end
end

Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
---
source: hydroflow/tests/surface_lattice_bimorphism.rs
expression: "df.meta_graph().unwrap().to_dot(&Default::default())"
---
digraph {
node [fontname="Monaco,Menlo,Consolas,&quot;Droid Sans Mono&quot;,Inconsolata,&quot;Courier New&quot;,monospace", style=filled];
edge [fontname="Monaco,Menlo,Consolas,&quot;Droid Sans Mono&quot;,Inconsolata,&quot;Courier New&quot;,monospace"];
n1v1 [label="(n1v1) source_iter_delta([(7, 1), (7, 2)])", shape=invhouse, fillcolor="#88aaff"]
n2v1 [label="(n2v1) map(|(k, v)| MapUnionSingletonMap::new_from((k, SetUnionSingletonSet::new_from(v))))", shape=invhouse, fillcolor="#88aaff"]
n3v1 [label="(n3v1) state::<MapUnionHashMap<usize, SetUnionHashSet<usize>>>()", shape=invhouse, fillcolor="#88aaff"]
n4v1 [label="(n4v1) source_iter_delta([(7, 0), (7, 1), (7, 2)])", shape=invhouse, fillcolor="#88aaff"]
n5v1 [label="(n5v1) map(|(k, v)| MapUnionSingletonMap::new_from((k, SetUnionSingletonSet::new_from(v))))", shape=invhouse, fillcolor="#88aaff"]
n6v1 [label="(n6v1) state::<MapUnionHashMap<usize, SetUnionHashSet<usize>>>()", shape=invhouse, fillcolor="#88aaff"]
n7v1 [label="(n7v1) lattice_bimorphism(\l KeyedBimorphism::<\l HashMap<_, _>,\l _,\l >::from(CartesianProductBimorphism::<HashSet<_>>::default()),\l)\l", shape=invhouse, fillcolor="#88aaff"]
n8v1 [label="(n8v1) lattice_reduce()", shape=invhouse, fillcolor="#88aaff"]
n9v1 [label="(n9v1) for_each(|x| out_send.send(x).unwrap())", shape=house, fillcolor="#ffff88"]
n10v1 [label="(n10v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n2v1 -> n3v1 [color=darkgreen, style=dashed]
n1v1 -> n2v1 [color=darkgreen, style=dashed]
n5v1 -> n6v1 [color=darkgreen, style=dashed]
n4v1 -> n5v1 [color=darkgreen, style=dashed]
n3v1 -> n7v1 [label="items\nitems_0", color=darkgreen, style=bold]
n6v1 -> n7v1 [label="items\nitems_1", color=darkgreen, style=bold]
n3v1 -> n7v1 [label="state\nstate_0", color=darkgreen, style=bold]
n6v1 -> n7v1 [label="state\nstate_1", color=darkgreen, style=bold]
n8v1 -> n9v1 [color=darkgreen, style=bold]
n7v1 -> n10v1 [color=darkgreen, style=bold]
n10v1 -> n8v1 [color=red, style=bold]
subgraph "cluster n1v1" {
fillcolor="#dddddd"
style=filled
label = "sg_1v1\nstratum 0"
n1v1
n2v1
n3v1
n4v1
n5v1
n6v1
n7v1
subgraph "cluster_sg_1v1_var_lhs" {
label="var lhs"
n1v1
n2v1
n3v1
}
subgraph "cluster_sg_1v1_var_my_join" {
label="var my_join"
n7v1
}
subgraph "cluster_sg_1v1_var_rhs" {
label="var rhs"
n4v1
n5v1
n6v1
}
}
subgraph "cluster n2v1" {
fillcolor="#dddddd"
style=filled
label = "sg_2v1\nstratum 0"
n8v1
n9v1
subgraph "cluster_sg_2v1_var_my_join" {
label="var my_join"
n8v1
n9v1
}
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
---
source: hydroflow/tests/surface_lattice_bimorphism.rs
expression: "df.meta_graph().unwrap().to_mermaid(&Default::default())"
---
%%{init:{'theme':'base','themeVariables':{'clusterBkg':'#ddd','clusterBorder':'#888'}}}%%
flowchart TD
classDef pullClass fill:#8af,stroke:#000,text-align:left,white-space:pre
classDef pushClass fill:#ff8,stroke:#000,text-align:left,white-space:pre
classDef otherClass fill:#fdc,stroke:#000,text-align:left,white-space:pre
linkStyle default stroke:#aaa
1v1[\"(1v1) <code>source_iter_delta([(7, 1), (7, 2)])</code>"/]:::pullClass
2v1[\"(2v1) <code>map(|(k, v)| MapUnionSingletonMap::new_from((k, SetUnionSingletonSet::new_from(v))))</code>"/]:::pullClass
3v1[\"(3v1) <code>state::&lt;MapUnionHashMap&lt;usize, SetUnionHashSet&lt;usize&gt;&gt;&gt;()</code>"/]:::pullClass
4v1[\"(4v1) <code>source_iter_delta([(7, 0), (7, 1), (7, 2)])</code>"/]:::pullClass
5v1[\"(5v1) <code>map(|(k, v)| MapUnionSingletonMap::new_from((k, SetUnionSingletonSet::new_from(v))))</code>"/]:::pullClass
6v1[\"(6v1) <code>state::&lt;MapUnionHashMap&lt;usize, SetUnionHashSet&lt;usize&gt;&gt;&gt;()</code>"/]:::pullClass
7v1[\"<div style=text-align:center>(7v1)</div> <code>lattice_bimorphism(<br> KeyedBimorphism::&lt;<br> HashMap&lt;_, _&gt;,<br> _,<br> &gt;::from(CartesianProductBimorphism::&lt;HashSet&lt;_&gt;&gt;::default()),<br>)</code>"/]:::pullClass
8v1[\"(8v1) <code>lattice_reduce()</code>"/]:::pullClass
9v1[/"(9v1) <code>for_each(|x| out_send.send(x).unwrap())</code>"\]:::pushClass
10v1["(10v1) <code>handoff</code>"]:::otherClass
2v1-.->3v1; linkStyle 0 stroke:#060
1v1-.->2v1; linkStyle 1 stroke:#060
5v1-.->6v1; linkStyle 2 stroke:#060
4v1-.->5v1; linkStyle 3 stroke:#060
3v1==>|items<br>items_0|7v1; linkStyle 4 stroke:#060
6v1==>|items<br>items_1|7v1; linkStyle 5 stroke:#060
3v1==>|state<br>state_0|7v1; linkStyle 6 stroke:#060
6v1==>|state<br>state_1|7v1; linkStyle 7 stroke:#060
8v1==>9v1; linkStyle 8 stroke:#060
7v1==>10v1; linkStyle 9 stroke:#060
10v1==>8v1; linkStyle 10 stroke:#060
subgraph sg_1v1 ["sg_1v1 stratum 0"]
1v1
2v1
3v1
4v1
5v1
6v1
7v1
subgraph sg_1v1_var_lhs ["var <tt>lhs</tt>"]
1v1
2v1
3v1
end
subgraph sg_1v1_var_my_join ["var <tt>my_join</tt>"]
7v1
end
subgraph sg_1v1_var_rhs ["var <tt>rhs</tt>"]
4v1
5v1
6v1
end
end
subgraph sg_2v1 ["sg_2v1 stratum 0"]
8v1
9v1
subgraph sg_2v1_var_my_join ["var <tt>my_join</tt>"]
8v1
9v1
end
end

86 changes: 86 additions & 0 deletions hydroflow/tests/surface_lattice_bimorphism.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
use std::collections::{HashMap, HashSet};

use hydroflow::util::collect_ready;
use hydroflow::{assert_graphvis_snapshots, hydroflow_syntax};
use lattices::map_union::{KeyedBimorphism, MapUnionHashMap, MapUnionSingletonMap};
use lattices::set_union::{CartesianProductBimorphism, SetUnionHashSet, SetUnionSingletonSet};
use multiplatform_test::multiplatform_test;

#[multiplatform_test]
pub fn test_cartesian_product() {
let (out_send, out_recv) = hydroflow::util::unbounded_channel::<_>();

let mut df = hydroflow_syntax! {
lhs = source_iter_delta(0..3)
-> map(SetUnionSingletonSet::new_from)
-> state::<SetUnionHashSet<usize>>();
rhs = source_iter_delta(3..5)
-> map(SetUnionSingletonSet::new_from)
-> state::<SetUnionHashSet<usize>>();

lhs[items] -> [items_0]my_join;
rhs[items] -> [items_1]my_join;
lhs[state] -> [state_0]my_join;
rhs[state] -> [state_1]my_join;

my_join = lattice_bimorphism(CartesianProductBimorphism::<HashSet<_>>::default())
-> lattice_reduce()
-> for_each(|x| out_send.send(x).unwrap());
};

assert_graphvis_snapshots!(df);
df.run_available();

assert_eq!(
&[SetUnionHashSet::new(HashSet::from_iter([
(0, 3),
(0, 4),
(1, 3),
(1, 4),
(2, 3),
(2, 4),
]))],
&*collect_ready::<Vec<_>, _>(out_recv)
);
}

#[multiplatform_test]
pub fn test_join() {
let (out_send, out_recv) = hydroflow::util::unbounded_channel::<_>();

let mut df = hydroflow_syntax! {
lhs = source_iter_delta([(7, 1), (7, 2)])
-> map(|(k, v)| MapUnionSingletonMap::new_from((k, SetUnionSingletonSet::new_from(v))))
-> state::<MapUnionHashMap<usize, SetUnionHashSet<usize>>>();
rhs = source_iter_delta([(7, 0), (7, 1), (7, 2)])
-> map(|(k, v)| MapUnionSingletonMap::new_from((k, SetUnionSingletonSet::new_from(v))))
-> state::<MapUnionHashMap<usize, SetUnionHashSet<usize>>>();

lhs[items] -> [items_0]my_join;
rhs[items] -> [items_1]my_join;
lhs[state] -> [state_0]my_join;
rhs[state] -> [state_1]my_join;

my_join = lattice_bimorphism(KeyedBimorphism::<HashMap<_, _>, _>::from(CartesianProductBimorphism::<HashSet<_>>::default()))
-> lattice_reduce()

Check warning on line 65 in hydroflow/tests/surface_lattice_bimorphism.rs

View workflow job for this annotation

GitHub Actions / Test Suite (WebAssembly) (latest-nightly)

`lattice_reduce` input is already cumulative lattice flow, this operator is redundant.
-> for_each(|x| out_send.send(x).unwrap());
};

assert_graphvis_snapshots!(df);
df.run_available();

assert_eq!(
&[MapUnionHashMap::new(HashMap::from_iter([(
7,
SetUnionHashSet::new(HashSet::from_iter([
(1, 0),
(1, 1),
(1, 2),
(2, 0),
(2, 1),
(2, 2),
]))
)]))],
&*collect_ready::<Vec<_>, _>(out_recv)
);
}
Loading

0 comments on commit 09f0c57

Please sign in to comment.