Skip to content

Commit

Permalink
feat(hydroflow_lang): add batch(), checking of flo [un]windowing ops (
Browse files Browse the repository at this point in the history
  • Loading branch information
MingweiSamuel committed Nov 27, 2024
1 parent 9ace9a9 commit c9e4f94
Show file tree
Hide file tree
Showing 13 changed files with 287 additions and 29 deletions.
10 changes: 10 additions & 0 deletions hydroflow/tests/compile-fail/surface_loop_missing_unwindowing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
fn main() {
let mut df = hydroflow::hydroflow_syntax! {
a = source_iter(0..10);
loop {
b = a -> batch();
}
b -> null();
};
df.run_available();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
warning: `loop` blocks are not yet supported.
--> tests/compile-fail/surface_loop_missing_unwindowing.rs:4:9
|
4 | loop {
| ^^^^

error: Operator `null(...)` exiting a loop context must be an un-windowing operator, but is not.
--> tests/compile-fail/surface_loop_missing_unwindowing.rs:7:14
|
7 | b -> null();
| ^^^^^^
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
fn main() {
let mut df = hydroflow::hydroflow_syntax! {
a = source_iter(0..10);
loop {
a -> null();
}
};
df.run_available();
}
11 changes: 11 additions & 0 deletions hydroflow/tests/compile-fail/surface_loop_missing_windowing.stderr
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
warning: `loop` blocks are not yet supported.
--> tests/compile-fail/surface_loop_missing_windowing.rs:4:9
|
4 | loop {
| ^^^^

error: Operator `null(...)` entering a loop context must be a windowing operator, but is not.
--> tests/compile-fail/surface_loop_missing_windowing.rs:5:18
|
5 | a -> null();
| ^^^^^^
11 changes: 11 additions & 0 deletions hydroflow/tests/compile-fail/surface_loop_multiple_window.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
fn main() {
let mut df = hydroflow::hydroflow_syntax! {
a = source_iter(0..10);
loop {
loop {
a -> batch() -> null();
}
}
};
df.run_available();
}
17 changes: 17 additions & 0 deletions hydroflow/tests/compile-fail/surface_loop_multiple_window.stderr
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
warning: `loop` blocks are not yet supported.
--> tests/compile-fail/surface_loop_multiple_window.rs:4:9
|
4 | loop {
| ^^^^

warning: `loop` blocks are not yet supported.
--> tests/compile-fail/surface_loop_multiple_window.rs:5:13
|
5 | loop {
| ^^^^

error: Operator input edge may not cross multiple loop contexts.
--> tests/compile-fail/surface_loop_multiple_window.rs:6:22
|
6 | a -> batch() -> null();
| ^^^^^^^
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,23 @@ digraph {
edge [fontname="Monaco,Menlo,Consolas,"Droid Sans Mono",Inconsolata,"Courier New",monospace"];
n1v1 [label="(n1v1) source_iter([\"alice\", \"bob\"])", shape=invhouse, fillcolor="#88aaff"]
n2v1 [label="(n2v1) source_stream(iter_batches_stream(0..12, 3))", shape=invhouse, fillcolor="#88aaff"]
n3v1 [label="(n3v1) cross_join::<'static, 'tick>()", shape=invhouse, fillcolor="#88aaff"]
n4v1 [label="(n4v1) for_each(|(user, message)| println!(\"notify {} of {}\", user, message))", shape=house, fillcolor="#ffff88"]
n5v1 [label="(n5v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n6v1 [label="(n6v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n1v1 -> n5v1
n2v1 -> n6v1
n3v1 [label="(n3v1) batch()", shape=invhouse, fillcolor="#88aaff"]
n4v1 [label="(n4v1) flatten()", shape=invhouse, fillcolor="#88aaff"]
n5v1 [label="(n5v1) batch()", shape=invhouse, fillcolor="#88aaff"]
n6v1 [label="(n6v1) flatten()", shape=invhouse, fillcolor="#88aaff"]
n7v1 [label="(n7v1) cross_join::<'static, 'tick>()", shape=invhouse, fillcolor="#88aaff"]
n8v1 [label="(n8v1) for_each(|(user, message)| {\l println!(\"{}: notify {} of {}\", context.current_tick(), user, message)\l})\l", shape=house, fillcolor="#ffff88"]
n9v1 [label="(n9v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n10v1 [label="(n10v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n4v1 -> n7v1 [label="0"]
n3v1 -> n4v1
n5v1 -> n3v1 [label="0"]
n6v1 -> n3v1 [label="1"]
n1v1 -> n9v1
n6v1 -> n7v1 [label="1"]
n5v1 -> n6v1
n2v1 -> n10v1
n7v1 -> n8v1
n9v1 -> n3v1
n10v1 -> n5v1
subgraph "cluster n1v1" {
fillcolor="#dddddd"
style=filled
Expand All @@ -42,10 +50,14 @@ digraph {
label = "sg_3v1\nstratum 0"
n3v1
n4v1
n5v1
n6v1
n7v1
n8v1
subgraph "cluster_sg_3v1_var_cp" {
label="var cp"
n3v1
n4v1
n7v1
n8v1
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,23 @@ classDef otherClass fill:#fdc,stroke:#000,text-align:left,white-space:pre
linkStyle default stroke:#aaa
1v1[\"(1v1) <code>source_iter([&quot;alice&quot;, &quot;bob&quot;])</code>"/]:::pullClass
2v1[\"(2v1) <code>source_stream(iter_batches_stream(0..12, 3))</code>"/]:::pullClass
3v1[\"(3v1) <code>cross_join::&lt;'static, 'tick&gt;()</code>"/]:::pullClass
4v1[/"(4v1) <code>for_each(|(user, message)| println!(&quot;notify {} of {}&quot;, user, message))</code>"\]:::pushClass
5v1["(5v1) <code>handoff</code>"]:::otherClass
6v1["(6v1) <code>handoff</code>"]:::otherClass
1v1-->5v1
2v1-->6v1
3v1[\"(3v1) <code>batch()</code>"/]:::pullClass
4v1[\"(4v1) <code>flatten()</code>"/]:::pullClass
5v1[\"(5v1) <code>batch()</code>"/]:::pullClass
6v1[\"(6v1) <code>flatten()</code>"/]:::pullClass
7v1[\"(7v1) <code>cross_join::&lt;'static, 'tick&gt;()</code>"/]:::pullClass
8v1[/"<div style=text-align:center>(8v1)</div> <code>for_each(|(user, message)| {<br> println!(&quot;{}: notify {} of {}&quot;, context.current_tick(), user, message)<br>})</code>"\]:::pushClass
9v1["(9v1) <code>handoff</code>"]:::otherClass
10v1["(10v1) <code>handoff</code>"]:::otherClass
4v1-->|0|7v1
3v1-->4v1
5v1-->|0|3v1
6v1-->|1|3v1
1v1-->9v1
6v1-->|1|7v1
5v1-->6v1
2v1-->10v1
7v1-->8v1
9v1-->3v1
10v1-->5v1
subgraph sg_1v1 ["sg_1v1 stratum 0"]
1v1
subgraph sg_1v1_var_users ["var <tt>users</tt>"]
Expand All @@ -34,8 +42,12 @@ end
subgraph sg_3v1 ["sg_3v1 stratum 0"]
3v1
4v1
5v1
6v1
7v1
8v1
subgraph sg_3v1_var_cp ["var <tt>cp</tt>"]
3v1
4v1
7v1
8v1
end
end
7 changes: 4 additions & 3 deletions hydroflow/tests/surface_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ pub fn test_flo_syntax() {
users = source_iter(["alice", "bob"]);
messages = source_stream(iter_batches_stream(0..12, 3));
loop {
users -> [0]cp;
messages -> [1]cp;
cp = cross_join::<'static, 'tick>() -> for_each(|(user, message)| println!("notify {} of {}", user, message));
// TODO(mingwei): cross_join type negotion should allow us to eliminate `flatten()`.
users -> batch() -> flatten() -> [0]cp;
messages -> batch() -> flatten() -> [1]cp;
cp = cross_join::<'static, 'tick>() -> for_each(|(user, message)| println!("{}: notify {} of {}", context.current_tick(), user, message));
}
};
assert_graphvis_snapshots!(df);
Expand Down
4 changes: 2 additions & 2 deletions hydroflow/tests/surface_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,8 @@ pub fn test_flo_syntax() {
users = source_stream(0..);
messages = source_stream(0..);
loop {
users -> [0]cp;
messages -> [1]cp;
users -> batch() -> flatten() -> [0]cp;
messages -> batch() -> flatten() -> [1]cp;
cp = cross_join() -> for_each(|(user, message)| println!("notify {} of {}", user, message));
}
}
Expand Down
89 changes: 85 additions & 4 deletions hydroflow_lang/src/graph/flat_graph_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -933,12 +933,10 @@ impl FlatGraphBuilder {
let Some(op_inst) = self.flat_graph.node_op_inst(node_id) else {
continue;
};
let Some(_loop_id) = self.flat_graph.node_loop(node_id) else {
continue;
};
let loop_id = self.flat_graph.node_loop(node_id);

// Source operators must be at the top level.
if Some(FloType::Source) == op_inst.op_constraints.flo_type {
if Some(FloType::Source) == op_inst.op_constraints.flo_type && loop_id.is_some() {
self.diagnostics.push(Diagnostic::spanned(
node.span(),
Level::Error,
Expand All @@ -950,6 +948,89 @@ impl FlatGraphBuilder {
}
}

// Check windowing and un-windowing operators, for loop inputs and outputs respectively.
for (_edge_id, (pred_id, node_id)) in self.flat_graph.edges() {
let Some(op_inst) = self.flat_graph.node_op_inst(node_id) else {
continue;
};
let flo_type = &op_inst.op_constraints.flo_type;

let pred_loop_id = self.flat_graph.node_loop(pred_id);
let loop_id = self.flat_graph.node_loop(node_id);

let span = self.flat_graph.node(node_id).span();

let (is_input, is_output) = {
let parent_pred_loop_id =
pred_loop_id.and_then(|lid| self.flat_graph.loop_parent(lid));
let parent_loop_id = loop_id.and_then(|lid| self.flat_graph.loop_parent(lid));
let is_same = pred_loop_id == loop_id;
let is_input = !is_same && parent_loop_id == pred_loop_id;
let is_output = !is_same && parent_pred_loop_id == loop_id;
if !(is_input || is_output || is_same) {
self.diagnostics.push(Diagnostic::spanned(
span,
Level::Error,
"Operator input edge may not cross multiple loop contexts.",
));
continue;
}
(is_input, is_output)
};

match flo_type {
None => {
if is_input {
self.diagnostics.push(Diagnostic::spanned(
span,
Level::Error,
format!(
"Operator `{}(...)` entering a loop context must be a windowing operator, but is not.",
op_inst.op_constraints.name
)
));
}
if is_output {
self.diagnostics.push(Diagnostic::spanned(
span,
Level::Error,
format!(
"Operator `{}(...)` exiting a loop context must be an un-windowing operator, but is not.",
op_inst.op_constraints.name
)
));
}
}
Some(FloType::Windowing) => {
if !is_input {
self.diagnostics.push(Diagnostic::spanned(
span,
Level::Error,
format!(
"Windowing operator `{}(...)` must be the first input operator into a `loop {{ ... }} context.",
op_inst.op_constraints.name
)
));
}
}
Some(FloType::Unwindowing) => {
if !is_output {
self.diagnostics.push(Diagnostic::spanned(
span,
Level::Error,
format!(
"Un-windowing operator `{}(...)` must be the first output operator after exiting a `loop {{ ... }} context.",
op_inst.op_constraints.name
)
));
}
}
Some(FloType::Source) => {
// Handled above.
}
}
}

// Must be a DAG (excluding `next_tick()` operators).
// TODO(mingwei): Nested loop blocks should count as a single node.
for (loop_id, loop_nodes) in self.flat_graph.loops() {
Expand Down
82 changes: 82 additions & 0 deletions hydroflow_lang/src/graph/ops/batch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use quote::quote_spanned;

use super::{
FloType, OperatorCategory, OperatorConstraints, OperatorWriteOutput, WriteContextArgs, RANGE_0,
RANGE_1,
};

pub const BATCH: OperatorConstraints = OperatorConstraints {
name: "batch",
categories: &[OperatorCategory::Fold],
hard_range_inn: RANGE_1,
soft_range_inn: RANGE_1,
hard_range_out: &(0..=1),
soft_range_out: &(0..=1),
num_args: 0,
persistence_args: RANGE_0,
type_args: RANGE_0,
is_external_input: false,
has_singleton_output: true,
flo_type: Some(FloType::Windowing),
ports_inn: None,
ports_out: None,
input_delaytype_fn: |_| None,
write_fn: |wc @ &WriteContextArgs {
root,
context,
hydroflow,
op_span,
ident,
is_pull,
inputs,
outputs,
singleton_output_ident,
..
},
_diagnostics| {
let write_prologue = quote_spanned! {op_span=>
#[allow(clippy::redundant_closure_call)]
let #singleton_output_ident = #hydroflow.add_state(
::std::cell::RefCell::new(::std::vec::Vec::new())
);

// TODO(mingwei): Is this needed?
// Reset the value to the initializer fn if it is a new tick.
#hydroflow.set_state_tick_hook(#singleton_output_ident, move |rcell| { rcell.take(); });
};

let vec_ident = wc.make_ident("vec");

let write_iterator = if is_pull {
// Pull.
let input = &inputs[0];
quote_spanned! {op_span=>
let mut #vec_ident = #context.state_ref(#singleton_output_ident).borrow_mut();
*#vec_ident = #input.collect::<::std::vec::Vec<_>>();
let #ident = ::std::iter::once(::std::clone::Clone::clone(&*#vec_ident));
}
} else if let Some(output) = outputs.first() {
// Push with output.
quote_spanned! {op_span=>
let mut #vec_ident = #context.state_ref(#singleton_output_ident).borrow_mut();
let #ident = #root::pusherator::inspect::Inspect::new(|item| {
::std::vec::Vec::push(#vec_ident, ::std::clone::Clone::clone(item));
}, #output);
}
} else {
// Push with no output.
quote_spanned! {op_span=>
let mut #vec_ident = #context.state_ref(#singleton_output_ident).borrow_mut();
let #ident = #root::pusherator::for_each::ForEach::new(|item| {
::std::vec::Vec::push(#vec_ident, item);
});
}
};

Ok(OperatorWriteOutput {
write_prologue,
write_iterator,
..Default::default()
})
},
};
1 change: 1 addition & 0 deletions hydroflow_lang/src/graph/ops/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ declare_ops![
anti_join_multiset::ANTI_JOIN_MULTISET,
assert::ASSERT,
assert_eq::ASSERT_EQ,
batch::BATCH,
chain::CHAIN,
cross_join::CROSS_JOIN,
cross_join_multiset::CROSS_JOIN_MULTISET,
Expand Down

0 comments on commit c9e4f94

Please sign in to comment.