diff --git a/hydroflow_plus/src/ir.rs b/hydroflow_plus/src/ir.rs index 4006bc3b5df9..567bdc34ed57 100644 --- a/hydroflow_plus/src/ir.rs +++ b/hydroflow_plus/src/ir.rs @@ -311,7 +311,10 @@ pub enum HfPlusNode { }, DeferTick(Box), - Enumerate(Box), + Enumerate { + is_static: bool, + input: Box, + }, Inspect { f: DebugExpr, input: Box, @@ -500,7 +503,7 @@ impl<'a> HfPlusNode { HfPlusNode::DeferTick(input) => { transform(input.as_mut(), seen_tees); } - HfPlusNode::Enumerate(input) => { + HfPlusNode::Enumerate { input, .. } => { transform(input.as_mut(), seen_tees); } HfPlusNode::Inspect { input, .. } => { @@ -987,7 +990,7 @@ impl<'a> HfPlusNode { (defer_tick_ident, input_location_id) } - HfPlusNode::Enumerate(input) => { + HfPlusNode::Enumerate { is_static, input } => { let (input_ident, input_location_id) = input.emit(graph_builders, built_tees, next_stmt_id); @@ -998,9 +1001,16 @@ impl<'a> HfPlusNode { syn::Ident::new(&format!("stream_{}", enumerate_id), Span::call_site()); let builder = graph_builders.entry(input_location_id).or_default(); - builder.add_statement(parse_quote! { - #enumerate_ident = #input_ident -> enumerate(); - }); + + if *is_static { + builder.add_statement(parse_quote! { + #enumerate_ident = #input_ident -> enumerate::<'static>(); + }); + } else { + builder.add_statement(parse_quote! { + #enumerate_ident = #input_ident -> enumerate::<'tick>(); + }); + } (enumerate_ident, input_location_id) } diff --git a/hydroflow_plus/src/stream.rs b/hydroflow_plus/src/stream.rs index 6dcaf3eed927..550afa52a37b 100644 --- a/hydroflow_plus/src/stream.rs +++ b/hydroflow_plus/src/stream.rs @@ -261,10 +261,23 @@ impl<'a, T, L: Location<'a>, B> Stream { } pub fn enumerate(self) -> Stream<(usize, T), L, B> { - Stream::new( - self.location, - HfPlusNode::Enumerate(Box::new(self.ir_node.into_inner())), - ) + if L::is_top_level() { + Stream::new( + self.location, + HfPlusNode::Persist(Box::new(HfPlusNode::Enumerate { + is_static: true, + input: Box::new(HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner()))), + })), + ) + } else { + Stream::new( + self.location, + HfPlusNode::Enumerate { + is_static: false, + input: Box::new(self.ir_node.into_inner()), + }, + ) + } } pub fn unique(self) -> Stream @@ -789,6 +802,21 @@ impl<'a, T, L: Location<'a> + NoTick, B> Stream { .send_bincode(other) } + pub fn round_robin_bincode( + self, + other: &Cluster<'a, C2>, + ) -> Stream, Cluster<'a, C2>, Unbounded> + where + L: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T)>, + T: Clone + Serialize + DeserializeOwned, + { + let ids = other.members(); + + self.enumerate() + .map(q!(|(i, w)| (ids[i % ids.len()], w))) + .send_bincode(other) + } + pub fn broadcast_bincode_interleaved( self, other: &Cluster<'a, C2>, @@ -800,6 +828,17 @@ impl<'a, T, L: Location<'a> + NoTick, B> Stream { self.broadcast_bincode(other).map(q!(|(_, b)| b)) } + pub fn round_robin_bincode_interleaved( + self, + other: &Cluster<'a, C2>, + ) -> Stream, Unbounded> + where + L: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T), Out = (Tag, T)> + 'a, + T: Clone + Serialize + DeserializeOwned, + { + self.round_robin_bincode(other).map(q!(|(_, b)| b)) + } + pub fn broadcast_bytes( self, other: &Cluster<'a, C2>, @@ -817,6 +856,21 @@ impl<'a, T, L: Location<'a> + NoTick, B> Stream { .send_bytes(other) } + pub fn round_robin_bytes( + self, + other: &Cluster<'a, C2>, + ) -> Stream, Cluster<'a, C2>, Unbounded> + where + L: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T)> + 'a, + T: Clone, + { + let ids = other.members(); + + self.enumerate() + .map(q!(|(i, w)| (ids[i % ids.len()], w))) + .send_bytes(other) + } + pub fn broadcast_bytes_interleaved( self, other: &Cluster<'a, C2>, @@ -828,6 +882,18 @@ impl<'a, T, L: Location<'a> + NoTick, B> Stream { { self.broadcast_bytes(other).map(q!(|(_, b)| b)) } + + pub fn round_robin_bytes_interleaved( + self, + other: &Cluster<'a, C2>, + ) -> Stream, Unbounded> + where + L: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T), Out = (Tag, Bytes)> + + 'a, + T: Clone, + { + self.round_robin_bytes(other).map(q!(|(_, b)| b)) + } } #[cfg(test)] diff --git a/hydroflow_plus_test/src/cluster/map_reduce.rs b/hydroflow_plus_test/src/cluster/map_reduce.rs index 27ebdc7eb598..97173bf45578 100644 --- a/hydroflow_plus_test/src/cluster/map_reduce.rs +++ b/hydroflow_plus_test/src/cluster/map_reduce.rs @@ -11,18 +11,8 @@ pub fn map_reduce<'a>(flow: &FlowBuilder<'a>) -> (Process<'a, Leader>, Cluster<' .source_iter(q!(vec!["abc", "abc", "xyz", "abc"])) .map(q!(|s| s.to_string())); - let all_ids_vec = cluster.members(); - let words_partitioned = words - .tick_batch(&process.tick()) - .enumerate() - .map(q!(|(i, w)| ( - ClusterId::from_raw((i % all_ids_vec.len()) as u32), - w - ))) - .all_ticks(); - - words_partitioned - .send_bincode(&cluster) + words + .round_robin_bincode(&cluster) .map(q!(|string| (string, ()))) .tick_batch(&cluster.tick()) .fold_keyed(q!(|| 0), q!(|count, _| *count += 1)) diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__map_reduce__tests__map_reduce_ir.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__map_reduce__tests__map_reduce_ir.snap index 5daa117e821c..86e08bc03193 100644 --- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__map_reduce__tests__map_reduce_ir.snap +++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__map_reduce__tests__map_reduce_ir.snap @@ -78,9 +78,10 @@ expression: built.ir() ), ), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , std :: string :: String) , (hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: map_reduce :: Worker > , std :: string :: String) > ({ use crate :: __staged :: cluster :: map_reduce :: * ; let all_ids_vec = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: map_reduce :: Worker > > > (__hydroflow_plus_cluster_ids_1) } ; | (i , w) | (ClusterId :: from_raw ((i % all_ids_vec . len ()) as u32) , w) }), - input: Enumerate( - Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , std :: string :: String) , (hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: map_reduce :: Worker > , std :: string :: String) > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: map_reduce :: Worker > > > (__hydroflow_plus_cluster_ids_1) } ; | (i , w) | (ids [i % ids . len ()] , w) }), + input: Enumerate { + is_static: true, + input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < & str , std :: string :: String > ({ use crate :: __staged :: cluster :: map_reduce :: * ; | s | s . to_string () }), input: Source { source: Iter( @@ -91,7 +92,7 @@ expression: built.ir() ), }, }, - ), + }, }, }, }, diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__map_reduce__tests__map_reduce_ir@surface_graph_0.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__map_reduce__tests__map_reduce_ir@surface_graph_0.snap index a45a39badd06..329c9c167fe9 100644 --- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__map_reduce__tests__map_reduce_ir@surface_graph_0.snap +++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__map_reduce__tests__map_reduce_ir@surface_graph_0.snap @@ -4,8 +4,8 @@ expression: ir.surface_syntax_string() --- 1v1 = source_iter ({ use crate :: __staged :: cluster :: map_reduce :: * ; vec ! ["abc" , "abc" , "xyz" , "abc"] }); 2v1 = map (stageleft :: runtime_support :: fn1_type_hint :: < & str , std :: string :: String > ({ use crate :: __staged :: cluster :: map_reduce :: * ; | s | s . to_string () })); -3v1 = enumerate (); -4v1 = map (stageleft :: runtime_support :: fn1_type_hint :: < (usize , std :: string :: String) , (hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: map_reduce :: Worker > , std :: string :: String) > ({ use crate :: __staged :: cluster :: map_reduce :: * ; let all_ids_vec = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: map_reduce :: Worker > > > (__hydroflow_plus_cluster_ids_1) } ; | (i , w) | (ClusterId :: from_raw ((i % all_ids_vec . len ()) as u32) , w) })); +3v1 = enumerate :: < 'static > (); +4v1 = map (stageleft :: runtime_support :: fn1_type_hint :: < (usize , std :: string :: String) , (hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: map_reduce :: Worker > , std :: string :: String) > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: map_reduce :: Worker > > > (__hydroflow_plus_cluster_ids_1) } ; | (i , w) | (ids [i % ids . len ()] , w) })); 5v1 = map (| (id , data) : (hydroflow_plus :: ClusterId < _ > , std :: string :: String) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < std :: string :: String > (& data) . unwrap () . into ()) }); 6v1 = dest_sink ({ use hydroflow_plus :: __staged :: deploy :: deploy_runtime :: * ; let env = FAKE ; let p1_port = "port_0" ; { env . port (p1_port) . connect_local_blocking :: < ConnectedDemux < ConnectedDirect > > () . into_sink () } }); 7v1 = source_stream ({ use hydroflow_plus :: __staged :: deploy :: deploy_runtime :: * ; let env = FAKE ; let p2_port = "port_1" ; { env . port (p2_port) . connect_local_blocking :: < ConnectedTagged < ConnectedDirect > > () . into_source () } }); diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap index 4bd55606df93..5a9d311e2063 100644 --- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap +++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap @@ -558,8 +558,9 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < (((usize , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > >) , usize) , u32) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; move | (((index , payload) , next_slot) , ballot_num) | P2a { ballot : Ballot { num : ballot_num , proposer_id : p_id } , slot : next_slot + index , value : Some (payload) } }), input: CrossSingleton( CrossSingleton( - Enumerate( - Map { + Enumerate { + is_static: false, + input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > >) , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), input: Network { from_location: Cluster( @@ -601,7 +602,7 @@ expression: built.ir() }, }, }, - ), + }, Tee { inner: , },