Skip to content

Commit

Permalink
feat(hydroflow_plus)!: mark non-deterministic operators as unsafe and…
Browse files Browse the repository at this point in the history
… introduce timestamped streams
  • Loading branch information
shadaj committed Nov 22, 2024
1 parent e20aaee commit edb2843
Show file tree
Hide file tree
Showing 23 changed files with 1,145 additions and 403 deletions.
10 changes: 6 additions & 4 deletions hydroflow_plus/src/cycle.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::location::Location;
use crate::location::{Location, LocationId};
use crate::staging_util::Invariant;

pub enum ForwardRefMarker {}
Expand All @@ -9,7 +9,7 @@ pub trait DeferTick {
}

pub trait CycleComplete<'a, T> {
fn complete(self, ident: syn::Ident);
fn complete(self, ident: syn::Ident, expected_location: LocationId);
}

pub trait CycleCollection<'a, T>: CycleComplete<'a, T> {
Expand All @@ -30,24 +30,26 @@ pub trait CycleCollectionWithInitial<'a, T>: CycleComplete<'a, T> {
/// See [`crate::FlowBuilder`] for an explainer on the type parameters.
pub struct ForwardRef<'a, S: CycleComplete<'a, ForwardRefMarker>> {
pub(crate) ident: syn::Ident,
pub(crate) expected_location: LocationId,
pub(crate) _phantom: Invariant<'a, S>,
}

impl<'a, S: CycleComplete<'a, ForwardRefMarker>> ForwardRef<'a, S> {
pub fn complete(self, stream: S) {
let ident = self.ident;
S::complete(stream, ident)
S::complete(stream, ident, self.expected_location)
}
}

pub struct TickCycle<'a, S: CycleComplete<'a, TickCycleMarker> + DeferTick> {
pub(crate) ident: syn::Ident,
pub(crate) expected_location: LocationId,
pub(crate) _phantom: Invariant<'a, S>,
}

impl<'a, S: CycleComplete<'a, TickCycleMarker> + DeferTick> TickCycle<'a, S> {
pub fn complete_next_tick(self, stream: S) {
let ident = self.ident;
S::complete(stream.defer_tick(), ident)
S::complete(stream.defer_tick(), ident, self.expected_location)
}
}
12 changes: 7 additions & 5 deletions hydroflow_plus/src/location/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,10 @@ pub struct Cluster<'a, C> {

pub trait IsCluster {
type Tag;
fn id(&self) -> usize;
}

impl<C> IsCluster for Cluster<'_, C> {
type Tag = C;
fn id(&self) -> usize {
self.id
}
}

impl<'a, C> Cluster<'a, C> {
Expand Down Expand Up @@ -125,8 +121,14 @@ where
where
Self: Sized,
{
let cluster_id = if let LocationId::Cluster(id) = ctx.root().id() {
id
} else {
unreachable!()
};

let ident = syn::Ident::new(
&format!("__hydroflow_plus_cluster_self_id_{}", ctx.root().id()),
&format!("__hydroflow_plus_cluster_self_id_{}", cluster_id),
Span::call_site(),
);
let root = get_this_crate();
Expand Down
26 changes: 23 additions & 3 deletions hydroflow_plus/src/location/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
}

pub trait Location<'a>: Clone {
type Root;
type Root: Location<'a>;

fn root(&self) -> Self::Root;

Expand Down Expand Up @@ -160,7 +160,16 @@ pub trait Location<'a>: Clone {
)
}

fn source_interval(
/// Generates a stream with values emitted at a fixed interval, with
/// each value being the current time (as an [`tokio::time::Instant`]).
///
/// The clock source used is monotonic, so elements will be emitted in
/// increasing order.
///
/// # Safety
/// Because this stream is generated by an OS timer, it will be
/// non-deterministic because each timestamp will be arbitrary.
unsafe fn source_interval(
&self,
interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
) -> Stream<tokio::time::Instant, Self, Unbounded>
Expand All @@ -172,7 +181,17 @@ pub trait Location<'a>: Clone {
)))
}

