diff --git a/hydroflow_plus/src/builder/deploy.rs b/hydroflow_plus/src/builder/deploy.rs index 4fcdd1cb342c..1c165ddd00f0 100644 --- a/hydroflow_plus/src/builder/deploy.rs +++ b/hydroflow_plus/src/builder/deploy.rs @@ -4,7 +4,7 @@ use std::marker::PhantomData; use std::pin::Pin; use hydroflow::bytes::Bytes; -use hydroflow::futures::Sink; +use hydroflow::futures::{Sink, Stream}; use proc_macro2::Span; use serde::de::DeserializeOwned; use serde::Serialize; @@ -14,7 +14,8 @@ use super::built::build_inner; use crate::deploy::{ExternalSpec, LocalDeploy, Node, RegisterPort}; use crate::ir::HfPlusLeaf; use crate::location::{ - ExternalBincodePort, ExternalBytesPort, ExternalProcess, Location, LocationId, + ExternalBincodeSink, ExternalBincodeStream, ExternalBytesPort, ExternalProcess, Location, + LocationId, }; use crate::{Cluster, ClusterSpec, Deploy, HfCompiled, Process, ProcessSpec}; @@ -261,7 +262,7 @@ impl<'a, D: Deploy<'a>> DeployResult<'a, D> { pub async fn connect_sink_bincode( &self, - port: ExternalBincodePort, + port: ExternalBincodeSink, ) -> Pin>> { self.externals .get(&port.process_id) @@ -269,4 +270,26 @@ impl<'a, D: Deploy<'a>> DeployResult<'a, D> { .as_bincode_sink(port.port_id) .await } + + pub async fn connect_source_bytes( + &self, + port: ExternalBytesPort, + ) -> Pin>> { + self.externals + .get(&port.process_id) + .unwrap() + .as_bytes_source(port.port_id) + .await + } + + pub async fn connect_source_bincode( + &self, + port: ExternalBincodeStream, + ) -> Pin>> { + self.externals + .get(&port.process_id) + .unwrap() + .as_bincode_source(port.port_id) + .await + } } diff --git a/hydroflow_plus/src/builder/mod.rs b/hydroflow_plus/src/builder/mod.rs index 38e8907d3c38..f339ea6642bf 100644 --- a/hydroflow_plus/src/builder/mod.rs +++ b/hydroflow_plus/src/builder/mod.rs @@ -18,7 +18,7 @@ use stageleft::*; use crate::cycle::{CycleCollection, CycleCollectionWithInitial}; use crate::ir::{HfPlusLeaf, HfPlusNode, HfPlusSource}; use crate::location::{ - Cluster, ExternalBincodePort, ExternalBytesPort, ExternalProcess, Location, LocationId, Process, + Cluster, ExternalBincodeSink, ExternalBytesPort, ExternalProcess, Location, LocationId, Process, }; use crate::stream::{Bounded, NoTick, Tick, Unbounded}; use crate::{HfCycle, Optional, RuntimeContext, Singleton, Stream}; @@ -29,7 +29,9 @@ pub mod deploy; /// Tracks the leaves of the dataflow IR. This is referenced by /// `Stream` and `HfCycle` to build the IR. The inner option will /// be set to `None` when this builder is finalized. -pub type FlowLeaves<'a> = Rc>>>>; +pub type FlowLeaves<'a> = Rc>>, usize)>>; + +pub type ExternalPortCounter = Rc>; #[derive(Copy, Clone)] pub struct ClusterIds<'a> { @@ -80,7 +82,6 @@ pub struct FlowBuilder<'a> { cycle_ids: RefCell>, next_node_id: RefCell, - next_external_port_id: RefCell, /// Tracks whether this flow has been finalized; it is an error to /// drop without finalizing. @@ -114,12 +115,11 @@ impl<'a> FlowBuilder<'a> { )] pub fn new() -> FlowBuilder<'a> { FlowBuilder { - ir_leaves: Rc::new(RefCell::new(Some(Vec::new()))), + ir_leaves: Rc::new(RefCell::new((Some(Vec::new()), 0))), nodes: RefCell::new(vec![]), clusters: RefCell::new(vec![]), cycle_ids: RefCell::new(HashMap::new()), next_node_id: RefCell::new(0), - next_external_port_id: RefCell::new(0), finalized: false, _phantom: PhantomData, } @@ -129,7 +129,7 @@ impl<'a> FlowBuilder<'a> { self.finalized = true; built::BuiltFlow { - ir: self.ir_leaves.borrow_mut().take().unwrap(), + ir: self.ir_leaves.borrow_mut().0.take().unwrap(), processes: self.nodes.replace(vec![]), clusters: self.clusters.replace(vec![]), used: false, @@ -242,9 +242,9 @@ impl<'a> FlowBuilder<'a> { to: &L, ) -> (ExternalBytesPort, Stream<'a, Bytes, Unbounded, NoTick, L>) { let next_external_port_id = { - let mut next_external_port_id = self.next_external_port_id.borrow_mut(); - let id = *next_external_port_id; - *next_external_port_id += 1; + let mut ir_leaves = self.ir_leaves.borrow_mut(); + let id = ir_leaves.1; + ir_leaves.1 += 1; id }; @@ -277,16 +277,16 @@ impl<'a> FlowBuilder<'a> { &self, from: &ExternalProcess

