Skip to content

Commit

Permalink
feat(hydroflow_lang): add all_once() windowing operator (#1585)
Browse files Browse the repository at this point in the history
TODO: ensure input is bounded.
  • Loading branch information
MingweiSamuel committed Nov 27, 2024
1 parent c9e4f94 commit 017e429
Show file tree
Hide file tree
Showing 12 changed files with 169 additions and 44 deletions.
6 changes: 0 additions & 6 deletions hydroflow/tests/compile-fail/surface_loop_cycle.stderr
Original file line number Diff line number Diff line change
@@ -1,9 +1,3 @@
warning: `loop` blocks are not yet supported.
--> tests/compile-fail/surface_loop_cycle.rs:3:9
|
3 | loop {
| ^^^^

error: Operator forms an illegal cycle within a `loop { ... }` block (1/4).
--> tests/compile-fail/surface_loop_cycle.rs:4:31
|
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,3 @@
warning: `loop` blocks are not yet supported.
--> tests/compile-fail/surface_loop_missing_unwindowing.rs:4:9
|
4 | loop {
| ^^^^

error: Operator `null(...)` exiting a loop context must be an un-windowing operator, but is not.
--> tests/compile-fail/surface_loop_missing_unwindowing.rs:7:14
|
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,3 @@
warning: `loop` blocks are not yet supported.
--> tests/compile-fail/surface_loop_missing_windowing.rs:4:9
|
4 | loop {
| ^^^^

error: Operator `null(...)` entering a loop context must be a windowing operator, but is not.
--> tests/compile-fail/surface_loop_missing_windowing.rs:5:18
|
Expand Down
12 changes: 0 additions & 12 deletions hydroflow/tests/compile-fail/surface_loop_multiple_window.stderr
Original file line number Diff line number Diff line change
@@ -1,15 +1,3 @@
warning: `loop` blocks are not yet supported.
--> tests/compile-fail/surface_loop_multiple_window.rs:4:9
|
4 | loop {
| ^^^^

warning: `loop` blocks are not yet supported.
--> tests/compile-fail/surface_loop_multiple_window.rs:5:13
|
5 | loop {
| ^^^^

error: Operator input edge may not cross multiple loop contexts.
--> tests/compile-fail/surface_loop_multiple_window.rs:6:22
|
Expand Down
6 changes: 0 additions & 6 deletions hydroflow/tests/compile-fail/surface_loop_source.stderr
Original file line number Diff line number Diff line change
@@ -1,9 +1,3 @@
warning: `loop` blocks are not yet supported.
--> tests/compile-fail/surface_loop_source.rs:3:9
|
3 | loop {
| ^^^^

error: Source operator `source_iter(...)` must be at the root level, not within any `loop { ... }` contexts.
--> tests/compile-fail/surface_loop_source.rs:4:13
|
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
---
source: hydroflow/tests/surface_loop.rs
expression: "df.meta_graph().unwrap().to_dot(& Default :: default())"
---
digraph {
node [fontname="Monaco,Menlo,Consolas,"Droid Sans Mono",Inconsolata,"Courier New",monospace", style=filled];
edge [fontname="Monaco,Menlo,Consolas,"Droid Sans Mono",Inconsolata,"Courier New",monospace"];
n1v1 [label="(n1v1) source_iter([\"alice\", \"bob\"])", shape=invhouse, fillcolor="#88aaff"]
n2v1 [label="(n2v1) source_stream(iter_batches_stream(0..12, 3))", shape=invhouse, fillcolor="#88aaff"]
n3v1 [label="(n3v1) batch()", shape=invhouse, fillcolor="#88aaff"]
n4v1 [label="(n4v1) flatten()", shape=invhouse, fillcolor="#88aaff"]
n5v1 [label="(n5v1) batch()", shape=invhouse, fillcolor="#88aaff"]
n6v1 [label="(n6v1) flatten()", shape=invhouse, fillcolor="#88aaff"]
n7v1 [label="(n7v1) cross_join::<'static, 'tick>()", shape=invhouse, fillcolor="#88aaff"]
n8v1 [label="(n8v1) all_once()", shape=invhouse, fillcolor="#88aaff"]
n9v1 [label="(n9v1) for_each(|all| println!(\"{}: {:?}\", context.current_tick(), all))", shape=house, fillcolor="#ffff88"]
n10v1 [label="(n10v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n11v1 [label="(n11v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n12v1 [label="(n12v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n4v1 -> n7v1 [label="0"]
n3v1 -> n4v1
n1v1 -> n10v1
n6v1 -> n7v1 [label="1"]
n5v1 -> n6v1
n2v1 -> n11v1
n8v1 -> n9v1
n7v1 -> n12v1
n10v1 -> n3v1
n11v1 -> n5v1
n12v1 -> n8v1 [color=red]
subgraph "cluster n1v1" {
fillcolor="#dddddd"
style=filled
label = "sg_1v1\nstratum 0"
n1v1
subgraph "cluster_sg_1v1_var_users" {
label="var users"
n1v1
}
}
subgraph "cluster n2v1" {
fillcolor="#dddddd"
style=filled
label = "sg_2v1\nstratum 0"
n2v1
subgraph "cluster_sg_2v1_var_messages" {
label="var messages"
n2v1
}
}
subgraph "cluster n3v1" {
fillcolor="#dddddd"
style=filled
label = "sg_3v1\nstratum 0"
n3v1
n4v1
n5v1
n6v1
n7v1
subgraph "cluster_sg_3v1_var_cp" {
label="var cp"
n7v1
}
}
subgraph "cluster n4v1" {
fillcolor="#dddddd"
style=filled
label = "sg_4v1\nstratum 1"
n8v1
n9v1
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
---
source: hydroflow/tests/surface_loop.rs
expression: "df.meta_graph().unwrap().to_mermaid(& Default :: default())"
---
%%{init:{'theme':'base','themeVariables':{'clusterBkg':'#ddd','clusterBorder':'#888'}}}%%
flowchart TD
classDef pullClass fill:#8af,stroke:#000,text-align:left,white-space:pre
classDef pushClass fill:#ff8,stroke:#000,text-align:left,white-space:pre
classDef otherClass fill:#fdc,stroke:#000,text-align:left,white-space:pre
linkStyle default stroke:#aaa
1v1[\"(1v1) <code>source_iter([&quot;alice&quot;, &quot;bob&quot;])</code>"/]:::pullClass
2v1[\"(2v1) <code>source_stream(iter_batches_stream(0..12, 3))</code>"/]:::pullClass
3v1[\"(3v1) <code>batch()</code>"/]:::pullClass
4v1[\"(4v1) <code>flatten()</code>"/]:::pullClass
5v1[\"(5v1) <code>batch()</code>"/]:::pullClass
6v1[\"(6v1) <code>flatten()</code>"/]:::pullClass
7v1[\"(7v1) <code>cross_join::&lt;'static, 'tick&gt;()</code>"/]:::pullClass
8v1[\"(8v1) <code>all_once()</code>"/]:::pullClass
9v1[/"(9v1) <code>for_each(|all| println!(&quot;{}: {:?}&quot;, context.current_tick(), all))</code>"\]:::pushClass
10v1["(10v1) <code>handoff</code>"]:::otherClass
11v1["(11v1) <code>handoff</code>"]:::otherClass
12v1["(12v1) <code>handoff</code>"]:::otherClass
4v1-->|0|7v1
3v1-->4v1
1v1-->10v1
6v1-->|1|7v1
5v1-->6v1
2v1-->11v1
8v1-->9v1
7v1-->12v1
10v1-->3v1
11v1-->5v1
12v1--x8v1; linkStyle 10 stroke:red
subgraph sg_1v1 ["sg_1v1 stratum 0"]
1v1
subgraph sg_1v1_var_users ["var <tt>users</tt>"]
1v1
end
end
subgraph sg_2v1 ["sg_2v1 stratum 0"]
2v1
subgraph sg_2v1_var_messages ["var <tt>messages</tt>"]
2v1
end
end
subgraph sg_3v1 ["sg_3v1 stratum 0"]
3v1
4v1
5v1
6v1
7v1
subgraph sg_3v1_var_cp ["var <tt>cp</tt>"]
7v1
end
end
subgraph sg_4v1 ["sg_4v1 stratum 1"]
8v1
9v1
end
20 changes: 20 additions & 0 deletions hydroflow/tests/surface_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,23 @@ pub fn test_flo_syntax() {
assert_graphvis_snapshots!(df);
df.run_available();
}


#[multiplatform_test]
pub fn test_flo_nested() {
let mut df = hydroflow_syntax! {
users = source_iter(["alice", "bob"]);
messages = source_stream(iter_batches_stream(0..12, 3));
loop {
// TODO(mingwei): cross_join type negotion should allow us to eliminate `flatten()`.
users -> batch() -> flatten() -> [0]cp;
messages -> batch() -> flatten() -> [1]cp;
cp = cross_join::<'static, 'tick>();
loop {
cp -> all_once() -> for_each(|all| println!("{}: {:?}", context.current_tick(), all));
}
}
};
assert_graphvis_snapshots!(df);
df.run_available();
}
7 changes: 0 additions & 7 deletions hydroflow_lang/src/graph/flat_graph_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,13 +190,6 @@ impl FlatGraphBuilder {
Self::helper_check_unused_port(&mut self.diagnostics, &ends, false);
}
HfStatement::Loop(loop_statement) => {
// TODO(mingwei):
self.diagnostics.push(Diagnostic::spanned(
loop_statement.loop_token.span(),
Level::Warning,
"`loop` blocks are not yet supported.",
));

let inner_loop = self.flat_graph.insert_loop(current_loop);
for stmt in loop_statement.statements {
self.add_statement_with_loop(stmt, Some(inner_loop));
Expand Down
9 changes: 9 additions & 0 deletions hydroflow_lang/src/graph/ops/all_once.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
use super::{DelayType, OperatorConstraints};

// Same as batch, but with a stratum barrier.
/// TODO(mingwei): docs
pub const ALL_ONCE: OperatorConstraints = OperatorConstraints {
name: "all_once",
input_delaytype_fn: |_| Some(DelayType::Stratum),
..super::batch::BATCH
};
3 changes: 2 additions & 1 deletion hydroflow_lang/src/graph/ops/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ use super::{
RANGE_1,
};

/// TODO(mingwei): docs
pub const BATCH: OperatorConstraints = OperatorConstraints {
name: "batch",
categories: &[OperatorCategory::Fold],
categories: &[OperatorCategory::Fold, OperatorCategory::Windowing],
hard_range_inn: RANGE_1,
soft_range_inn: RANGE_1,
hard_range_out: &(0..=1),
Expand Down
7 changes: 7 additions & 0 deletions hydroflow_lang/src/graph/ops/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ macro_rules! declare_ops {
};
}
declare_ops![
all_once::ALL_ONCE,
anti_join::ANTI_JOIN,
anti_join_multiset::ANTI_JOIN_MULTISET,
assert::ASSERT,
Expand Down Expand Up @@ -499,6 +500,8 @@ pub enum OperatorCategory {
Sink,
Control,
CompilerFusionOperator,
Windowing,
Unwindowing,
}
impl OperatorCategory {
/// Human-readible heading name, for docs.
Expand All @@ -517,6 +520,8 @@ impl OperatorCategory {
OperatorCategory::Sink => "Sinks",
OperatorCategory::Control => "Control Flow Operators",
OperatorCategory::CompilerFusionOperator => "Compiler Fusion Operators",
OperatorCategory::Windowing => "Windowing Operator",
OperatorCategory::Unwindowing => "Un-Windowing Operator",
}
}
/// Human description, for docs.
Expand All @@ -541,6 +546,8 @@ impl OperatorCategory {
OperatorCategory::CompilerFusionOperator => {
"Operators which are necessary to implement certain optimizations and rewrite rules"
}
OperatorCategory::Windowing => "Operators for windowing `loop` inputs.",
OperatorCategory::Unwindowing => "Operators for collecting `loop` outputs.",
}
}
}
Expand Down

0 comments on commit 017e429

Please sign in to comment.