Skip to content

Commit

Permalink
feat(hydroflow_plus): implement support for external network outputs
Browse files Browse the repository at this point in the history
  • Loading branch information
shadaj committed Sep 14, 2024
1 parent ae1b6a7 commit 26d8d12
Show file tree
Hide file tree
Showing 11 changed files with 351 additions and 41 deletions.
29 changes: 26 additions & 3 deletions hydroflow_plus/src/builder/deploy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::marker::PhantomData;
use std::pin::Pin;

use hydroflow::bytes::Bytes;
use hydroflow::futures::Sink;
use hydroflow::futures::{Sink, Stream};
use proc_macro2::Span;
use serde::de::DeserializeOwned;
use serde::Serialize;
Expand All @@ -14,7 +14,8 @@ use super::built::build_inner;
use crate::deploy::{ExternalSpec, LocalDeploy, Node, RegisterPort};
use crate::ir::HfPlusLeaf;
use crate::location::{
ExternalBincodePort, ExternalBytesPort, ExternalProcess, Location, LocationId,
ExternalBincodeSink, ExternalBincodeStream, ExternalBytesPort, ExternalProcess, Location,
LocationId,
};
use crate::{Cluster, ClusterSpec, Deploy, HfCompiled, Process, ProcessSpec};

Expand Down Expand Up @@ -261,12 +262,34 @@ impl<'a, D: Deploy<'a>> DeployResult<'a, D> {

pub async fn connect_sink_bincode<T: Serialize + DeserializeOwned + 'static>(
&self,
port: ExternalBincodePort<T>,
port: ExternalBincodeSink<T>,
) -> Pin<Box<dyn Sink<T, Error = Error>>> {
self.externals
.get(&port.process_id)
.unwrap()
.as_bincode_sink(port.port_id)
.await
}

pub async fn connect_source_bytes(
&self,
port: ExternalBytesPort,
) -> Pin<Box<dyn Stream<Item = Bytes>>> {
self.externals
.get(&port.process_id)
.unwrap()
.as_bytes_source(port.port_id)
.await
}

pub async fn connect_source_bincode<T: Serialize + DeserializeOwned + 'static>(
&self,
port: ExternalBincodeStream<T>,
) -> Pin<Box<dyn Stream<Item = T>>> {
self.externals
.get(&port.process_id)
.unwrap()
.as_bincode_source(port.port_id)
.await
}
}
28 changes: 14 additions & 14 deletions hydroflow_plus/src/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use stageleft::*;
use crate::cycle::{CycleCollection, CycleCollectionWithInitial};
use crate::ir::{HfPlusLeaf, HfPlusNode, HfPlusSource};
use crate::location::{
Cluster, ExternalBincodePort, ExternalBytesPort, ExternalProcess, Location, LocationId, Process,
Cluster, ExternalBincodeSink, ExternalBytesPort, ExternalProcess, Location, LocationId, Process,
};
use crate::stream::{Bounded, NoTick, Tick, Unbounded};
use crate::{HfCycle, Optional, RuntimeContext, Singleton, Stream};
Expand All @@ -29,7 +29,9 @@ pub mod deploy;
/// Tracks the leaves of the dataflow IR. This is referenced by
/// `Stream` and `HfCycle` to build the IR. The inner option will
/// be set to `None` when this builder is finalized.
pub type FlowLeaves<'a> = Rc<RefCell<Option<Vec<HfPlusLeaf<'a>>>>>;
pub type FlowLeaves<'a> = Rc<RefCell<(Option<Vec<HfPlusLeaf<'a>>>, usize)>>;

pub type ExternalPortCounter = Rc<RefCell<usize>>;

#[derive(Copy, Clone)]
pub struct ClusterIds<'a> {
Expand Down Expand Up @@ -80,7 +82,6 @@ pub struct FlowBuilder<'a> {
cycle_ids: RefCell<HashMap<usize, usize>>,

next_node_id: RefCell<usize>,
next_external_port_id: RefCell<usize>,