fn source_interval_delayed(
/// Generates a stream with values emitted at a fixed interval (with an
/// initial delay), with each value being the current time
/// (as an [`tokio::time::Instant`]).
///
/// The clock source used is monotonic, so elements will be emitted in
/// increasing order.
///
/// # Safety
/// Because this stream is generated by an OS timer, it will be
/// non-deterministic because each timestamp will be arbitrary.
unsafe fn source_interval_delayed(
&self,
delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
Expand Down Expand Up @@ -212,6 +231,7 @@ pub trait Location<'a>: Clone {
(
ForwardRef {
ident: ident.clone(),
expected_location: self.id(),
_phantom: PhantomData,
},
S::create_source(ident, self.clone()),
Expand Down
90 changes: 83 additions & 7 deletions hydroflow_plus/src/location/tick.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,38 @@ pub trait NoTick {}
impl<T> NoTick for Process<'_, T> {}
impl<T> NoTick for Cluster<'_, T> {}

pub trait NoTimestamp {}
impl<T> NoTimestamp for Process<'_, T> {}
impl<T> NoTimestamp for Cluster<'_, T> {}
impl<'a, L: Location<'a>> NoTimestamp for Tick<L> {}

#[derive(Clone)]
pub struct Timestamped<L> {
pub(crate) tick: Tick<L>,
}

impl<'a, L: Location<'a>> Location<'a> for Timestamped<L> {
type Root = L::Root;

fn root(&self) -> Self::Root {
self.tick.root()
}

fn id(&self) -> LocationId {
self.tick.id()
}

fn flow_state(&self) -> &FlowState {
self.tick.flow_state()
}

fn is_top_level() -> bool {
L::is_top_level()
}
}

impl<L> NoTick for Timestamped<L> {}

/// Marks the stream as being inside the single global clock domain.
#[derive(Clone)]
pub struct Tick<L> {
Expand Down Expand Up @@ -53,13 +85,18 @@ impl<'a, L: Location<'a>> Tick<L> {
batch_size: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
) -> Stream<(), Self, Bounded>
where
L: NoTick,
L: NoTick + NoTimestamp,
{
self.l
.spin()
.flat_map(q!(move |_| 0..batch_size))
.map(q!(|_| ()))
.tick_batch(self)
unsafe {
// SAFETY: at runtime, `spin` produces a single value per tick,
// so each batch is guaranteed to be the same size.
self.l
.spin()
.flat_map_ordered(q!(move |_| 0..batch_size))
.map(q!(|_| ()))
.timestamped(self)
.tick_batch()
}
}

pub fn singleton<T: Clone>(
Expand All @@ -69,7 +106,10 @@ impl<'a, L: Location<'a>> Tick<L> {
where
L: NoTick,
{
self.outer().singleton(e).latest_tick(self)
unsafe {
// SAFETY: a top-level singleton produces the same value each tick
self.outer().singleton(e).timestamped(self).latest_tick()
}
}

pub fn singleton_first_tick<T: Clone>(
Expand Down Expand Up @@ -118,12 +158,46 @@ impl<'a, L: Location<'a>> Tick<L> {
(
ForwardRef {
ident: ident.clone(),
expected_location: self.id(),
_phantom: PhantomData,
},
S::create_source(ident, self.clone()),
)
}

pub fn forward_ref_timestamped<
S: CycleCollection<'a, ForwardRefMarker, Location = Timestamped<L>>,
>(
&self,
) -> (ForwardRef<'a, S>, S) {
let next_id = {
let on_id = match self.l.id() {
LocationId::Process(id) => id,
LocationId::Cluster(id) => id,
LocationId::Tick(_, _) => panic!(),
LocationId::ExternalProcess(_) => panic!(),
};

let mut flow_state = self.flow_state().borrow_mut();
let next_id_entry = flow_state.cycle_counts.entry(on_id).or_default();

let id = *next_id_entry;
*next_id_entry += 1;
id
};

let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());

(
ForwardRef {
ident: ident.clone(),
expected_location: self.id(),
_phantom: PhantomData,
},
S::create_source(ident, Timestamped { tick: self.clone() }),
)
}

pub fn cycle<S: CycleCollection<'a, TickCycleMarker, Location = Self> + DeferTick>(
&self,
) -> (TickCycle<'a, S>, S)
Expand Down Expand Up @@ -151,6 +225,7 @@ impl<'a, L: Location<'a>> Tick<L> {
(
TickCycle {
ident: ident.clone(),
expected_location: self.id(),
_phantom: PhantomData,
},
S::create_source(ident, self.clone()),
Expand Down Expand Up @@ -187,6 +262,7 @@ impl<'a, L: Location<'a>> Tick<L> {
(
TickCycle {
ident: ident.clone(),
expected_location: self.id(),
_phantom: PhantomData,
},
S::create_source(ident, initial, self.clone()),
Expand Down
Loading

0 comments on commit edb2843

Please sign in to comment.