diff --git a/hydroflow/tests/compile-fail/surface_loop_missing_unwindowing.rs b/hydroflow/tests/compile-fail/surface_loop_missing_unwindowing.rs
new file mode 100644
index 00000000000..808bfae7a0e
--- /dev/null
+++ b/hydroflow/tests/compile-fail/surface_loop_missing_unwindowing.rs
@@ -0,0 +1,10 @@
+fn main() {
+ let mut df = hydroflow::hydroflow_syntax! {
+ a = source_iter(0..10);
+ loop {
+ b = a -> batch();
+ }
+ b -> null();
+ };
+ df.run_available();
+}
diff --git a/hydroflow/tests/compile-fail/surface_loop_missing_unwindowing.stderr b/hydroflow/tests/compile-fail/surface_loop_missing_unwindowing.stderr
new file mode 100644
index 00000000000..9d8f90d407a
--- /dev/null
+++ b/hydroflow/tests/compile-fail/surface_loop_missing_unwindowing.stderr
@@ -0,0 +1,11 @@
+warning: `loop` blocks are not yet supported.
+ --> tests/compile-fail/surface_loop_missing_unwindowing.rs:4:9
+ |
+4 | loop {
+ | ^^^^
+
+error: Operator `null(...)` exiting a loop context must be an un-windowing operator, but is not.
+ --> tests/compile-fail/surface_loop_missing_unwindowing.rs:7:14
+ |
+7 | b -> null();
+ | ^^^^^^
diff --git a/hydroflow/tests/compile-fail/surface_loop_missing_windowing.rs b/hydroflow/tests/compile-fail/surface_loop_missing_windowing.rs
new file mode 100644
index 00000000000..2177d2ad0e8
--- /dev/null
+++ b/hydroflow/tests/compile-fail/surface_loop_missing_windowing.rs
@@ -0,0 +1,9 @@
+fn main() {
+ let mut df = hydroflow::hydroflow_syntax! {
+ a = source_iter(0..10);
+ loop {
+ a -> null();
+ }
+ };
+ df.run_available();
+}
diff --git a/hydroflow/tests/compile-fail/surface_loop_missing_windowing.stderr b/hydroflow/tests/compile-fail/surface_loop_missing_windowing.stderr
new file mode 100644
index 00000000000..072ae87101c
--- /dev/null
+++ b/hydroflow/tests/compile-fail/surface_loop_missing_windowing.stderr
@@ -0,0 +1,11 @@
+warning: `loop` blocks are not yet supported.
+ --> tests/compile-fail/surface_loop_missing_windowing.rs:4:9
+ |
+4 | loop {
+ | ^^^^
+
+error: Operator `null(...)` entering a loop context must be a windowing operator, but is not.
+ --> tests/compile-fail/surface_loop_missing_windowing.rs:5:18
+ |
+5 | a -> null();
+ | ^^^^^^
diff --git a/hydroflow/tests/compile-fail/surface_loop_multiple_window.rs b/hydroflow/tests/compile-fail/surface_loop_multiple_window.rs
new file mode 100644
index 00000000000..eb0eb775bf2
--- /dev/null
+++ b/hydroflow/tests/compile-fail/surface_loop_multiple_window.rs
@@ -0,0 +1,11 @@
+fn main() {
+ let mut df = hydroflow::hydroflow_syntax! {
+ a = source_iter(0..10);
+ loop {
+ loop {
+ a -> batch() -> null();
+ }
+ }
+ };
+ df.run_available();
+}
diff --git a/hydroflow/tests/compile-fail/surface_loop_multiple_window.stderr b/hydroflow/tests/compile-fail/surface_loop_multiple_window.stderr
new file mode 100644
index 00000000000..ad8ef61718d
--- /dev/null
+++ b/hydroflow/tests/compile-fail/surface_loop_multiple_window.stderr
@@ -0,0 +1,17 @@
+warning: `loop` blocks are not yet supported.
+ --> tests/compile-fail/surface_loop_multiple_window.rs:4:9
+ |
+4 | loop {
+ | ^^^^
+
+warning: `loop` blocks are not yet supported.
+ --> tests/compile-fail/surface_loop_multiple_window.rs:5:13
+ |
+5 | loop {
+ | ^^^^
+
+error: Operator input edge may not cross multiple loop contexts.
+ --> tests/compile-fail/surface_loop_multiple_window.rs:6:22
+ |
+6 | a -> batch() -> null();
+ | ^^^^^^^
diff --git a/hydroflow/tests/snapshots/surface_loop__flo_syntax@graphvis_dot.snap b/hydroflow/tests/snapshots/surface_loop__flo_syntax@graphvis_dot.snap
index c5010b0f2f9..57f118a3fee 100644
--- a/hydroflow/tests/snapshots/surface_loop__flo_syntax@graphvis_dot.snap
+++ b/hydroflow/tests/snapshots/surface_loop__flo_syntax@graphvis_dot.snap
@@ -7,15 +7,23 @@ digraph {
edge [fontname="Monaco,Menlo,Consolas,"Droid Sans Mono",Inconsolata,"Courier New",monospace"];
n1v1 [label="(n1v1) source_iter([\"alice\", \"bob\"])", shape=invhouse, fillcolor="#88aaff"]
n2v1 [label="(n2v1) source_stream(iter_batches_stream(0..12, 3))", shape=invhouse, fillcolor="#88aaff"]
- n3v1 [label="(n3v1) cross_join::<'static, 'tick>()", shape=invhouse, fillcolor="#88aaff"]
- n4v1 [label="(n4v1) for_each(|(user, message)| println!(\"notify {} of {}\", user, message))", shape=house, fillcolor="#ffff88"]
- n5v1 [label="(n5v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
- n6v1 [label="(n6v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
- n1v1 -> n5v1
- n2v1 -> n6v1
+ n3v1 [label="(n3v1) batch()", shape=invhouse, fillcolor="#88aaff"]
+ n4v1 [label="(n4v1) flatten()", shape=invhouse, fillcolor="#88aaff"]
+ n5v1 [label="(n5v1) batch()", shape=invhouse, fillcolor="#88aaff"]
+ n6v1 [label="(n6v1) flatten()", shape=invhouse, fillcolor="#88aaff"]
+ n7v1 [label="(n7v1) cross_join::<'static, 'tick>()", shape=invhouse, fillcolor="#88aaff"]
+ n8v1 [label="(n8v1) for_each(|(user, message)| {\l println!(\"{}: notify {} of {}\", context.current_tick(), user, message)\l})\l", shape=house, fillcolor="#ffff88"]
+ n9v1 [label="(n9v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
+ n10v1 [label="(n10v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
+ n4v1 -> n7v1 [label="0"]
n3v1 -> n4v1
- n5v1 -> n3v1 [label="0"]
- n6v1 -> n3v1 [label="1"]
+ n1v1 -> n9v1
+ n6v1 -> n7v1 [label="1"]
+ n5v1 -> n6v1
+ n2v1 -> n10v1
+ n7v1 -> n8v1
+ n9v1 -> n3v1
+ n10v1 -> n5v1
subgraph "cluster n1v1" {
fillcolor="#dddddd"
style=filled
@@ -42,10 +50,14 @@ digraph {
label = "sg_3v1\nstratum 0"
n3v1
n4v1
+ n5v1
+ n6v1
+ n7v1
+ n8v1
subgraph "cluster_sg_3v1_var_cp" {
label="var cp"
- n3v1
- n4v1
+ n7v1
+ n8v1
}
}
}
diff --git a/hydroflow/tests/snapshots/surface_loop__flo_syntax@graphvis_mermaid.snap b/hydroflow/tests/snapshots/surface_loop__flo_syntax@graphvis_mermaid.snap
index 6d129f8db2d..3fdd6bd9459 100644
--- a/hydroflow/tests/snapshots/surface_loop__flo_syntax@graphvis_mermaid.snap
+++ b/hydroflow/tests/snapshots/surface_loop__flo_syntax@graphvis_mermaid.snap
@@ -10,15 +10,23 @@ classDef otherClass fill:#fdc,stroke:#000,text-align:left,white-space:pre
linkStyle default stroke:#aaa
1v1[\"(1v1) source_iter(["alice", "bob"])
"/]:::pullClass
2v1[\"(2v1) source_stream(iter_batches_stream(0..12, 3))
"/]:::pullClass
-3v1[\"(3v1) cross_join::<'static, 'tick>()
"/]:::pullClass
-4v1[/"(4v1) for_each(|(user, message)| println!("notify {} of {}", user, message))
"\]:::pushClass
-5v1["(5v1) handoff
"]:::otherClass
-6v1["(6v1) handoff
"]:::otherClass
-1v1-->5v1
-2v1-->6v1
+3v1[\"(3v1) batch()
"/]:::pullClass
+4v1[\"(4v1) flatten()
"/]:::pullClass
+5v1[\"(5v1) batch()
"/]:::pullClass
+6v1[\"(6v1) flatten()
"/]:::pullClass
+7v1[\"(7v1) cross_join::<'static, 'tick>()
"/]:::pullClass
+8v1[/"
for_each(|(user, message)| {
println!("{}: notify {} of {}", context.current_tick(), user, message)
})
"\]:::pushClass
+9v1["(9v1) handoff
"]:::otherClass
+10v1["(10v1) handoff
"]:::otherClass
+4v1-->|0|7v1
3v1-->4v1
-5v1-->|0|3v1
-6v1-->|1|3v1
+1v1-->9v1
+6v1-->|1|7v1
+5v1-->6v1
+2v1-->10v1
+7v1-->8v1
+9v1-->3v1
+10v1-->5v1
subgraph sg_1v1 ["sg_1v1 stratum 0"]
1v1
subgraph sg_1v1_var_users ["var users"]
@@ -34,8 +42,12 @@ end
subgraph sg_3v1 ["sg_3v1 stratum 0"]
3v1
4v1
+ 5v1
+ 6v1
+ 7v1
+ 8v1
subgraph sg_3v1_var_cp ["var cp"]
- 3v1
- 4v1
+ 7v1
+ 8v1
end
end
diff --git a/hydroflow/tests/surface_loop.rs b/hydroflow/tests/surface_loop.rs
index 0b3bdd84f10..f5588f1a4ba 100644
--- a/hydroflow/tests/surface_loop.rs
+++ b/hydroflow/tests/surface_loop.rs
@@ -8,9 +8,10 @@ pub fn test_flo_syntax() {
users = source_iter(["alice", "bob"]);
messages = source_stream(iter_batches_stream(0..12, 3));
loop {
- users -> [0]cp;
- messages -> [1]cp;
- cp = cross_join::<'static, 'tick>() -> for_each(|(user, message)| println!("notify {} of {}", user, message));
+ // TODO(mingwei): cross_join type negotion should allow us to eliminate `flatten()`.
+ users -> batch() -> flatten() -> [0]cp;
+ messages -> batch() -> flatten() -> [1]cp;
+ cp = cross_join::<'static, 'tick>() -> for_each(|(user, message)| println!("{}: notify {} of {}", context.current_tick(), user, message));
}
};
assert_graphvis_snapshots!(df);
diff --git a/hydroflow/tests/surface_parser.rs b/hydroflow/tests/surface_parser.rs
index 9c401525faa..ad7a137dfd5 100644
--- a/hydroflow/tests/surface_parser.rs
+++ b/hydroflow/tests/surface_parser.rs
@@ -243,8 +243,8 @@ pub fn test_flo_syntax() {
users = source_stream(0..);
messages = source_stream(0..);
loop {
- users -> [0]cp;
- messages -> [1]cp;
+ users -> batch() -> flatten() -> [0]cp;
+ messages -> batch() -> flatten() -> [1]cp;
cp = cross_join() -> for_each(|(user, message)| println!("notify {} of {}", user, message));
}
}
diff --git a/hydroflow_lang/src/graph/flat_graph_builder.rs b/hydroflow_lang/src/graph/flat_graph_builder.rs
index 0c29425ada7..8a157a29cd0 100644
--- a/hydroflow_lang/src/graph/flat_graph_builder.rs
+++ b/hydroflow_lang/src/graph/flat_graph_builder.rs
@@ -933,12 +933,10 @@ impl FlatGraphBuilder {
let Some(op_inst) = self.flat_graph.node_op_inst(node_id) else {
continue;
};
- let Some(_loop_id) = self.flat_graph.node_loop(node_id) else {
- continue;
- };
+ let loop_id = self.flat_graph.node_loop(node_id);
// Source operators must be at the top level.
- if Some(FloType::Source) == op_inst.op_constraints.flo_type {
+ if Some(FloType::Source) == op_inst.op_constraints.flo_type && loop_id.is_some() {
self.diagnostics.push(Diagnostic::spanned(
node.span(),
Level::Error,
@@ -950,6 +948,89 @@ impl FlatGraphBuilder {
}
}
+ // Check windowing and un-windowing operators, for loop inputs and outputs respectively.
+ for (_edge_id, (pred_id, node_id)) in self.flat_graph.edges() {
+ let Some(op_inst) = self.flat_graph.node_op_inst(node_id) else {
+ continue;
+ };
+ let flo_type = &op_inst.op_constraints.flo_type;
+
+ let pred_loop_id = self.flat_graph.node_loop(pred_id);
+ let loop_id = self.flat_graph.node_loop(node_id);
+
+ let span = self.flat_graph.node(node_id).span();
+
+ let (is_input, is_output) = {
+ let parent_pred_loop_id =
+ pred_loop_id.and_then(|lid| self.flat_graph.loop_parent(lid));
+ let parent_loop_id = loop_id.and_then(|lid| self.flat_graph.loop_parent(lid));
+ let is_same = pred_loop_id == loop_id;
+ let is_input = !is_same && parent_loop_id == pred_loop_id;
+ let is_output = !is_same && parent_pred_loop_id == loop_id;
+ if !(is_input || is_output || is_same) {
+ self.diagnostics.push(Diagnostic::spanned(
+ span,
+ Level::Error,
+ "Operator input edge may not cross multiple loop contexts.",
+ ));
+ continue;
+ }
+ (is_input, is_output)
+ };
+
+ match flo_type {
+ None => {
+ if is_input {
+ self.diagnostics.push(Diagnostic::spanned(
+ span,
+ Level::Error,
+ format!(
+ "Operator `{}(...)` entering a loop context must be a windowing operator, but is not.",
+ op_inst.op_constraints.name
+ )
+ ));
+ }
+ if is_output {
+ self.diagnostics.push(Diagnostic::spanned(
+ span,
+ Level::Error,
+ format!(
+ "Operator `{}(...)` exiting a loop context must be an un-windowing operator, but is not.",
+ op_inst.op_constraints.name
+ )
+ ));
+ }
+ }
+ Some(FloType::Windowing) => {
+ if !is_input {
+ self.diagnostics.push(Diagnostic::spanned(
+ span,
+ Level::Error,
+ format!(
+ "Windowing operator `{}(...)` must be the first input operator into a `loop {{ ... }} context.",
+ op_inst.op_constraints.name
+ )
+ ));
+ }
+ }
+ Some(FloType::Unwindowing) => {
+ if !is_output {
+ self.diagnostics.push(Diagnostic::spanned(
+ span,
+ Level::Error,
+ format!(
+ "Un-windowing operator `{}(...)` must be the first output operator after exiting a `loop {{ ... }} context.",
+ op_inst.op_constraints.name
+ )
+ ));
+ }
+ }
+ Some(FloType::Source) => {
+ // Handled above.
+ }
+ }
+ }
+
// Must be a DAG (excluding `next_tick()` operators).
// TODO(mingwei): Nested loop blocks should count as a single node.
for (loop_id, loop_nodes) in self.flat_graph.loops() {
diff --git a/hydroflow_lang/src/graph/ops/batch.rs b/hydroflow_lang/src/graph/ops/batch.rs
new file mode 100644
index 00000000000..f2fd91df23f
--- /dev/null
+++ b/hydroflow_lang/src/graph/ops/batch.rs
@@ -0,0 +1,82 @@
+use quote::quote_spanned;
+
+use super::{
+ FloType, OperatorCategory, OperatorConstraints, OperatorWriteOutput, WriteContextArgs, RANGE_0,
+ RANGE_1,
+};
+
+pub const BATCH: OperatorConstraints = OperatorConstraints {
+ name: "batch",
+ categories: &[OperatorCategory::Fold],
+ hard_range_inn: RANGE_1,
+ soft_range_inn: RANGE_1,
+ hard_range_out: &(0..=1),
+ soft_range_out: &(0..=1),
+ num_args: 0,
+ persistence_args: RANGE_0,
+ type_args: RANGE_0,
+ is_external_input: false,
+ has_singleton_output: true,
+ flo_type: Some(FloType::Windowing),
+ ports_inn: None,
+ ports_out: None,
+ input_delaytype_fn: |_| None,
+ write_fn: |wc @ &WriteContextArgs {
+ root,
+ context,
+ hydroflow,
+ op_span,
+ ident,
+ is_pull,
+ inputs,
+ outputs,
+ singleton_output_ident,
+ ..
+ },
+ _diagnostics| {
+ let write_prologue = quote_spanned! {op_span=>
+ #[allow(clippy::redundant_closure_call)]
+ let #singleton_output_ident = #hydroflow.add_state(
+ ::std::cell::RefCell::new(::std::vec::Vec::new())
+ );
+
+ // TODO(mingwei): Is this needed?
+ // Reset the value to the initializer fn if it is a new tick.
+ #hydroflow.set_state_tick_hook(#singleton_output_ident, move |rcell| { rcell.take(); });
+ };
+
+ let vec_ident = wc.make_ident("vec");
+
+ let write_iterator = if is_pull {
+ // Pull.
+ let input = &inputs[0];
+ quote_spanned! {op_span=>
+ let mut #vec_ident = #context.state_ref(#singleton_output_ident).borrow_mut();
+ *#vec_ident = #input.collect::<::std::vec::Vec<_>>();
+ let #ident = ::std::iter::once(::std::clone::Clone::clone(&*#vec_ident));
+ }
+ } else if let Some(output) = outputs.first() {
+ // Push with output.
+ quote_spanned! {op_span=>
+ let mut #vec_ident = #context.state_ref(#singleton_output_ident).borrow_mut();
+ let #ident = #root::pusherator::inspect::Inspect::new(|item| {
+ ::std::vec::Vec::push(#vec_ident, ::std::clone::Clone::clone(item));
+ }, #output);
+ }
+ } else {
+ // Push with no output.
+ quote_spanned! {op_span=>
+ let mut #vec_ident = #context.state_ref(#singleton_output_ident).borrow_mut();
+ let #ident = #root::pusherator::for_each::ForEach::new(|item| {
+ ::std::vec::Vec::push(#vec_ident, item);
+ });
+ }
+ };
+
+ Ok(OperatorWriteOutput {
+ write_prologue,
+ write_iterator,
+ ..Default::default()
+ })
+ },
+};
diff --git a/hydroflow_lang/src/graph/ops/mod.rs b/hydroflow_lang/src/graph/ops/mod.rs
index 660445526be..42e6b304e18 100644
--- a/hydroflow_lang/src/graph/ops/mod.rs
+++ b/hydroflow_lang/src/graph/ops/mod.rs
@@ -246,6 +246,7 @@ declare_ops![
anti_join_multiset::ANTI_JOIN_MULTISET,
assert::ASSERT,
assert_eq::ASSERT_EQ,
+ batch::BATCH,
chain::CHAIN,
cross_join::CROSS_JOIN,
cross_join_multiset::CROSS_JOIN_MULTISET,