From a1042bbcdb878af76a3510767ca42260d34ff3b7 Mon Sep 17 00:00:00 2001 From: Shadaj Laddad Date: Fri, 6 Sep 2024 15:57:08 -0700 Subject: [PATCH] refactor(hydroflow_plus_deploy): dedup deploy runtime logic (#1442) --- .../hydroflow_plus_deploy/src/deploy.rs | 33 ++++-- .../src/deploy_runtime.rs | 45 ++++--- .../hydroflow_plus_deploy/src/runtime.rs | 111 ++---------------- ..._tests__compute_pi_ir@surface_graph_0.snap | 2 +- ..._tests__compute_pi_ir@surface_graph_1.snap | 2 +- ..._tests__map_reduce_ir@surface_graph_0.snap | 4 +- ..._tests__map_reduce_ir@surface_graph_1.snap | 4 +- 7 files changed, 67 insertions(+), 134 deletions(-) diff --git a/hydro_deploy/hydroflow_plus_deploy/src/deploy.rs b/hydro_deploy/hydroflow_plus_deploy/src/deploy.rs index c9b5d029140..8ed33fcf2f9 100644 --- a/hydro_deploy/hydroflow_plus_deploy/src/deploy.rs +++ b/hydro_deploy/hydroflow_plus_deploy/src/deploy.rs @@ -14,7 +14,7 @@ use hydroflow_plus::deploy::{ClusterSpec, Deploy, Node, ProcessSpec}; use hydroflow_plus::lang::graph::HydroflowGraph; use nameof::name_of; use sha2::{Digest, Sha256}; -use stageleft::Quoted; +use stageleft::{Quoted, RuntimeData}; use tokio::sync::RwLock; use super::HydroflowPlusMeta; @@ -50,7 +50,11 @@ impl<'a> Deploy<'a> for HydroDeploy { ) -> (syn::Expr, syn::Expr) { let p1_port = p1_port.port.as_str(); let p2_port = p2_port.port.as_str(); - deploy_o2o(p1_port, p2_port) + deploy_o2o( + RuntimeData::new("__hydroflow_plus_trybuild_cli"), + p1_port, + p2_port, + ) } fn o2o_connect( @@ -85,7 +89,11 @@ impl<'a> Deploy<'a> for HydroDeploy { ) -> (syn::Expr, syn::Expr) { let p1_port = p1_port.port.as_str(); let c2_port = c2_port.port.as_str(); - deploy_o2m(p1_port, c2_port) + deploy_o2m( + RuntimeData::new("__hydroflow_plus_trybuild_cli"), + p1_port, + c2_port, + ) } fn o2m_connect( @@ -130,7 +138,11 @@ impl<'a> Deploy<'a> for HydroDeploy { ) -> (syn::Expr, syn::Expr) { let c1_port = c1_port.port.as_str(); let p2_port = p2_port.port.as_str(); - deploy_m2o(c1_port, p2_port) + deploy_m2o( + RuntimeData::new("__hydroflow_plus_trybuild_cli"), + c1_port, + p2_port, + ) } fn m2o_connect( @@ -171,7 +183,11 @@ impl<'a> Deploy<'a> for HydroDeploy { ) -> (syn::Expr, syn::Expr) { let c1_port = c1_port.port.as_str(); let c2_port = c2_port.port.as_str(); - deploy_m2m(c1_port, c2_port) + deploy_m2m( + RuntimeData::new("__hydroflow_plus_trybuild_cli"), + c1_port, + c2_port, + ) } fn m2m_connect( @@ -216,11 +232,14 @@ impl<'a> Deploy<'a> for HydroDeploy { _env: &Self::CompileEnv, of_cluster: usize, ) -> impl Quoted<'a, &'a Vec> + Copy + 'a { - cluster_members(of_cluster) + cluster_members( + RuntimeData::new("__hydroflow_plus_trybuild_cli"), + of_cluster, + ) } fn cluster_self_id(_env: &Self::CompileEnv) -> impl Quoted<'a, u32> + Copy + 'a { - cluster_self_id() + cluster_self_id(RuntimeData::new("__hydroflow_plus_trybuild_cli")) } } diff --git a/hydro_deploy/hydroflow_plus_deploy/src/deploy_runtime.rs b/hydro_deploy/hydroflow_plus_deploy/src/deploy_runtime.rs index 3bb71912ec3..df9149667cd 100644 --- a/hydro_deploy/hydroflow_plus_deploy/src/deploy_runtime.rs +++ b/hydro_deploy/hydroflow_plus_deploy/src/deploy_runtime.rs @@ -5,24 +5,27 @@ use stageleft::{q, Quoted, RuntimeData}; use crate::HydroflowPlusMeta; -pub fn cluster_members<'a>(of_cluster: usize) -> impl Quoted<'a, &'a Vec> + Copy + 'a { - let cli: RuntimeData<&DeployPorts> = - RuntimeData::new("__hydroflow_plus_trybuild_cli"); +pub fn cluster_members( + cli: RuntimeData<&DeployPorts>, + of_cluster: usize, +) -> impl Quoted<&Vec> + Copy { q!(cli.meta.clusters.get(&of_cluster).unwrap()) } -pub fn cluster_self_id<'a>() -> impl Quoted<'a, u32> + Copy + 'a { - let cli: RuntimeData<&DeployPorts> = - RuntimeData::new("__hydroflow_plus_trybuild_cli"); +pub fn cluster_self_id( + cli: RuntimeData<&DeployPorts>, +) -> impl Quoted + Copy { q!(cli .meta .cluster_id .expect("Tried to read Cluster ID on a non-cluster node")) } -pub fn deploy_o2o(p1_port: &str, p2_port: &str) -> (syn::Expr, syn::Expr) { - let env: RuntimeData<&DeployPorts> = - RuntimeData::new("__hydroflow_plus_trybuild_cli"); +pub fn deploy_o2o( + env: RuntimeData<&DeployPorts>, + p1_port: &str, + p2_port: &str, +) -> (syn::Expr, syn::Expr) { ( { q!({ @@ -43,9 +46,11 @@ pub fn deploy_o2o(p1_port: &str, p2_port: &str) -> (syn::Expr, syn::Expr) { ) } -pub fn deploy_o2m(p1_port: &str, c2_port: &str) -> (syn::Expr, syn::Expr) { - let env: RuntimeData<&DeployPorts> = - RuntimeData::new("__hydroflow_plus_trybuild_cli"); +pub fn deploy_o2m( + env: RuntimeData<&DeployPorts>, + p1_port: &str, + c2_port: &str, +) -> (syn::Expr, syn::Expr) { ( { q!({ @@ -66,9 +71,11 @@ pub fn deploy_o2m(p1_port: &str, c2_port: &str) -> (syn::Expr, syn::Expr) { ) } -pub fn deploy_m2o(c1_port: &str, p2_port: &str) -> (syn::Expr, syn::Expr) { - let env: RuntimeData<&DeployPorts> = - RuntimeData::new("__hydroflow_plus_trybuild_cli"); +pub fn deploy_m2o( + env: RuntimeData<&DeployPorts>, + c1_port: &str, + p2_port: &str, +) -> (syn::Expr, syn::Expr) { ( { q!({ @@ -89,9 +96,11 @@ pub fn deploy_m2o(c1_port: &str, p2_port: &str) -> (syn::Expr, syn::Expr) { ) } -pub fn deploy_m2m(c1_port: &str, c2_port: &str) -> (syn::Expr, syn::Expr) { - let env: RuntimeData<&DeployPorts> = - RuntimeData::new("__hydroflow_plus_trybuild_cli"); +pub fn deploy_m2m( + env: RuntimeData<&DeployPorts>, + c1_port: &str, + c2_port: &str, +) -> (syn::Expr, syn::Expr) { ( { q!({ diff --git a/hydro_deploy/hydroflow_plus_deploy/src/runtime.rs b/hydro_deploy/hydroflow_plus_deploy/src/runtime.rs index 23fd2ac91e9..b139660ea3d 100644 --- a/hydro_deploy/hydroflow_plus_deploy/src/runtime.rs +++ b/hydro_deploy/hydroflow_plus_deploy/src/runtime.rs @@ -3,10 +3,8 @@ use std::rc::Rc; use hydroflow_plus::deploy::{ClusterSpec, Deploy, Node, ProcessSpec}; use hydroflow_plus::lang::graph::HydroflowGraph; -use hydroflow_plus::util::deploy::{ - ConnectedDemux, ConnectedDirect, ConnectedSink, ConnectedSource, ConnectedTagged, DeployPorts, -}; -use stageleft::{q, Quoted, RuntimeData}; +use hydroflow_plus::util::deploy::DeployPorts; +use stageleft::{Quoted, RuntimeData}; use super::HydroflowPlusMeta; @@ -53,29 +51,7 @@ impl<'a> Deploy<'a> for DeployRuntime { _p2: &Self::Process, p2_port: &Self::ProcessPort, ) -> (syn::Expr, syn::Expr) { - let env = *env; - ( - { - let port = p1_port.as_str(); - - q!({ - env.port(port) - .connect_local_blocking::() - .into_sink() - }) - .splice_untyped() - }, - { - let port = p2_port.as_str(); - - q!({ - env.port(port) - .connect_local_blocking::() - .into_source() - }) - .splice_untyped() - }, - ) + crate::deploy_runtime::deploy_o2o(*env, p1_port.as_str(), p2_port.as_str()) } fn o2o_connect( @@ -94,29 +70,7 @@ impl<'a> Deploy<'a> for DeployRuntime { _c2: &Self::Cluster, c2_port: &Self::ClusterPort, ) -> (syn::Expr, syn::Expr) { - let env = *env; - ( - { - let port = p1_port.as_str(); - - q!({ - env.port(port) - .connect_local_blocking::>() - .into_sink() - }) - .splice_untyped() - }, - { - let port = c2_port.as_str(); - - q!({ - env.port(port) - .connect_local_blocking::() - .into_source() - }) - .splice_untyped() - }, - ) + crate::deploy_runtime::deploy_o2m(*env, p1_port.as_str(), c2_port.as_str()) } fn o2m_connect( @@ -135,29 +89,7 @@ impl<'a> Deploy<'a> for DeployRuntime { _p2: &Self::Process, p2_port: &Self::ProcessPort, ) -> (syn::Expr, syn::Expr) { - let env = *env; - ( - { - let port = c1_port.as_str(); - - q!({ - env.port(port) - .connect_local_blocking::() - .into_sink() - }) - .splice_untyped() - }, - { - let port = p2_port.as_str(); - - q!({ - env.port(port) - .connect_local_blocking::>() - .into_source() - }) - .splice_untyped() - }, - ) + crate::deploy_runtime::deploy_m2o(*env, c1_port.as_str(), p2_port.as_str()) } fn m2o_connect( @@ -176,29 +108,7 @@ impl<'a> Deploy<'a> for DeployRuntime { _c2: &Self::Cluster, c2_port: &Self::ClusterPort, ) -> (syn::Expr, syn::Expr) { - let env = *env; - ( - { - let port = c1_port.as_str(); - - q!({ - env.port(port) - .connect_local_blocking::>() - .into_sink() - }) - .splice_untyped() - }, - { - let port = c2_port.as_str(); - - q!({ - env.port(port) - .connect_local_blocking::>() - .into_source() - }) - .splice_untyped() - }, - ) + crate::deploy_runtime::deploy_m2m(*env, c1_port.as_str(), c2_port.as_str()) } fn m2m_connect( @@ -214,16 +124,11 @@ impl<'a> Deploy<'a> for DeployRuntime { env: &Self::CompileEnv, of_cluster: usize, ) -> impl Quoted<'a, &'a Vec> + Copy + 'a { - let cli = *env; - q!(cli.meta.clusters.get(&of_cluster).unwrap()) + crate::deploy_runtime::cluster_members(*env, of_cluster) } fn cluster_self_id(env: &Self::CompileEnv) -> impl Quoted<'a, u32> + Copy + 'a { - let cli = *env; - q!(cli - .meta - .cluster_id - .expect("Tried to read Cluster ID on a non-cluster node")) + crate::deploy_runtime::cluster_self_id(*env) } } diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir@surface_graph_0.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir@surface_graph_0.snap index e8a0541c5c9..110524ff479 100644 --- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir@surface_graph_0.snap +++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir@surface_graph_0.snap @@ -9,7 +9,7 @@ expression: ir.surface_syntax_string() 5v1 = map (stageleft :: runtime_support :: fn1_type_hint :: < (f64 , f64) , bool > ({ use crate :: __staged :: cluster :: compute_pi :: * ; | (x , y) | x * x + y * y < 1.0 })); 6v1 = fold :: < 'tick > (stageleft :: runtime_support :: fn0_type_hint :: < (u64 , u64) > ({ use crate :: __staged :: cluster :: compute_pi :: * ; | | (0u64 , 0u64) }) , stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (u64 , u64) , bool , () > ({ use crate :: __staged :: cluster :: compute_pi :: * ; | (inside , total) , sample_inside | { if sample_inside { * inside += 1 ; } * total += 1 ; } })); 7v1 = map (| data | { hydroflow_plus :: runtime_support :: bincode :: serialize :: < (u64 , u64) > (& data) . unwrap () . into () }); -8v1 = dest_sink ({ use hydroflow_plus_deploy :: __staged :: runtime :: * ; let env = FAKE ; let port = "port_0" ; { env . port (port) . connect_local_blocking :: < ConnectedDirect > () . into_sink () } }); +8v1 = dest_sink ({ use hydroflow_plus_deploy :: __staged :: deploy_runtime :: * ; let c1_port = "port_0" ; let env = FAKE ; { env . port (c1_port) . connect_local_blocking :: < ConnectedDirect > () . into_sink () } }); 1v1 -> 2v1; 2v1 -> 3v1; diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir@surface_graph_1.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir@surface_graph_1.snap index 9ae75661a47..97e5b201372 100644 --- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir@surface_graph_1.snap +++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir@surface_graph_1.snap @@ -2,7 +2,7 @@ source: hydroflow_plus_test/src/cluster/compute_pi.rs expression: ir.surface_syntax_string() --- -1v1 = source_stream ({ use hydroflow_plus_deploy :: __staged :: runtime :: * ; let env = FAKE ; let port = "port_0" ; { env . port (port) . connect_local_blocking :: < ConnectedTagged < ConnectedDirect > > () . into_source () } }); +1v1 = source_stream ({ use hydroflow_plus_deploy :: __staged :: deploy_runtime :: * ; let env = FAKE ; let p2_port = "port_0" ; { env . port (p2_port) . connect_local_blocking :: < ConnectedTagged < ConnectedDirect > > () . into_source () } }); 2v1 = map (| res | { let (id , b) = res . unwrap () ; (id , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < (u64 , u64) > (& b) . unwrap ()) }); 3v1 = map (stageleft :: runtime_support :: fn1_type_hint :: < (u32 , (u64 , u64)) , (u64 , u64) > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b })); 4v1 = reduce :: < 'static > (stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (u64 , u64) , (u64 , u64) , () > ({ use crate :: __staged :: cluster :: compute_pi :: * ; | (inside , total) , (inside_batch , total_batch) | { * inside += inside_batch ; * total += total_batch ; } })); diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__map_reduce__tests__map_reduce_ir@surface_graph_0.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__map_reduce__tests__map_reduce_ir@surface_graph_0.snap index 3fdae49d605..6bbd7ffaaf9 100644 --- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__map_reduce__tests__map_reduce_ir@surface_graph_0.snap +++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__map_reduce__tests__map_reduce_ir@surface_graph_0.snap @@ -7,8 +7,8 @@ expression: ir.surface_syntax_string() 3v1 = enumerate (); 4v1 = map (stageleft :: runtime_support :: fn1_type_hint :: < (usize , std :: string :: String) , (u32 , std :: string :: String) > ({ use crate :: __staged :: cluster :: map_reduce :: * ; let all_ids_vec = __hydroflow_plus_cluster_ids_1 ; | (i , w) | ((i % all_ids_vec . len ()) as u32 , w) })); 5v1 = map (| (id , data) | { (id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < std :: string :: String > (& data) . unwrap () . into ()) }); -6v1 = dest_sink ({ use hydroflow_plus_deploy :: __staged :: runtime :: * ; let env = FAKE ; let port = "port_0" ; { env . port (port) . connect_local_blocking :: < ConnectedDemux < ConnectedDirect > > () . into_sink () } }); -7v1 = source_stream ({ use hydroflow_plus_deploy :: __staged :: runtime :: * ; let env = FAKE ; let port = "port_1" ; { env . port (port) . connect_local_blocking :: < ConnectedTagged < ConnectedDirect > > () . into_source () } }); +6v1 = dest_sink ({ use hydroflow_plus_deploy :: __staged :: deploy_runtime :: * ; let env = FAKE ; let p1_port = "port_0" ; { env . port (p1_port) . connect_local_blocking :: < ConnectedDemux < ConnectedDirect > > () . into_sink () } }); +7v1 = source_stream ({ use hydroflow_plus_deploy :: __staged :: deploy_runtime :: * ; let env = FAKE ; let p2_port = "port_1" ; { env . port (p2_port) . connect_local_blocking :: < ConnectedTagged < ConnectedDirect > > () . into_source () } }); 8v1 = map (| res | { let (id , b) = res . unwrap () ; (id , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < (std :: string :: String , i32) > (& b) . unwrap ()) }); 9v1 = map (stageleft :: runtime_support :: fn1_type_hint :: < (u32 , (std :: string :: String , i32)) , (std :: string :: String , i32) > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b })); 10v1 = reduce_keyed :: < 'static > (stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , i32 , () > ({ use crate :: __staged :: cluster :: map_reduce :: * ; | total , count | * total += count })); diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__map_reduce__tests__map_reduce_ir@surface_graph_1.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__map_reduce__tests__map_reduce_ir@surface_graph_1.snap index af35b63b0c0..2449f07876e 100644 --- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__map_reduce__tests__map_reduce_ir@surface_graph_1.snap +++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__map_reduce__tests__map_reduce_ir@surface_graph_1.snap @@ -2,13 +2,13 @@ source: hydroflow_plus_test/src/cluster/map_reduce.rs expression: ir.surface_syntax_string() --- -1v1 = source_stream ({ use hydroflow_plus_deploy :: __staged :: runtime :: * ; let env = FAKE ; let port = "port_0" ; { env . port (port) . connect_local_blocking :: < ConnectedDirect > () . into_source () } }); +1v1 = source_stream ({ use hydroflow_plus_deploy :: __staged :: deploy_runtime :: * ; let c2_port = "port_0" ; let env = FAKE ; { env . port (c2_port) . connect_local_blocking :: < ConnectedDirect > () . into_source () } }); 2v1 = map (| res | { hydroflow_plus :: runtime_support :: bincode :: deserialize :: < std :: string :: String > (& res . unwrap ()) . unwrap () }); 3v1 = map (stageleft :: runtime_support :: fn1_type_hint :: < std :: string :: String , (std :: string :: String , ()) > ({ use crate :: __staged :: cluster :: map_reduce :: * ; | string | (string , ()) })); 4v1 = fold_keyed :: < 'tick > (stageleft :: runtime_support :: fn0_type_hint :: < i32 > ({ use crate :: __staged :: cluster :: map_reduce :: * ; | | 0 }) , stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , () , () > ({ use crate :: __staged :: cluster :: map_reduce :: * ; | count , _ | * count += 1 })); 5v1 = inspect (stageleft :: runtime_support :: fn1_borrow_type_hint :: < (std :: string :: String , i32) , () > ({ use crate :: __staged :: cluster :: map_reduce :: * ; | (string , count) | println ! ("partition count: {} - {}" , string , count) })); 6v1 = map (| data | { hydroflow_plus :: runtime_support :: bincode :: serialize :: < (std :: string :: String , i32) > (& data) . unwrap () . into () }); -7v1 = dest_sink ({ use hydroflow_plus_deploy :: __staged :: runtime :: * ; let env = FAKE ; let port = "port_1" ; { env . port (port) . connect_local_blocking :: < ConnectedDirect > () . into_sink () } }); +7v1 = dest_sink ({ use hydroflow_plus_deploy :: __staged :: deploy_runtime :: * ; let c1_port = "port_1" ; let env = FAKE ; { env . port (c1_port) . connect_local_blocking :: < ConnectedDirect > () . into_sink () } }); 1v1 -> 2v1; 2v1 -> 3v1;