Skip to content

Commit

Permalink
feat(hydroflow_plus)!: strongly-typed runtime cluster IDs
Browse files Browse the repository at this point in the history
Instead of `u32`s everywhere, we now have a `ClusterId<C>` type that ensures that cluster IDs are not misused.
  • Loading branch information
shadaj committed Oct 4, 2024
1 parent a064d60 commit 008ca9b
Show file tree
Hide file tree
Showing 21 changed files with 699 additions and 518 deletions.
6 changes: 3 additions & 3 deletions docs/docs/hydroflow_plus/clusters.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -32,23 +32,23 @@ stream.for_each(q!(|x| println!("{}", x)))
## Sending Data
Because clusters represent a set of instances, adding networking requires us to specify _which_ instance(s) to send data to. Clusters provide different types depending on if the source or receiver is a cluster or a process.

Elements in a cluster are identified by a **cluster ID** (a `u32`). To get the IDs of all instances in a cluster, use the `ids` method on cluster, which returns a runtime expression of type `&Vec<u32>` (which can only be used inside `q!()` or as an argument to `source_iter`). All IDs always are ranging from 0 through the length of the IDs vector.
Elements in a cluster are identified by a **cluster ID** (a `ClusterId<C>` where `C` is the typetag of the cluster). To get the IDs of all instances in a cluster, use the `members` method on cluster, which returns a runtime expression of type `&Vec<ClusterId<_>>` (which can only be used inside `q!()` or as an argument to `source_iter`). All IDs always are ranging from 0 through the length of the IDs vector.

This can then be passed into `source_iter` to load the IDs into the graph.
```rust
let stream = process.source_iter(cluster.members()).cloned();
```

### One-to-Many
When sending data from a process to a cluster, the source must be a stream of tuples of the form `(u32, T)` and sends each `T` element to the instance with the matching `u32` ID.
When sending data from a process to a cluster, the source must be a stream of tuples of the form `(ClusterId<_>, T)` and sends each `T` element to the instance with the matching ID.

