Skip to content

Commit

Permalink
feat(hydroflow_plus): use an IR before lowering to Hydroflow (#1070)
Browse files Browse the repository at this point in the history
Makes it possible to write custom optimization passes.
  • Loading branch information
shadaj authored Feb 28, 2024
1 parent c8d6985 commit eb34ccd
Show file tree
Hide file tree
Showing 57 changed files with 2,316 additions and 1,592 deletions.
57 changes: 33 additions & 24 deletions hydro_deploy/hydroflow_plus_cli_integration/src/deploy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use hydro_deploy::hydroflow_crate::ports::{
};
use hydro_deploy::hydroflow_crate::HydroflowCrateService;
use hydro_deploy::{Deployment, Host};
use hydroflow_plus::builder::Builders;
use hydroflow_plus::ir::HfPlusLeaf;
use hydroflow_plus::location::{
Cluster, ClusterSpec, Deploy, HfSendManyToMany, HfSendManyToOne, HfSendOneToMany,
HfSendOneToOne, Location, ProcessSpec,
Expand Down Expand Up @@ -70,6 +70,7 @@ pub trait DeployCrateWrapper {
pub struct DeployNode<'a> {
id: usize,
builder: &'a FlowBuilder<'a, HydroDeploy>,
cycle_counter: Rc<RefCell<usize>>,
next_port: Rc<RefCell<usize>>,
underlying: Arc<RwLock<HydroflowCrateService>>,
}
Expand Down Expand Up @@ -118,8 +119,12 @@ impl<'a> Location<'a> for DeployNode<'a> {
self.id
}

fn flow_builder(&self) -> (&'a RefCell<usize>, &'a Builders) {
self.builder.builder_components()
fn ir_leaves(&self) -> &'a RefCell<Vec<HfPlusLeaf>> {
self.builder.ir_leaves()
}

fn cycle_counter(&self) -> &RefCell<usize> {
self.cycle_counter.as_ref()
}

fn next_port(&self) -> DeployPort<Self> {
Expand Down Expand Up @@ -156,6 +161,7 @@ impl DeployCrateWrapper for DeployClusterNode {
pub struct DeployCluster<'a> {
id: usize,
builder: &'a FlowBuilder<'a, HydroDeploy>,
cycle_counter: Rc<RefCell<usize>>,
next_port: Rc<RefCell<usize>>,
pub members: Vec<DeployClusterNode>,
}
Expand All @@ -168,8 +174,12 @@ impl<'a> Location<'a> for DeployCluster<'a> {
self.id
}

fn flow_builder(&self) -> (&'a RefCell<usize>, &'a Builders) {
self.builder.builder_components()
fn ir_leaves(&self) -> &'a RefCell<Vec<HfPlusLeaf>> {
self.builder.ir_leaves()
}

fn cycle_counter(&self) -> &RefCell<usize> {
self.cycle_counter.as_ref()
}

fn next_port(&self) -> DeployPort<Self> {
Expand Down Expand Up @@ -223,15 +233,12 @@ impl<'a> HfSendOneToOne<'a, DeployNode<'a>> for DeployNode<'a> {
source_port.send_to(&mut recipient_port);
}

fn gen_sink_statement(&self, _port: &Self::Port) -> hydroflow_plus::lang::parse::Pipeline {
parse_quote!(null())
fn gen_sink_statement(&self, _port: &Self::Port) -> syn::Expr {
parse_quote!(null)
}

fn gen_source_statement(
_other: &DeployNode<'a>,
_port: &Self::Port,
) -> hydroflow_plus::lang::parse::Pipeline {
parse_quote!(null())
fn gen_source_statement(_other: &DeployNode<'a>, _port: &Self::Port) -> syn::Expr {
parse_quote!(null)
}
}

Expand Down Expand Up @@ -264,15 +271,15 @@ impl<'a> HfSendManyToOne<'a, DeployNode<'a>> for DeployCluster<'a> {
}
}

fn gen_sink_statement(&self, _port: &Self::Port) -> hydroflow_plus::lang::parse::Pipeline {
parse_quote!(null())
fn gen_sink_statement(&self, _port: &Self::Port) -> syn::Expr {
parse_quote!(null)
}

fn gen_source_statement(
_other: &DeployNode<'a>,
_port: &DeployPort<DeployNode<'a>>,
) -> hydroflow_plus::lang::parse::Pipeline {
parse_quote!(null())
) -> syn::Expr {
parse_quote!(null)
}
}

Expand Down Expand Up @@ -309,15 +316,15 @@ impl<'a> HfSendOneToMany<'a, DeployCluster<'a>> for DeployNode<'a> {
source_port.send_to(&mut recipient_port);
}

fn gen_sink_statement(&self, _port: &Self::Port) -> hydroflow_plus::lang::parse::Pipeline {
parse_quote!(null())
fn gen_sink_statement(&self, _port: &Self::Port) -> syn::Expr {
parse_quote!(null)
}

fn gen_source_statement(
_other: &DeployCluster<'a>,
_port: &DeployPort<DeployCluster<'a>>,
) -> hydroflow_plus::lang::parse::Pipeline {
parse_quote!(null())
) -> syn::Expr {
parse_quote!(null)
}
}

Expand Down Expand Up @@ -362,15 +369,15 @@ impl<'a> HfSendManyToMany<'a, DeployCluster<'a>> for DeployCluster<'a> {
}
}

fn gen_sink_statement(&self, _port: &Self::Port) -> hydroflow_plus::lang::parse::Pipeline {
parse_quote!(null())
fn gen_sink_statement(&self, _port: &Self::Port) -> syn::Expr {
parse_quote!(null)
}

fn gen_source_statement(
_other: &DeployCluster<'a>,
_port: &DeployPort<DeployCluster<'a>>,
) -> hydroflow_plus::lang::parse::Pipeline {
parse_quote!(null())
) -> syn::Expr {
parse_quote!(null)
}
}

Expand All @@ -394,6 +401,7 @@ impl<'a: 'b, 'b> ProcessSpec<'a, HydroDeploy> for DeployProcessSpec<'b> {
DeployNode {
id,
builder,
cycle_counter: Rc::new(RefCell::new(0)),
next_port: Rc::new(RefCell::new(0)),
underlying: (self.0.borrow_mut())(),
}
Expand Down Expand Up @@ -423,6 +431,7 @@ impl<'a: 'b, 'b> ClusterSpec<'a, HydroDeploy> for DeployClusterSpec<'b> {
DeployCluster {
id,
builder,
cycle_counter: Rc::new(RefCell::new(0)),
next_port: Rc::new(RefCell::new(0)),
members: cluster_nodes
.into_iter()
Expand Down
87 changes: 41 additions & 46 deletions hydro_deploy/hydroflow_plus_cli_integration/src/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::cell::RefCell;
use std::rc::Rc;

use hydroflow_plus::lang::parse::Pipeline;
use hydroflow_plus::ir::HfPlusLeaf;
use hydroflow_plus::location::{
Cluster, ClusterSpec, Deploy, HfSendManyToMany, HfSendManyToOne, HfSendOneToMany,
HfSendOneToOne, Location, ProcessSpec,
Expand All @@ -11,7 +11,6 @@ use hydroflow_plus::util::cli::{
};
use hydroflow_plus::FlowBuilder;
use stageleft::{q, Quoted, RuntimeData};
use syn::parse_quote;

use super::HydroflowPlusMeta;

Expand All @@ -30,6 +29,7 @@ impl<'a> Deploy<'a> for CLIRuntime {
pub struct CLIRuntimeNode<'a> {
id: usize,
builder: &'a FlowBuilder<'a, CLIRuntime>,
cycle_counter: Rc<RefCell<usize>>,
next_port: Rc<RefCell<usize>>,
cli: RuntimeData<&'a HydroCLI<HydroflowPlusMeta>>,
}
Expand All @@ -42,8 +42,12 @@ impl<'a> Location<'a> for CLIRuntimeNode<'a> {
self.id
}

fn flow_builder(&self) -> (&'a RefCell<usize>, &'a hydroflow_plus::builder::Builders) {
self.builder.builder_components()
fn ir_leaves(&self) -> &'a RefCell<Vec<HfPlusLeaf>> {
self.builder.ir_leaves()
}

fn cycle_counter(&self) -> &RefCell<usize> {
self.cycle_counter.as_ref()
}

fn next_port(&self) -> String {
Expand All @@ -59,6 +63,7 @@ impl<'a> Location<'a> for CLIRuntimeNode<'a> {
pub struct CLIRuntimeCluster<'a> {
id: usize,
builder: &'a FlowBuilder<'a, CLIRuntime>,
cycle_counter: Rc<RefCell<usize>>,
next_port: Rc<RefCell<usize>>,
cli: RuntimeData<&'a HydroCLI<HydroflowPlusMeta>>,
}
Expand All @@ -71,8 +76,12 @@ impl<'a> Location<'a> for CLIRuntimeCluster<'a> {
self.id
}

fn flow_builder(&self) -> (&'a RefCell<usize>, &'a hydroflow_plus::builder::Builders) {
self.builder.builder_components()
fn ir_leaves(&self) -> &'a RefCell<Vec<HfPlusLeaf>> {
self.builder.ir_leaves()
}

fn cycle_counter(&self) -> &RefCell<usize> {
self.cycle_counter.as_ref()
}

fn next_port(&self) -> String {
Expand All @@ -95,134 +104,118 @@ impl<'a> Cluster<'a> for CLIRuntimeCluster<'a> {
impl<'a> HfSendOneToOne<'a, CLIRuntimeNode<'a>> for CLIRuntimeNode<'a> {
fn connect(&self, _other: &CLIRuntimeNode, _source_port: &String, _recipient_port: &String) {}

fn gen_sink_statement(&self, port: &String) -> Pipeline {
fn gen_sink_statement(&self, port: &String) -> syn::Expr {
let self_cli = self.cli;
let port = port.as_str();
let sink_quote = q!({
q!({
self_cli
.port(port)
.connect_local_blocking::<ConnectedDirect>()
.into_sink()
})
.splice();

parse_quote!(dest_sink(#sink_quote))
.splice()
}

fn gen_source_statement(other: &CLIRuntimeNode<'a>, port: &String) -> Pipeline {
fn gen_source_statement(other: &CLIRuntimeNode<'a>, port: &String) -> syn::Expr {
let self_cli = other.cli;
let port = port.as_str();
let source_quote = q!({
q!({
self_cli
.port(port)
.connect_local_blocking::<ConnectedDirect>()
.into_source()
})
.splice();

parse_quote!(source_stream(#source_quote))
.splice()
}
}

impl<'a> HfSendManyToOne<'a, CLIRuntimeNode<'a>> for CLIRuntimeCluster<'a> {
fn connect(&self, _other: &CLIRuntimeNode, _source_port: &String, _recipient_port: &String) {}

fn gen_sink_statement(&self, port: &String) -> Pipeline {
fn gen_sink_statement(&self, port: &String) -> syn::Expr {
let self_cli = self.cli;
let port = port.as_str();
let sink_quote = q!({
q!({
self_cli
.port(port)
.connect_local_blocking::<ConnectedDirect>()
.into_sink()
})
.splice();

parse_quote!(dest_sink(#sink_quote))
.splice()
}

fn gen_source_statement(other: &CLIRuntimeNode<'a>, port: &String) -> Pipeline {
fn gen_source_statement(other: &CLIRuntimeNode<'a>, port: &String) -> syn::Expr {
let self_cli = other.cli;
let port = port.as_str();
let source_quote = q!({
q!({
self_cli
.port(port)
.connect_local_blocking::<ConnectedTagged<ConnectedDirect>>()
.into_source()
})
.splice();

parse_quote!(source_stream(#source_quote))
.splice()
}
}

impl<'a> HfSendOneToMany<'a, CLIRuntimeCluster<'a>> for CLIRuntimeNode<'a> {
fn connect(&self, _other: &CLIRuntimeCluster, _source_port: &String, _recipient_port: &String) {
}

fn gen_sink_statement(&self, port: &String) -> Pipeline {
fn gen_sink_statement(&self, port: &String) -> syn::Expr {
let self_cli = self.cli;
let port = port.as_str();

let sink_quote = q!({
q!({
self_cli
.port(port)
.connect_local_blocking::<ConnectedDemux<ConnectedDirect>>()
.into_sink()
})
.splice();

parse_quote!(dest_sink(#sink_quote))
.splice()
}

fn gen_source_statement(other: &CLIRuntimeCluster<'a>, port: &String) -> Pipeline {
fn gen_source_statement(other: &CLIRuntimeCluster<'a>, port: &String) -> syn::Expr {
let self_cli = other.cli;
let port = port.as_str();

let source_quote = q!({
q!({
self_cli
.port(port)
.connect_local_blocking::<ConnectedDirect>()
.into_source()
})
.splice();

parse_quote!(source_stream(#source_quote))
.splice()
}
}

impl<'a> HfSendManyToMany<'a, CLIRuntimeCluster<'a>> for CLIRuntimeCluster<'a> {
fn connect(&self, _other: &CLIRuntimeCluster, _source_port: &String, _recipient_port: &String) {
}

fn gen_sink_statement(&self, port: &String) -> Pipeline {
fn gen_sink_statement(&self, port: &String) -> syn::Expr {
let self_cli = self.cli;
let port = port.as_str();

let sink_quote = q!({
q!({
self_cli
.port(port)
.connect_local_blocking::<ConnectedDemux<ConnectedDirect>>()
.into_sink()
})
.splice();

parse_quote!(dest_sink(#sink_quote))
.splice()
}

fn gen_source_statement(other: &CLIRuntimeCluster<'a>, port: &String) -> Pipeline {
fn gen_source_statement(other: &CLIRuntimeCluster<'a>, port: &String) -> syn::Expr {
let self_cli = other.cli;
let port = port.as_str();

let source_quote = q!({
q!({
self_cli
.port(port)
.connect_local_blocking::<ConnectedTagged<ConnectedDirect>>()
.into_source()
})
.splice();

parse_quote!(source_stream(#source_quote))
.splice()
}
}

Expand All @@ -236,6 +229,7 @@ impl<'cli> ProcessSpec<'cli, CLIRuntime> for RuntimeData<&'cli HydroCLI<Hydroflo
CLIRuntimeNode {
id,
builder,
cycle_counter: Rc::new(RefCell::new(0)),
next_port: Rc::new(RefCell::new(0)),
cli: *self,
}
Expand All @@ -252,6 +246,7 @@ impl<'cli> ClusterSpec<'cli, CLIRuntime> for RuntimeData<&'cli HydroCLI<Hydroflo
CLIRuntimeCluster {
id,
builder,
cycle_counter: Rc::new(RefCell::new(0)),
next_port: Rc::new(RefCell::new(0)),
cli: *self,
}
Expand Down
Loading

0 comments on commit eb34ccd

Please sign in to comment.