/// Tracks whether this flow has been finalized; it is an error to
/// drop without finalizing.
Expand Down Expand Up @@ -114,12 +115,11 @@ impl<'a> FlowBuilder<'a> {
)]
pub fn new() -> FlowBuilder<'a> {
FlowBuilder {
ir_leaves: Rc::new(RefCell::new(Some(Vec::new()))),
ir_leaves: Rc::new(RefCell::new((Some(Vec::new()), 0))),
nodes: RefCell::new(vec![]),
clusters: RefCell::new(vec![]),
cycle_ids: RefCell::new(HashMap::new()),
next_node_id: RefCell::new(0),
next_external_port_id: RefCell::new(0),
finalized: false,
_phantom: PhantomData,
}
Expand All @@ -129,7 +129,7 @@ impl<'a> FlowBuilder<'a> {
self.finalized = true;

built::BuiltFlow {
ir: self.ir_leaves.borrow_mut().take().unwrap(),
ir: self.ir_leaves.borrow_mut().0.take().unwrap(),
processes: self.nodes.replace(vec![]),
clusters: self.clusters.replace(vec![]),
used: false,
Expand Down Expand Up @@ -242,9 +242,9 @@ impl<'a> FlowBuilder<'a> {
to: &L,
) -> (ExternalBytesPort, Stream<'a, Bytes, Unbounded, NoTick, L>) {
let next_external_port_id = {
let mut next_external_port_id = self.next_external_port_id.borrow_mut();
let id = *next_external_port_id;
*next_external_port_id += 1;
let mut ir_leaves = self.ir_leaves.borrow_mut();
let id = ir_leaves.1;
ir_leaves.1 += 1;
id
};

Expand Down Expand Up @@ -277,16 +277,16 @@ impl<'a> FlowBuilder<'a> {
&self,
from: &ExternalProcess<P>,
to: &L,
) -> (ExternalBincodePort<T>, Stream<'a, T, Unbounded, NoTick, L>) {
) -> (ExternalBincodeSink<T>, Stream<'a, T, Unbounded, NoTick, L>) {
let next_external_port_id = {
let mut next_external_port_id = self.next_external_port_id.borrow_mut();
let id = *next_external_port_id;
*next_external_port_id += 1;
let mut ir_leaves = self.ir_leaves.borrow_mut();
let id = ir_leaves.1;
ir_leaves.1 += 1;
id
};

(
ExternalBincodePort {
ExternalBincodeSink {
process_id: from.id,
port_id: next_external_port_id,
_phantom: PhantomData,
Expand Down
73 changes: 72 additions & 1 deletion hydroflow_plus/src/deploy/deploy_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ use hydro_deploy::hydroflow_crate::ports::{
use hydro_deploy::hydroflow_crate::tracing_options::TracingOptions;
use hydro_deploy::hydroflow_crate::HydroflowCrateService;
use hydro_deploy::{CustomService, Deployment, Host, HydroflowCrate};
use hydroflow::futures::StreamExt;
use hydroflow::util::deploy::ConnectedSource;
use nameof::name_of;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use stageleft::{Quoted, RuntimeData};
Expand Down Expand Up @@ -290,6 +293,49 @@ impl<'a> Deploy<'a> for HydroDeploy {
.insert(p1_port.clone(), source_port);
}

fn o2e_sink(
_compile_env: &Self::CompileEnv,
_p1: &Self::Process,
p1_port: &Self::Port,
_p2: &Self::ExternalProcess,
p2_port: &Self::Port,
) -> syn::Expr {
let p1_port = p1_port.as_str();
let p2_port = p2_port.as_str();
deploy_o2e(
RuntimeData::new("__hydroflow_plus_trybuild_cli"),
p1_port,
p2_port,
)
}

fn o2e_connect(
p1: &Self::Process,
p1_port: &Self::Port,
p2: &Self::ExternalProcess,
p2_port: &Self::Port,
) {
let self_underlying_borrow = p1.underlying.borrow();
let self_underlying = self_underlying_borrow.as_ref().unwrap();
let source_port = self_underlying
.try_read()
.unwrap()
.get_port(p1_port.clone(), self_underlying);

let other_underlying_borrow = p2.underlying.borrow();
let other_underlying = other_underlying_borrow.as_ref().unwrap();
let recipient_port = other_underlying
.try_read()
.unwrap()
.declare_client(other_underlying);

source_port.send_to(&recipient_port);

p2.client_ports
.borrow_mut()
.insert(p2_port.clone(), recipient_port);
}

fn cluster_ids(
_env: &Self::CompileEnv,
of_cluster: usize,
Expand Down Expand Up @@ -426,7 +472,7 @@ impl<'a> RegisterPort<'a, HydroDeploy> for DeployExternal {
let port = self.raw_port(key);
async move {
let sink = port.connect().await.into_sink();
Box::pin(sink) as Pin<Box<dyn crate::futures::Sink<crate::bytes::Bytes, Error = Error>>>
sink as Pin<Box<dyn crate::futures::Sink<crate::bytes::Bytes, Error = Error>>>
}
}

Expand All @@ -441,6 +487,31 @@ impl<'a> RegisterPort<'a, HydroDeploy> for DeployExternal {
as Pin<Box<dyn crate::futures::Sink<T, Error = Error>>>
}
}

fn as_bytes_source(
&self,
key: usize,
) -> impl Future<Output = Pin<Box<dyn crate::futures::Stream<Item = crate::bytes::Bytes>>>> + 'a
{
let port = self.raw_port(key);
async move {
let source = port.connect().await.into_source();
Box::pin(source.map(|r| r.unwrap().freeze()))
as Pin<Box<dyn crate::futures::Stream<Item = crate::bytes::Bytes>>>
}
}

fn as_bincode_source<T: DeserializeOwned + 'static>(
&self,
key: usize,
) -> impl Future<Output = Pin<Box<dyn crate::futures::Stream<Item = T>>>> + 'a {
let port = self.raw_port(key);
async move {
let source = port.connect().await.into_source();
Box::pin(source.map(|item| bincode::deserialize(&item.unwrap()).unwrap()))
as Pin<Box<dyn crate::futures::Stream<Item = T>>>
}
}
}

impl Node for DeployExternal {
Expand Down
15 changes: 14 additions & 1 deletion hydroflow_plus/src/deploy/deploy_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ pub fn deploy_m2m(

pub fn deploy_e2o(
env: RuntimeData<&DeployPorts<HydroflowPlusMeta>>,
c1_port: &str,
e1_port: &str,
p2_port: &str,
) -> syn::Expr {
q!({
Expand All @@ -133,3 +133,16 @@ pub fn deploy_e2o(
})
.splice_untyped()
}

pub fn deploy_o2e(
env: RuntimeData<&DeployPorts<HydroflowPlusMeta>>,
p1_port: &str,
e2_port: &str,
) -> syn::Expr {
q!({
env.port(p1_port)
.connect_local_blocking::<ConnectedDirect>()
.into_sink()
})
.splice_untyped()
}
44 changes: 44 additions & 0 deletions hydroflow_plus/src/deploy/macro_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,25 @@ impl<'a> Deploy<'a> for DeployRuntime {
panic!()
}

fn o2e_sink(
_compile_env: &Self::CompileEnv,
_p1: &Self::Process,
_p1_port: &Self::Port,
_p2: &Self::ExternalProcess,
_p2_port: &Self::Port,
) -> syn::Expr {
panic!()
}

fn o2e_connect(
_p1: &Self::Process,
_p1_port: &Self::Port,
_p2: &Self::ExternalProcess,
_p2_port: &Self::Port,
) {
panic!()
}

fn cluster_ids(
env: &Self::CompileEnv,
of_cluster: usize,
Expand Down Expand Up @@ -196,6 +215,31 @@ impl<'a> RegisterPort<'a, DeployRuntime> for DeployRuntimeNode {
> + 'a {
async { panic!() }
}

#[expect(
clippy::manual_async_fn,
reason = "buggy Clippy lint for lifetime bounds"
)]
fn as_bytes_source(
&self,
_key: usize,
) -> impl std::future::Future<
Output = Pin<Box<dyn hydroflow::futures::Stream<Item = hydroflow::bytes::Bytes>>>,
> + 'a {
async { panic!() }
}

#[expect(
clippy::manual_async_fn,
reason = "buggy Clippy lint for lifetime bounds"
)]
fn as_bincode_source<T: serde::de::DeserializeOwned + 'static>(
&self,
_key: usize,
) -> impl std::future::Future<Output = Pin<Box<dyn hydroflow::futures::Stream<Item = T>>>> + 'a
{
async { panic!() }
}
}

impl Node for DeployRuntimeNode {
Expand Down
30 changes: 29 additions & 1 deletion hydroflow_plus/src/deploy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ use std::io::Error;
use std::pin::Pin;

use hydroflow::bytes::Bytes;
use hydroflow::futures::Sink;
use hydroflow::futures::{Sink, Stream};
use hydroflow_lang::graph::HydroflowGraph;
use serde::de::DeserializeOwned;
use serde::Serialize;
use stageleft::Quoted;

Expand Down Expand Up @@ -151,6 +152,21 @@ pub trait Deploy<'a> {
p2_port: &Self::Port,
);

fn o2e_sink(
compile_env: &Self::CompileEnv,
p1: &Self::Process,
p1_port: &Self::Port,
p2: &Self::ExternalProcess,
p2_port: &Self::Port,
) -> syn::Expr;

fn o2e_connect(
p1: &Self::Process,
p1_port: &Self::Port,
p2: &Self::ExternalProcess,
p2_port: &Self::Port,
);

fn cluster_ids(
env: &Self::CompileEnv,
of_cluster: usize,
Expand Down Expand Up @@ -220,12 +236,24 @@ pub trait Node {
pub trait RegisterPort<'a, D: Deploy<'a> + ?Sized>: Clone {
fn register(&self, key: usize, port: D::Port);
fn raw_port(&self, key: usize) -> D::ExternalRawPort;

fn as_bytes_sink(
&self,
key: usize,
) -> impl Future<Output = Pin<Box<dyn Sink<Bytes, Error = Error>>>> + 'a;

fn as_bincode_sink<T: Serialize + 'static>(
&self,
key: usize,
) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a;

fn as_bytes_source(
&self,
key: usize,
) -> impl Future<Output = Pin<Box<dyn Stream<Item = Bytes>>>> + 'a;

fn as_bincode_source<T: DeserializeOwned + 'static>(
&self,
key: usize,
) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a;
}
Loading

0 comments on commit 26d8d12

Please sign in to comment.