, to: &L, - ) -> (ExternalBincodePort, Stream<'a, T, Unbounded, NoTick, L>) { + ) -> (ExternalBincodeSink, Stream<'a, T, Unbounded, NoTick, L>) { let next_external_port_id = { - let mut next_external_port_id = self.next_external_port_id.borrow_mut(); - let id = *next_external_port_id; - *next_external_port_id += 1; + let mut ir_leaves = self.ir_leaves.borrow_mut(); + let id = ir_leaves.1; + ir_leaves.1 += 1; id }; ( - ExternalBincodePort { + ExternalBincodeSink { process_id: from.id, port_id: next_external_port_id, _phantom: PhantomData, diff --git a/hydroflow_plus/src/deploy/deploy_graph.rs b/hydroflow_plus/src/deploy/deploy_graph.rs index e6b41f6e62f1..3582d11cbf5e 100644 --- a/hydroflow_plus/src/deploy/deploy_graph.rs +++ b/hydroflow_plus/src/deploy/deploy_graph.rs @@ -13,7 +13,10 @@ use hydro_deploy::hydroflow_crate::ports::{ use hydro_deploy::hydroflow_crate::tracing_options::TracingOptions; use hydro_deploy::hydroflow_crate::HydroflowCrateService; use hydro_deploy::{CustomService, Deployment, Host, HydroflowCrate}; +use hydroflow::futures::StreamExt; +use hydroflow::util::deploy::ConnectedSource; use nameof::name_of; +use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; use stageleft::{Quoted, RuntimeData}; @@ -290,6 +293,49 @@ impl<'a> Deploy<'a> for HydroDeploy { .insert(p1_port.clone(), source_port); } + fn o2e_sink( + _compile_env: &Self::CompileEnv, + _p1: &Self::Process, + p1_port: &Self::Port, + _p2: &Self::ExternalProcess, + p2_port: &Self::Port, + ) -> syn::Expr { + let p1_port = p1_port.as_str(); + let p2_port = p2_port.as_str(); + deploy_o2e( + RuntimeData::new("__hydroflow_plus_trybuild_cli"), + p1_port, + p2_port, + ) + } + + fn o2e_connect( + p1: &Self::Process, + p1_port: &Self::Port, + p2: &Self::ExternalProcess, + p2_port: &Self::Port, + ) { + let self_underlying_borrow = p1.underlying.borrow(); + let self_underlying = self_underlying_borrow.as_ref().unwrap(); + let source_port = self_underlying + .try_read() + .unwrap() + .get_port(p1_port.clone(), self_underlying); + + let other_underlying_borrow = p2.underlying.borrow(); + let other_underlying = other_underlying_borrow.as_ref().unwrap(); + let recipient_port = other_underlying + .try_read() + .unwrap() + .declare_client(other_underlying); + + source_port.send_to(&recipient_port); + + p2.client_ports + .borrow_mut() + .insert(p2_port.clone(), recipient_port); + } + fn cluster_ids( _env: &Self::CompileEnv, of_cluster: usize, @@ -426,7 +472,7 @@ impl<'a> RegisterPort<'a, HydroDeploy> for DeployExternal { let port = self.raw_port(key); async move { let sink = port.connect().await.into_sink(); - Box::pin(sink) as Pin>> + sink as Pin>> } } @@ -441,6 +487,31 @@ impl<'a> RegisterPort<'a, HydroDeploy> for DeployExternal { as Pin>> } } + + fn as_bytes_source( + &self, + key: usize, + ) -> impl Future>>> + 'a + { + let port = self.raw_port(key); + async move { + let source = port.connect().await.into_source(); + Box::pin(source.map(|r| r.unwrap().freeze())) + as Pin>> + } + } + + fn as_bincode_source( + &self, + key: usize, + ) -> impl Future>>> + 'a { + let port = self.raw_port(key); + async move { + let source = port.connect().await.into_source(); + Box::pin(source.map(|item| bincode::deserialize(&item.unwrap()).unwrap())) + as Pin>> + } + } } impl Node for DeployExternal { diff --git a/hydroflow_plus/src/deploy/deploy_runtime.rs b/hydroflow_plus/src/deploy/deploy_runtime.rs index eb3cdf40c171..a8857a1d9ae1 100644 --- a/hydroflow_plus/src/deploy/deploy_runtime.rs +++ b/hydroflow_plus/src/deploy/deploy_runtime.rs @@ -123,7 +123,7 @@ pub fn deploy_m2m( pub fn deploy_e2o( env: RuntimeData<&DeployPorts>, - c1_port: &str, + e1_port: &str, p2_port: &str, ) -> syn::Expr { q!({ @@ -133,3 +133,16 @@ pub fn deploy_e2o( }) .splice_untyped() } + +pub fn deploy_o2e( + env: RuntimeData<&DeployPorts>, + p1_port: &str, + e2_port: &str, +) -> syn::Expr { + q!({ + env.port(p1_port) + .connect_local_blocking::() + .into_sink() + }) + .splice_untyped() +} diff --git a/hydroflow_plus/src/deploy/macro_runtime.rs b/hydroflow_plus/src/deploy/macro_runtime.rs index a30f25d9699a..28c8c0b057fe 100644 --- a/hydroflow_plus/src/deploy/macro_runtime.rs +++ b/hydroflow_plus/src/deploy/macro_runtime.rs @@ -145,6 +145,25 @@ impl<'a> Deploy<'a> for DeployRuntime { panic!() } + fn o2e_sink( + _compile_env: &Self::CompileEnv, + _p1: &Self::Process, + _p1_port: &Self::Port, + _p2: &Self::ExternalProcess, + _p2_port: &Self::Port, + ) -> syn::Expr { + panic!() + } + + fn o2e_connect( + _p1: &Self::Process, + _p1_port: &Self::Port, + _p2: &Self::ExternalProcess, + _p2_port: &Self::Port, + ) { + panic!() + } + fn cluster_ids( env: &Self::CompileEnv, of_cluster: usize, @@ -196,6 +215,31 @@ impl<'a> RegisterPort<'a, DeployRuntime> for DeployRuntimeNode { > + 'a { async { panic!() } } + + #[expect( + clippy::manual_async_fn, + reason = "buggy Clippy lint for lifetime bounds" + )] + fn as_bytes_source( + &self, + _key: usize, + ) -> impl std::future::Future< + Output = Pin>>, + > + 'a { + async { panic!() } + } + + #[expect( + clippy::manual_async_fn, + reason = "buggy Clippy lint for lifetime bounds" + )] + fn as_bincode_source( + &self, + _key: usize, + ) -> impl std::future::Future>>> + 'a + { + async { panic!() } + } } impl Node for DeployRuntimeNode { diff --git a/hydroflow_plus/src/deploy/mod.rs b/hydroflow_plus/src/deploy/mod.rs index 2e82b9c6fc93..696482e14748 100644 --- a/hydroflow_plus/src/deploy/mod.rs +++ b/hydroflow_plus/src/deploy/mod.rs @@ -3,8 +3,9 @@ use std::io::Error; use std::pin::Pin; use hydroflow::bytes::Bytes; -use hydroflow::futures::Sink; +use hydroflow::futures::{Sink, Stream}; use hydroflow_lang::graph::HydroflowGraph; +use serde::de::DeserializeOwned; use serde::Serialize; use stageleft::Quoted; @@ -151,6 +152,21 @@ pub trait Deploy<'a> { p2_port: &Self::Port, ); + fn o2e_sink( + compile_env: &Self::CompileEnv, + p1: &Self::Process, + p1_port: &Self::Port, + p2: &Self::ExternalProcess, + p2_port: &Self::Port, + ) -> syn::Expr; + + fn o2e_connect( + p1: &Self::Process, + p1_port: &Self::Port, + p2: &Self::ExternalProcess, + p2_port: &Self::Port, + ); + fn cluster_ids( env: &Self::CompileEnv, of_cluster: usize, @@ -220,12 +236,24 @@ pub trait Node { pub trait RegisterPort<'a, D: Deploy<'a> + ?Sized>: Clone { fn register(&self, key: usize, port: D::Port); fn raw_port(&self, key: usize) -> D::ExternalRawPort; + fn as_bytes_sink( &self, key: usize, ) -> impl Future>>> + 'a; + fn as_bincode_sink( &self, key: usize, ) -> impl Future>>> + 'a; + + fn as_bytes_source( + &self, + key: usize, + ) -> impl Future>>> + 'a; + + fn as_bincode_source( + &self, + key: usize, + ) -> impl Future>>> + 'a; } diff --git a/hydroflow_plus/src/ir.rs b/hydroflow_plus/src/ir.rs index 4d2c4f3217f1..c765bc93e9f4 100644 --- a/hydroflow_plus/src/ir.rs +++ b/hydroflow_plus/src/ir.rs @@ -1112,7 +1112,7 @@ impl<'a> HfPlusNode<'a> { let to_id = match to_location { LocationId::Process(id) => id, LocationId::Cluster(id) => id, - LocationId::ExternalProcess(_) => panic!(), + LocationId::ExternalProcess(id) => id, }; let receiver_builder = graph_builders.entry(*to_id).or_default(); @@ -1143,7 +1143,7 @@ fn instantiate_network<'a, D: Deploy<'a> + 'a>( from_location: &mut LocationId, from_key: Option, to_location: &mut LocationId, - _to_key: Option, + to_key: Option, nodes: &HashMap, clusters: &HashMap, externals: &HashMap, @@ -1280,8 +1280,34 @@ fn instantiate_network<'a, D: Deploy<'a> + 'a>( (LocationId::ExternalProcess(_), LocationId::ExternalProcess(_)) => { panic!("Cannot send from external to external") } - (LocationId::Process(_from), LocationId::ExternalProcess(_to)) => { - todo!("NYI") + (LocationId::Process(from), LocationId::ExternalProcess(to)) => { + let from_node = nodes + .get(from) + .unwrap_or_else(|| { + panic!("A process used in the graph was not instantiated: {}", from) + }) + .clone(); + + let to_node = externals + .get(to) + .unwrap_or_else(|| { + panic!("A external used in the graph was not instantiated: {}", to) + }) + .clone(); + + let sink_port = D::allocate_process_port(&from_node); + let source_port = D::allocate_external_port(&to_node); + + to_node.register(to_key.unwrap(), source_port.clone()); + + ( + ( + D::o2e_sink(compile_env, &from_node, &sink_port, &to_node, &source_port), + parse_quote!(DUMMY), + ), + Box::new(move || D::o2e_connect(&from_node, &sink_port, &to_node, &source_port)) + as Box, + ) } (LocationId::Cluster(_from), LocationId::ExternalProcess(_to)) => { todo!("NYI") diff --git a/hydroflow_plus/src/location.rs b/hydroflow_plus/src/location.rs index 0b361bd6d7e3..fdca34f62f1e 100644 --- a/hydroflow_plus/src/location.rs +++ b/hydroflow_plus/src/location.rs @@ -19,7 +19,13 @@ pub struct ExternalBytesPort { pub(crate) port_id: usize, } -pub struct ExternalBincodePort { +pub struct ExternalBincodeSink { + pub(crate) process_id: usize, + pub(crate) port_id: usize, + pub(crate) _phantom: PhantomData, +} + +pub struct ExternalBincodeStream { pub(crate) process_id: usize, pub(crate) port_id: usize, pub(crate) _phantom: PhantomData, @@ -39,6 +45,12 @@ impl

Clone for ExternalProcess

{ } } +impl

Location for ExternalProcess

{ + fn id(&self) -> LocationId { + LocationId::ExternalProcess(self.id) + } +} + pub struct Process

{ pub(crate) id: usize, pub(crate) _phantom: PhantomData

, @@ -138,3 +150,16 @@ impl CanSend> for Cluster { true } } + +impl CanSend> for Process { + type In = T; + type Out = T; + + fn is_demux() -> bool { + false + } + + fn is_tagged() -> bool { + false + } +} diff --git a/hydroflow_plus/src/singleton.rs b/hydroflow_plus/src/singleton.rs index 822f5c1ad287..b6a9fd60111b 100644 --- a/hydroflow_plus/src/singleton.rs +++ b/hydroflow_plus/src/singleton.rs @@ -150,7 +150,7 @@ impl<'a, T, C, N: Location> From> impl<'a, T, N: Location> CycleComplete<'a, Tick> for Singleton<'a, T, Bounded, Tick, N> { fn complete(self, ident: syn::Ident) { - self.ir_leaves.borrow_mut().as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink { + self.ir_leaves.borrow_mut().0.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink { ident, location_kind: self.location_kind, input: Box::new(self.ir_node.into_inner()), @@ -408,7 +408,7 @@ impl<'a, T, W, C, N: Location> Optional<'a, T, W, C, N> { impl<'a, T, N: Location> CycleComplete<'a, Tick> for Optional<'a, T, Bounded, Tick, N> { fn complete(self, ident: syn::Ident) { let me = self.defer_tick(); - me.ir_leaves.borrow_mut().as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink { + me.ir_leaves.borrow_mut().0.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink { ident, location_kind: me.location_kind, input: Box::new(me.ir_node.into_inner()), @@ -433,7 +433,7 @@ impl<'a, T, N: Location> CycleCollection<'a, Tick> for Optional<'a, T, Bounded, impl<'a, T, W, N: Location> CycleComplete<'a, NoTick> for Optional<'a, T, W, NoTick, N> { fn complete(self, ident: syn::Ident) { - self.ir_leaves.borrow_mut().as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink { + self.ir_leaves.borrow_mut().0.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink { ident, location_kind: self.location_kind, input: Box::new(HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner()))), diff --git a/hydroflow_plus/src/stream.rs b/hydroflow_plus/src/stream.rs index 2a33ebc0fccf..f8fcdc1ba83e 100644 --- a/hydroflow_plus/src/stream.rs +++ b/hydroflow_plus/src/stream.rs @@ -17,7 +17,9 @@ use syn::parse_quote; use crate::builder::{ClusterIds, FlowLeaves}; use crate::cycle::{CycleCollection, CycleComplete}; use crate::ir::{DebugInstantiate, HfPlusLeaf, HfPlusNode, HfPlusSource}; -use crate::location::{CanSend, Location, LocationId}; +use crate::location::{ + CanSend, ExternalBincodeStream, ExternalBytesPort, ExternalProcess, Location, LocationId, +}; use crate::{Cluster, Optional, Singleton}; /// Marks the stream as being unbounded, which means that it is not @@ -56,7 +58,7 @@ pub struct Stream<'a, T, W, C, N: Location> { impl<'a, T, N: Location> CycleComplete<'a, Tick> for Stream<'a, T, Bounded, Tick, N> { fn complete(self, ident: syn::Ident) { let me = self.defer_tick(); - me.ir_leaves.borrow_mut().as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink { + me.ir_leaves.borrow_mut().0.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink { ident, location_kind: me.location_kind, input: Box::new(me.ir_node.into_inner()), @@ -81,7 +83,7 @@ impl<'a, T, N: Location> CycleCollection<'a, Tick> for Stream<'a, T, Bounded, Ti impl<'a, T, W, N: Location> CycleComplete<'a, NoTick> for Stream<'a, T, W, NoTick, N> { fn complete(self, ident: syn::Ident) { - self.ir_leaves.borrow_mut().as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink { + self.ir_leaves.borrow_mut().0.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink { ident, location_kind: self.location_kind, input: Box::new(HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner()))), @@ -270,7 +272,7 @@ impl<'a, T, W, C, N: Location> Stream<'a, T, W, C, N> { } pub fn dest_sink + 'a>(self, sink: impl Quoted<'a, S>) { - self.ir_leaves.borrow_mut().as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.").push(HfPlusLeaf::DestSink { + self.ir_leaves.borrow_mut().0.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.").push(HfPlusLeaf::DestSink { sink: sink.splice_typed().into(), input: Box::new(self.ir_node.into_inner()), }); @@ -461,7 +463,7 @@ impl<'a, T, W, N: Location> Stream<'a, T, W, NoTick, N> { impl<'a, T, W, N: Location> Stream<'a, T, W, NoTick, N> { pub fn for_each(self, f: impl IntoQuotedMut<'a, F>) { - self.ir_leaves.borrow_mut().as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.").push(HfPlusLeaf::ForEach { + self.ir_leaves.borrow_mut().0.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.").push(HfPlusLeaf::ForEach { input: Box::new(HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner()))), f: f.splice_fn1().into(), }); @@ -725,6 +727,47 @@ impl<'a, T, W, N: Location> Stream<'a, T, W, NoTick, N> { ) } + pub fn send_bincode_external( + self, + other: &ExternalProcess, + ) -> ExternalBincodeStream> + where + N: CanSend, In = T, Out = CoreType>, + CoreType: Serialize + DeserializeOwned, + // for now, we restirct Out to be CoreType, which means no tagged cluster -> external + { + let serialize_pipeline = Some(serialize_bincode::(N::is_demux())); + + let mut leaves_borrow = self.ir_leaves.borrow_mut(); + + let external_key = leaves_borrow.1; + leaves_borrow.1 += 1; + + let leaves = leaves_borrow.0.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled()"); + + let dummy_f: syn::Expr = syn::parse_quote!(()); + + leaves.push(HfPlusLeaf::ForEach { + f: dummy_f.into(), + input: Box::new(HfPlusNode::Network { + from_location: self.location_kind, + from_key: None, + to_location: other.id(), + to_key: Some(external_key), + serialize_pipeline, + instantiate_fn: DebugInstantiate::Building(), + deserialize_pipeline: None, + input: Box::new(self.ir_node.into_inner()), + }), + }); + + ExternalBincodeStream { + process_id: other.id, + port_id: external_key, + _phantom: PhantomData, + } + } + pub fn send_bytes( self, other: &N2, @@ -752,6 +795,38 @@ impl<'a, T, W, N: Location> Stream<'a, T, W, NoTick, N> { ) } + pub fn send_bytes_external(self, other: &ExternalProcess) -> ExternalBytesPort + where + N: CanSend, In = T, Out = Bytes>, + { + let mut leaves_borrow = self.ir_leaves.borrow_mut(); + let external_key = leaves_borrow.1; + leaves_borrow.1 += 1; + + let leaves = leaves_borrow.0.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled()"); + + let dummy_f: syn::Expr = syn::parse_quote!(()); + + leaves.push(HfPlusLeaf::ForEach { + f: dummy_f.into(), + input: Box::new(HfPlusNode::Network { + from_location: self.location_kind, + from_key: None, + to_location: other.id(), + to_key: Some(external_key), + serialize_pipeline: None, + instantiate_fn: DebugInstantiate::Building(), + deserialize_pipeline: None, + input: Box::new(self.ir_node.into_inner()), + }), + }); + + ExternalBytesPort { + process_id: other.id, + port_id: external_key, + } + } + pub fn send_bincode_interleaved( self, other: &N2, @@ -838,11 +913,14 @@ impl<'a, T, W, N: Location> Stream<'a, T, W, NoTick, N> { #[cfg(test)] mod tests { - use hydro_deploy::Deployment; + use std::sync::Arc; + + use hydro_deploy::{Deployment, Host}; + use hydroflow::futures::StreamExt; use serde::{Deserialize, Serialize}; use stageleft::q; - use crate::deploy::{DeployCrateWrapper, TrybuildHost}; + use crate::deploy::TrybuildHost; use crate::FlowBuilder; struct P1 {} @@ -860,27 +938,29 @@ mod tests { let flow = FlowBuilder::new(); let first_node = flow.process::(); let second_node = flow.process::(); + let external = flow.external_process::(); let numbers = flow.source_iter(&first_node, q!(0..10)); - numbers + let out_port = numbers .map(q!(|n| SendOverNetwork { n })) .send_bincode(&second_node) - .for_each(q!(|n| println!("{}", n.n))); + .send_bincode_external(&external); let nodes = flow .with_default_optimize() .with_process(&first_node, TrybuildHost::new(deployment.Localhost())) .with_process(&second_node, TrybuildHost::new(deployment.Localhost())) + .with_external(&external, deployment.Localhost() as Arc) .deploy(&mut deployment); deployment.deploy().await.unwrap(); - let mut second_node_stdout = nodes.get_process(&second_node).stdout().await; + let mut external_out = nodes.connect_source_bincode(out_port).await; deployment.start().await.unwrap(); for i in 0..10 { - assert_eq!(second_node_stdout.recv().await.unwrap(), i.to_string()); + assert_eq!(external_out.next().await.unwrap().n, i); } } } diff --git a/hydroflow_plus_test/src/distributed/first_ten.rs b/hydroflow_plus_test/src/distributed/first_ten.rs index 9372b174d717..d08264977740 100644 --- a/hydroflow_plus_test/src/distributed/first_ten.rs +++ b/hydroflow_plus_test/src/distributed/first_ten.rs @@ -1,5 +1,5 @@ use hydroflow_plus::*; -use location::{ExternalBincodePort, ExternalProcess}; +use location::{ExternalBincodeSink, ExternalProcess}; use serde::{Deserialize, Serialize}; use stageleft::*; @@ -15,7 +15,7 @@ pub fn first_ten_distributed( flow: &FlowBuilder, ) -> ( ExternalProcess<()>, - ExternalBincodePort, + ExternalBincodeSink, Process, Process, ) {