Skip to content

Commit

Permalink
refactor(hydroflow_plus_deploy): dedup deploy runtime logic (#1442)
Browse files Browse the repository at this point in the history
  • Loading branch information
shadaj authored Sep 6, 2024
1 parent d567760 commit a1042bb
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 134 deletions.
33 changes: 26 additions & 7 deletions hydro_deploy/hydroflow_plus_deploy/src/deploy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use hydroflow_plus::deploy::{ClusterSpec, Deploy, Node, ProcessSpec};
use hydroflow_plus::lang::graph::HydroflowGraph;
use nameof::name_of;
use sha2::{Digest, Sha256};
use stageleft::Quoted;
use stageleft::{Quoted, RuntimeData};
use tokio::sync::RwLock;

use super::HydroflowPlusMeta;
Expand Down Expand Up @@ -50,7 +50,11 @@ impl<'a> Deploy<'a> for HydroDeploy {
) -> (syn::Expr, syn::Expr) {
let p1_port = p1_port.port.as_str();
let p2_port = p2_port.port.as_str();
deploy_o2o(p1_port, p2_port)
deploy_o2o(
RuntimeData::new("__hydroflow_plus_trybuild_cli"),
p1_port,
p2_port,
)
}

fn o2o_connect(
Expand Down Expand Up @@ -85,7 +89,11 @@ impl<'a> Deploy<'a> for HydroDeploy {
) -> (syn::Expr, syn::Expr) {
let p1_port = p1_port.port.as_str();
let c2_port = c2_port.port.as_str();
deploy_o2m(p1_port, c2_port)
deploy_o2m(
RuntimeData::new("__hydroflow_plus_trybuild_cli"),
p1_port,
c2_port,
)
}

fn o2m_connect(
Expand Down Expand Up @@ -130,7 +138,11 @@ impl<'a> Deploy<'a> for HydroDeploy {
) -> (syn::Expr, syn::Expr) {
let c1_port = c1_port.port.as_str();
let p2_port = p2_port.port.as_str();
deploy_m2o(c1_port, p2_port)
deploy_m2o(
RuntimeData::new("__hydroflow_plus_trybuild_cli"),
c1_port,
p2_port,
)
}

fn m2o_connect(
Expand Down Expand Up @@ -171,7 +183,11 @@ impl<'a> Deploy<'a> for HydroDeploy {
) -> (syn::Expr, syn::Expr) {
let c1_port = c1_port.port.as_str();
let c2_port = c2_port.port.as_str();
deploy_m2m(c1_port, c2_port)
deploy_m2m(
RuntimeData::new("__hydroflow_plus_trybuild_cli"),
c1_port,
c2_port,
)
}

fn m2m_connect(
Expand Down Expand Up @@ -216,11 +232,14 @@ impl<'a> Deploy<'a> for HydroDeploy {
_env: &Self::CompileEnv,
of_cluster: usize,
) -> impl Quoted<'a, &'a Vec<u32>> + Copy + 'a {
cluster_members(of_cluster)
cluster_members(
RuntimeData::new("__hydroflow_plus_trybuild_cli"),
of_cluster,
)
}

fn cluster_self_id(_env: &Self::CompileEnv) -> impl Quoted<'a, u32> + Copy + 'a {
cluster_self_id()
cluster_self_id(RuntimeData::new("__hydroflow_plus_trybuild_cli"))
}
}

Expand Down
45 changes: 27 additions & 18 deletions hydro_deploy/hydroflow_plus_deploy/src/deploy_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,27 @@ use stageleft::{q, Quoted, RuntimeData};

use crate::HydroflowPlusMeta;

pub fn cluster_members<'a>(of_cluster: usize) -> impl Quoted<'a, &'a Vec<u32>> + Copy + 'a {
let cli: RuntimeData<&DeployPorts<HydroflowPlusMeta>> =
RuntimeData::new("__hydroflow_plus_trybuild_cli");
pub fn cluster_members(
cli: RuntimeData<&DeployPorts<HydroflowPlusMeta>>,
of_cluster: usize,
) -> impl Quoted<&Vec<u32>> + Copy {
q!(cli.meta.clusters.get(&of_cluster).unwrap())
}

pub fn cluster_self_id<'a>() -> impl Quoted<'a, u32> + Copy + 'a {
let cli: RuntimeData<&DeployPorts<HydroflowPlusMeta>> =
RuntimeData::new("__hydroflow_plus_trybuild_cli");
pub fn cluster_self_id(
cli: RuntimeData<&DeployPorts<HydroflowPlusMeta>>,
) -> impl Quoted<u32> + Copy {
q!(cli
.meta
.cluster_id
.expect("Tried to read Cluster ID on a non-cluster node"))
}

pub fn deploy_o2o(p1_port: &str, p2_port: &str) -> (syn::Expr, syn::Expr) {
let env: RuntimeData<&DeployPorts<HydroflowPlusMeta>> =
RuntimeData::new("__hydroflow_plus_trybuild_cli");
pub fn deploy_o2o(
env: RuntimeData<&DeployPorts<HydroflowPlusMeta>>,
p1_port: &str,
p2_port: &str,
) -> (syn::Expr, syn::Expr) {
(
{
q!({
Expand All @@ -43,9 +46,11 @@ pub fn deploy_o2o(p1_port: &str, p2_port: &str) -> (syn::Expr, syn::Expr) {
)
}

pub fn deploy_o2m(p1_port: &str, c2_port: &str) -> (syn::Expr, syn::Expr) {
let env: RuntimeData<&DeployPorts<HydroflowPlusMeta>> =
RuntimeData::new("__hydroflow_plus_trybuild_cli");
pub fn deploy_o2m(
env: RuntimeData<&DeployPorts<HydroflowPlusMeta>>,
p1_port: &str,
c2_port: &str,
) -> (syn::Expr, syn::Expr) {
(
{
q!({
Expand All @@ -66,9 +71,11 @@ pub fn deploy_o2m(p1_port: &str, c2_port: &str) -> (syn::Expr, syn::Expr) {
)
}

pub fn deploy_m2o(c1_port: &str, p2_port: &str) -> (syn::Expr, syn::Expr) {
let env: RuntimeData<&DeployPorts<HydroflowPlusMeta>> =
RuntimeData::new("__hydroflow_plus_trybuild_cli");
pub fn deploy_m2o(
env: RuntimeData<&DeployPorts<HydroflowPlusMeta>>,
c1_port: &str,
p2_port: &str,
) -> (syn::Expr, syn::Expr) {
(
{
q!({
Expand All @@ -89,9 +96,11 @@ pub fn deploy_m2o(c1_port: &str, p2_port: &str) -> (syn::Expr, syn::Expr) {
)
}

pub fn deploy_m2m(c1_port: &str, c2_port: &str) -> (syn::Expr, syn::Expr) {
let env: RuntimeData<&DeployPorts<HydroflowPlusMeta>> =
RuntimeData::new("__hydroflow_plus_trybuild_cli");
pub fn deploy_m2m(
env: RuntimeData<&DeployPorts<HydroflowPlusMeta>>,
c1_port: &str,
c2_port: &str,
) -> (syn::Expr, syn::Expr) {
(
{
q!({
Expand Down
111 changes: 8 additions & 103 deletions hydro_deploy/hydroflow_plus_deploy/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@ use std::rc::Rc;

use hydroflow_plus::deploy::{ClusterSpec, Deploy, Node, ProcessSpec};
use hydroflow_plus::lang::graph::HydroflowGraph;
use hydroflow_plus::util::deploy::{
ConnectedDemux, ConnectedDirect, ConnectedSink, ConnectedSource, ConnectedTagged, DeployPorts,
};
use stageleft::{q, Quoted, RuntimeData};
use hydroflow_plus::util::deploy::DeployPorts;
use stageleft::{Quoted, RuntimeData};

use super::HydroflowPlusMeta;

Expand Down Expand Up @@ -53,29 +51,7 @@ impl<'a> Deploy<'a> for DeployRuntime {
_p2: &Self::Process,
p2_port: &Self::ProcessPort,
) -> (syn::Expr, syn::Expr) {
let env = *env;
(
{
let port = p1_port.as_str();

q!({
env.port(port)
.connect_local_blocking::<ConnectedDirect>()
.into_sink()
})
.splice_untyped()
},
{
let port = p2_port.as_str();

q!({
env.port(port)
.connect_local_blocking::<ConnectedDirect>()
.into_source()
})
.splice_untyped()
},
)
crate::deploy_runtime::deploy_o2o(*env, p1_port.as_str(), p2_port.as_str())
}

fn o2o_connect(
Expand All @@ -94,29 +70,7 @@ impl<'a> Deploy<'a> for DeployRuntime {
_c2: &Self::Cluster,
c2_port: &Self::ClusterPort,
) -> (syn::Expr, syn::Expr) {
let env = *env;
(
{
let port = p1_port.as_str();

q!({
env.port(port)
.connect_local_blocking::<ConnectedDemux<ConnectedDirect>>()
.into_sink()
})
.splice_untyped()
},
{
let port = c2_port.as_str();

q!({
env.port(port)
.connect_local_blocking::<ConnectedDirect>()
.into_source()
})
.splice_untyped()
},
)
crate::deploy_runtime::deploy_o2m(*env, p1_port.as_str(), c2_port.as_str())
}

fn o2m_connect(
Expand All @@ -135,29 +89,7 @@ impl<'a> Deploy<'a> for DeployRuntime {
_p2: &Self::Process,
p2_port: &Self::ProcessPort,
) -> (syn::Expr, syn::Expr) {
let env = *env;
(
{
let port = c1_port.as_str();

q!({
env.port(port)
.connect_local_blocking::<ConnectedDirect>()
.into_sink()
})
.splice_untyped()
},
{
let port = p2_port.as_str();

q!({
env.port(port)
.connect_local_blocking::<ConnectedTagged<ConnectedDirect>>()
.into_source()
})
.splice_untyped()
},
)
crate::deploy_runtime::deploy_m2o(*env, c1_port.as_str(), p2_port.as_str())
}

fn m2o_connect(
Expand All @@ -176,29 +108,7 @@ impl<'a> Deploy<'a> for DeployRuntime {
_c2: &Self::Cluster,
c2_port: &Self::ClusterPort,
) -> (syn::Expr, syn::Expr) {
let env = *env;
(
{
let port = c1_port.as_str();

q!({
env.port(port)
.connect_local_blocking::<ConnectedDemux<ConnectedDirect>>()
.into_sink()
})
.splice_untyped()
},
{
let port = c2_port.as_str();

q!({
env.port(port)
.connect_local_blocking::<ConnectedTagged<ConnectedDirect>>()
.into_source()
})
.splice_untyped()
},
)
crate::deploy_runtime::deploy_m2m(*env, c1_port.as_str(), c2_port.as_str())
}

fn m2m_connect(
Expand All @@ -214,16 +124,11 @@ impl<'a> Deploy<'a> for DeployRuntime {
env: &Self::CompileEnv,
of_cluster: usize,
) -> impl Quoted<'a, &'a Vec<u32>> + Copy + 'a {
let cli = *env;
q!(cli.meta.clusters.get(&of_cluster).unwrap())
crate::deploy_runtime::cluster_members(*env, of_cluster)
}

fn cluster_self_id(env: &Self::CompileEnv) -> impl Quoted<'a, u32> + Copy + 'a {
let cli = *env;
q!(cli
.meta
.cluster_id
.expect("Tried to read Cluster ID on a non-cluster node"))
crate::deploy_runtime::cluster_self_id(*env)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ expression: ir.surface_syntax_string()
5v1 = map (stageleft :: runtime_support :: fn1_type_hint :: < (f64 , f64) , bool > ({ use crate :: __staged :: cluster :: compute_pi :: * ; | (x , y) | x * x + y * y < 1.0 }));
6v1 = fold :: < 'tick > (stageleft :: runtime_support :: fn0_type_hint :: < (u64 , u64) > ({ use crate :: __staged :: cluster :: compute_pi :: * ; | | (0u64 , 0u64) }) , stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (u64 , u64) , bool , () > ({ use crate :: __staged :: cluster :: compute_pi :: * ; | (inside , total) , sample_inside | { if sample_inside { * inside += 1 ; } * total += 1 ; } }));
7v1 = map (| data | { hydroflow_plus :: runtime_support :: bincode :: serialize :: < (u64 , u64) > (& data) . unwrap () . into () });
8v1 = dest_sink ({ use hydroflow_plus_deploy :: __staged :: runtime :: * ; let env = FAKE ; let port = "port_0" ; { env . port (port) . connect_local_blocking :: < ConnectedDirect > () . into_sink () } });
8v1 = dest_sink ({ use hydroflow_plus_deploy :: __staged :: deploy_runtime :: * ; let c1_port = "port_0" ; let env = FAKE ; { env . port (c1_port) . connect_local_blocking :: < ConnectedDirect > () . into_sink () } });

1v1 -> 2v1;
2v1 -> 3v1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
source: hydroflow_plus_test/src/cluster/compute_pi.rs
expression: ir.surface_syntax_string()
---
1v1 = source_stream ({ use hydroflow_plus_deploy :: __staged :: runtime :: * ; let env = FAKE ; let port = "port_0" ; { env . port (port) . connect_local_blocking :: < ConnectedTagged < ConnectedDirect > > () . into_source () } });
1v1 = source_stream ({ use hydroflow_plus_deploy :: __staged :: deploy_runtime :: * ; let env = FAKE ; let p2_port = "port_0" ; { env . port (p2_port) . connect_local_blocking :: < ConnectedTagged < ConnectedDirect > > () . into_source () } });
2v1 = map (| res | { let (id , b) = res . unwrap () ; (id , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < (u64 , u64) > (& b) . unwrap ()) });
3v1 = map (stageleft :: runtime_support :: fn1_type_hint :: < (u32 , (u64 , u64)) , (u64 , u64) > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }));
4v1 = reduce :: < 'static > (stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (u64 , u64) , (u64 , u64) , () > ({ use crate :: __staged :: cluster :: compute_pi :: * ; | (inside , total) , (inside_batch , total_batch) | { * inside += inside_batch ; * total += total_batch ; } }));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ expression: ir.surface_syntax_string()
3v1 = enumerate ();
4v1 = map (stageleft :: runtime_support :: fn1_type_hint :: < (usize , std :: string :: String) , (u32 , std :: string :: String) > ({ use crate :: __staged :: cluster :: map_reduce :: * ; let all_ids_vec = __hydroflow_plus_cluster_ids_1 ; | (i , w) | ((i % all_ids_vec . len ()) as u32 , w) }));
5v1 = map (| (id , data) | { (id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < std :: string :: String > (& data) . unwrap () . into ()) });
6v1 = dest_sink ({ use hydroflow_plus_deploy :: __staged :: runtime :: * ; let env = FAKE ; let port = "port_0" ; { env . port (port) . connect_local_blocking :: < ConnectedDemux < ConnectedDirect > > () . into_sink () } });
7v1 = source_stream ({ use hydroflow_plus_deploy :: __staged :: runtime :: * ; let env = FAKE ; let port = "port_1" ; { env . port (port) . connect_local_blocking :: < ConnectedTagged < ConnectedDirect > > () . into_source () } });
6v1 = dest_sink ({ use hydroflow_plus_deploy :: __staged :: deploy_runtime :: * ; let env = FAKE ; let p1_port = "port_0" ; { env . port (p1_port) . connect_local_blocking :: < ConnectedDemux < ConnectedDirect > > () . into_sink () } });
7v1 = source_stream ({ use hydroflow_plus_deploy :: __staged :: deploy_runtime :: * ; let env = FAKE ; let p2_port = "port_1" ; { env . port (p2_port) . connect_local_blocking :: < ConnectedTagged < ConnectedDirect > > () . into_source () } });
8v1 = map (| res | { let (id , b) = res . unwrap () ; (id , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < (std :: string :: String , i32) > (& b) . unwrap ()) });
9v1 = map (stageleft :: runtime_support :: fn1_type_hint :: < (u32 , (std :: string :: String , i32)) , (std :: string :: String , i32) > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }));
10v1 = reduce_keyed :: < 'static > (stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , i32 , () > ({ use crate :: __staged :: cluster :: map_reduce :: * ; | total , count | * total += count }));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
source: hydroflow_plus_test/src/cluster/map_reduce.rs
expression: ir.surface_syntax_string()
---
1v1 = source_stream ({ use hydroflow_plus_deploy :: __staged :: runtime :: * ; let env = FAKE ; let port = "port_0" ; { env . port (port) . connect_local_blocking :: < ConnectedDirect > () . into_source () } });
1v1 = source_stream ({ use hydroflow_plus_deploy :: __staged :: deploy_runtime :: * ; let c2_port = "port_0" ; let env = FAKE ; { env . port (c2_port) . connect_local_blocking :: < ConnectedDirect > () . into_source () } });
2v1 = map (| res | { hydroflow_plus :: runtime_support :: bincode :: deserialize :: < std :: string :: String > (& res . unwrap ()) . unwrap () });
3v1 = map (stageleft :: runtime_support :: fn1_type_hint :: < std :: string :: String , (std :: string :: String , ()) > ({ use crate :: __staged :: cluster :: map_reduce :: * ; | string | (string , ()) }));
4v1 = fold_keyed :: < 'tick > (stageleft :: runtime_support :: fn0_type_hint :: < i32 > ({ use crate :: __staged :: cluster :: map_reduce :: * ; | | 0 }) , stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , () , () > ({ use crate :: __staged :: cluster :: map_reduce :: * ; | count , _ | * count += 1 }));
5v1 = inspect (stageleft :: runtime_support :: fn1_borrow_type_hint :: < (std :: string :: String , i32) , () > ({ use crate :: __staged :: cluster :: map_reduce :: * ; | (string , count) | println ! ("partition count: {} - {}" , string , count) }));
6v1 = map (| data | { hydroflow_plus :: runtime_support :: bincode :: serialize :: < (std :: string :: String , i32) > (& data) . unwrap () . into () });
7v1 = dest_sink ({ use hydroflow_plus_deploy :: __staged :: runtime :: * ; let env = FAKE ; let port = "port_1" ; { env . port (port) . connect_local_blocking :: < ConnectedDirect > () . into_sink () } });
7v1 = dest_sink ({ use hydroflow_plus_deploy :: __staged :: deploy_runtime :: * ; let c1_port = "port_1" ; let env = FAKE ; { env . port (c1_port) . connect_local_blocking :: < ConnectedDirect > () . into_sink () } });

1v1 -> 2v1;
2v1 -> 3v1;
Expand Down

0 comments on commit a1042bb

Please sign in to comment.