This is useful for partitioning data across instances. For example, we can partition a stream of elements in a round-robin fashion by using `enumerate` to add a sequence number to each element, then using `send_bincode` to send each element to the instance with the matching sequence number:
```rust
let cluster_ids = cluster.members();
let stream = process.source_iter(q!(vec![123, 456, 789]))
.enumerate()
.map(q!(|(i, x)| (
i % cluster_ids.len() as u32,
ClusterId::from_raw(i % cluster_ids.len() as u32),
x
)))
.send_bincode(&cluster);
Expand Down
53 changes: 40 additions & 13 deletions hydroflow_plus/src/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ use quote::quote;
use runtime_support::FreeVariable;
use stageleft::*;

use super::staging_util::get_this_crate;
use crate::ir::HfPlusLeaf;
use crate::location::{Cluster, ExternalProcess, Process};
use crate::RuntimeContext;
use crate::{ClusterId, RuntimeContext};

pub mod built;
pub mod deploy;
Expand All @@ -31,13 +32,20 @@ pub struct FlowStateInner {

pub type FlowState = Rc<RefCell<FlowStateInner>>;

#[derive(Copy, Clone)]
pub struct ClusterIds<'a> {
pub struct ClusterIds<'a, C> {
pub(crate) id: usize,
pub(crate) _phantom: PhantomData<&'a mut &'a Vec<u32>>,
pub(crate) _phantom: PhantomData<&'a mut &'a C>,
}

impl<'a> FreeVariable<&'a Vec<u32>> for ClusterIds<'a> {
impl<'a, C> Clone for ClusterIds<'a, C> {
fn clone(&self) -> Self {
*self
}
}

impl<'a, C> Copy for ClusterIds<'a, C> {}

impl<'a, C> FreeVariable<&'a Vec<ClusterId<C>>> for ClusterIds<'a, C> {
fn to_tokens(self) -> (Option<TokenStream>, Option<TokenStream>)
where
Self: Sized,
Expand All @@ -46,19 +54,33 @@ impl<'a> FreeVariable<&'a Vec<u32>> for ClusterIds<'a> {
&format!("__hydroflow_plus_cluster_ids_{}", self.id),
Span::call_site(),
);
(None, Some(quote! { #ident }))
let root = get_this_crate();
let c_type = quote_type::<C>();
(
None,
Some(
quote! { unsafe { ::std::mem::transmute::<_, &::std::vec::Vec<#root::ClusterId<#c_type>>>(#ident) } },
),
)
}
}

impl<'a> Quoted<'a, &'a Vec<u32>> for ClusterIds<'a> {}
impl<'a, C> Quoted<'a, &'a Vec<ClusterId<C>>> for ClusterIds<'a, C> {}

#[derive(Copy, Clone)]
pub(crate) struct ClusterSelfId<'a> {
pub(crate) struct ClusterSelfId<'a, C> {
pub(crate) id: usize,
pub(crate) _phantom: PhantomData<&'a mut &'a u32>,
pub(crate) _phantom: PhantomData<&'a mut &'a C>,
}

impl<'a> FreeVariable<u32> for ClusterSelfId<'a> {
impl<'a, C> Clone for ClusterSelfId<'a, C> {
fn clone(&self) -> Self {
*self
}
}

impl<'a, C> Copy for ClusterSelfId<'a, C> {}

impl<'a, C> FreeVariable<ClusterId<C>> for ClusterSelfId<'a, C> {
fn to_tokens(self) -> (Option<TokenStream>, Option<TokenStream>)
where
Self: Sized,
Expand All @@ -67,11 +89,16 @@ impl<'a> FreeVariable<u32> for ClusterSelfId<'a> {
&format!("__hydroflow_plus_cluster_self_id_{}", self.id),
Span::call_site(),
);
(None, Some(quote! { #ident }))
let root = get_this_crate();
let c_type: syn::Type = quote_type::<C>();
(
None,
Some(quote! { #root::ClusterId::<#c_type>::from_raw(#ident) }),
)
}
}

impl<'a> Quoted<'a, u32> for ClusterSelfId<'a> {}
impl<'a, C> Quoted<'a, ClusterId<C>> for ClusterSelfId<'a, C> {}

pub struct FlowBuilder<'a> {
flow_state: FlowState,
Expand Down
3 changes: 1 addition & 2 deletions hydroflow_plus/src/deploy/deploy_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use hydro_deploy::hydroflow_crate::tracing_options::TracingOptions;
use hydro_deploy::hydroflow_crate::HydroflowCrateService;
use hydro_deploy::{CustomService, Deployment, Host, HydroflowCrate};
use hydroflow::futures::StreamExt;
use hydroflow::util::deploy::ConnectedSource;
use hydroflow::util::deploy::{ConnectedSink, ConnectedSource};
use nameof::name_of;
use serde::de::DeserializeOwned;
use serde::Serialize;
Expand All @@ -29,7 +29,6 @@ use super::trybuild::{compile_graph_trybuild, create_trybuild};
use super::{ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort};
use crate::futures::SinkExt;
use crate::lang::graph::HydroflowGraph;
use crate::util::deploy::ConnectedSink;

pub struct HydroDeploy {}

Expand Down
7 changes: 3 additions & 4 deletions hydroflow_plus/src/deploy/deploy_runtime.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use std::collections::HashMap;

use serde::{Deserialize, Serialize};
use stageleft::{q, Quoted, RuntimeData};

use crate::util::deploy::{
use hydroflow::util::deploy::{
ConnectedDemux, ConnectedDirect, ConnectedSink, ConnectedSource, ConnectedTagged, DeployPorts,
};
use serde::{Deserialize, Serialize};
use stageleft::{q, Quoted, RuntimeData};

#[derive(Default, Serialize, Deserialize)]
pub struct HydroflowPlusMeta {
Expand Down
2 changes: 1 addition & 1 deletion hydroflow_plus/src/deploy/macro_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ use std::cell::RefCell;
use std::pin::Pin;
use std::rc::Rc;

use hydroflow::util::deploy::DeployPorts;
use stageleft::{Quoted, RuntimeData};

use super::HydroflowPlusMeta;
use crate::deploy::{ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort};
use crate::lang::graph::HydroflowGraph;
use crate::util::deploy::DeployPorts;

pub struct DeployRuntime {}

Expand Down
4 changes: 3 additions & 1 deletion hydroflow_plus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub mod singleton;
pub use singleton::{Optional, Singleton};

pub mod location;
pub use location::{Cluster, Location, Process};
pub use location::{Cluster, ClusterId, Location, Process};

pub mod deploy;
pub use deploy::{ClusterSpec, Deploy, ProcessSpec};
Expand All @@ -43,6 +43,8 @@ pub mod profiler;

pub mod properties;

mod staging_util;

#[derive(Clone)]
pub struct RuntimeContext<'a> {
_phantom: PhantomData<&'a mut &'a ()>,
Expand Down
Loading

0 comments on commit 008ca9b

Please sign in to comment.