feat(hydroflow_plus)!: mark non-deterministic operators as unsafe and introduce timestamped streams

Big PR.

First big change is we introduce a `Timestamped` location. This is a bit of a hybrid between top-level locations and `Tick` locations. The idea is that you choose where timestamps are generated, and then have a guarantee that everything after that will be atomically computed (useful for making sure we add payloads to the log before ack-ing).

The contract is that an operator or module that takes a `Timestamped` input must still be deterministic regardless of the stamps on messages (which are hidden unless you `tick_batch`). Unlike a top-level stream (which has the same constraint), a `Timestamped` stream also gives you the atomicity guarantee. Right now the guarantee is trivial, since we have one global tick for everything. In the future, when we want to apply @davidchuyaya's optimizations, it will help us know where causal dependencies constrain when data can be sent to others.
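To make the contract concrete, here is a minimal standalone sketch. It does not use the hydroflow_plus API: `Stamped`, `running_sum`, `tick_batch`, and `stamp` are hypothetical toy names, and a slice stands in for a stream. The point is only that a well-behaved operator gives the same answer for any stamping, while an explicit batching step is the one place where stamps become visible.

```rust
/// Toy stand-in for an element of a timestamped stream (hypothetical type,
/// not the hydroflow_plus one). The `tick` stamp is meant to stay hidden.
#[derive(Clone, Debug, PartialEq)]
struct Stamped<T> {
    tick: u64,
    value: T,
}

/// An operator that honors the contract: its output depends only on the
/// values and their order, never on the tick stamps.
fn running_sum(input: &[Stamped<i64>]) -> Vec<i64> {
    let mut acc = 0;
    input
        .iter()
        .map(|s| {
            acc += s.value;
            acc
        })
        .collect()
}

/// Explicitly reveals the stamps by grouping consecutive elements that
/// share a tick. This is the step where non-determinism can leak in.
fn tick_batch<T: Clone>(input: &[Stamped<T>]) -> Vec<Vec<T>> {
    let mut batches: Vec<Vec<T>> = Vec::new();
    let mut last_tick: Option<u64> = None;
    for s in input {
        if last_tick != Some(s.tick) {
            batches.push(Vec::new());
            last_tick = Some(s.tick);
        }
        batches.last_mut().unwrap().push(s.value.clone());
    }
    batches
}

/// Helper that pairs each value with a tick stamp.
fn stamp(values: &[i64], ticks: &[u64]) -> Vec<Stamped<i64>> {
    values
        .iter()
        .zip(ticks)
        .map(|(&value, &tick)| Stamped { tick, value })
        .collect()
}
```

Stamping `[1, 2, 3]` with ticks `[0, 0, 1]` or with `[0, 1, 2]` yields the same `running_sum`, but different `tick_batch` results, which is why only the latter needs extra scrutiny.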

Second change is we mark every non-deterministic operator (modulo explicit annotations such as `NoOrder`) with Rust's `unsafe` keyword. This makes it super clear where non-determinism is taking place.
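As an illustration of the pattern (not the actual hydroflow_plus operators), the sketch below uses a hypothetical `ToyStream` whose batch boundaries stand in for runtime timing. The non-deterministic accessor is an `unsafe fn`, so every caller has to write an `unsafe` block and argue why the result is still deterministic.

```rust
/// Toy stream (hypothetical). `chunk` models how the runtime happened to
/// batch the items, which a real program could not control.
struct ToyStream {
    items: Vec<u32>,
    chunk: usize,
}

impl ToyStream {
    /// Deterministic: the result does not depend on batching.
    fn total(&self) -> u32 {
        self.items.iter().sum()
    }

    /// Returns the items split into runtime-chosen batches.
    ///
    /// # Safety
    /// Batch boundaries are non-deterministic. Callers must make sure
    /// their final result does not depend on where the boundaries fall.
    unsafe fn batches(&self) -> Vec<Vec<u32>> {
        self.items
            .chunks(self.chunk.max(1)) // guard against a zero chunk size
            .map(|c| c.to_vec())
            .collect()
    }
}

/// Sums the stream by way of its batches.
fn total_via_batches(stream: &ToyStream) -> u32 {
    // SAFETY: addition is associative and commutative, so the sum is the
    // same for every possible batching.
    let batches = unsafe { stream.batches() };
    batches.iter().map(|b| b.iter().sum::<u32>()).sum()
}
```

Here `unsafe` is used purely as a marker for non-determinism; nothing in the sketch touches memory safety.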

I've used this to put `unsafe` blocks throughout our example code and add `SAFETY` annotations that argue why the non-determinism is safe (or point out that we've explicitly documented / expect non-determinism). I also added `#![warn(unsafe_op_in_unsafe_fn)]` to the examples and the template, since this forces good hygiene of annotating sources of non-determinism even inside a module that is intentionally non-deterministic.
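The effect of `unsafe_op_in_unsafe_fn` can be sketched in isolation. The example below applies the lint to a single module with an outer attribute (the commit uses the crate-level `#![warn(...)]` form) and raises it to `deny` so the requirement is visible as a hard error. The module and function names are hypothetical.

```rust
#[deny(unsafe_op_in_unsafe_fn)]
mod sampling {
    /// Returns a prefix whose length stands in for a timing-dependent cut.
    ///
    /// # Safety
    /// The cut point is treated as non-deterministic; callers must not
    /// rely on which elements are included.
    pub unsafe fn arbitrary_prefix(items: &[u32], cut: usize) -> &[u32] {
        &items[..cut.min(items.len())]
    }

    /// Length of an arbitrary prefix.
    ///
    /// # Safety
    /// Intentionally non-deterministic, for the same reason as
    /// `arbitrary_prefix`.
    pub unsafe fn arbitrary_prefix_len(items: &[u32], cut: usize) -> usize {
        // SAFETY: this function is itself documented as non-deterministic,
        // so passing the non-determinism through is expected.
        // Without this inner block, the lint rejects the call even though
        // we are already inside an `unsafe fn`.
        let prefix = unsafe { arbitrary_prefix(items, cut) };
        prefix.len()
    }
}
```

Removing the inner `unsafe { ... }` block makes the module fail to compile under `deny`, which is the hygiene the lint is meant to enforce.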

Paxos changes are mostly refactors, and I verified that the performance is the same as before.
shadaj committed Nov 23, 2024
1 parent ec55910 commit 336101e
Showing 29 changed files with 1,442 additions and 620 deletions.
10 changes: 6 additions & 4 deletions hydroflow_plus/src/cycle.rs
@@ -1,4 +1,4 @@
-use crate::location::Location;
+use crate::location::{Location, LocationId};
 use crate::staging_util::Invariant;

 pub enum ForwardRefMarker {}
@@ -9,7 +9,7 @@ pub trait DeferTick {
 }

 pub trait CycleComplete<'a, T> {
-    fn complete(self, ident: syn::Ident);
+    fn complete(self, ident: syn::Ident, expected_location: LocationId);
 }

 pub trait CycleCollection<'a, T>: CycleComplete<'a, T> {
@@ -30,24 +30,26 @@ pub trait CycleCollectionWithInitial<'a, T>: CycleComplete<'a, T> {
 /// See [`crate::FlowBuilder`] for an explainer on the type parameters.
 pub struct ForwardRef<'a, S: CycleComplete<'a, ForwardRefMarker>> {
     pub(crate) ident: syn::Ident,
+    pub(crate) expected_location: LocationId,
     pub(crate) _phantom: Invariant<'a, S>,
 }

 impl<'a, S: CycleComplete<'a, ForwardRefMarker>> ForwardRef<'a, S> {
     pub fn complete(self, stream: S) {
         let ident = self.ident;
-        S::complete(stream, ident)
+        S::complete(stream, ident, self.expected_location)
     }
 }

 pub struct TickCycle<'a, S: CycleComplete<'a, TickCycleMarker> + DeferTick> {
     pub(crate) ident: syn::Ident,
+    pub(crate) expected_location: LocationId,
     pub(crate) _phantom: Invariant<'a, S>,
 }

 impl<'a, S: CycleComplete<'a, TickCycleMarker> + DeferTick> TickCycle<'a, S> {
     pub fn complete_next_tick(self, stream: S) {
         let ident = self.ident;
-        S::complete(stream.defer_tick(), ident)
+        S::complete(stream.defer_tick(), ident, self.expected_location)
     }
 }
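
The diff above threads an `expected_location` through `complete`, but the visible hunks do not show what implementors do with it. One plausible use, shown here purely as an assumption, is checking that a forward reference is completed at the location where it was created. The types below are simplified, hypothetical stand-ins and not the hydroflow_plus definitions.

```rust
/// Simplified location identifier (hypothetical; the real enum has more
/// variants and different payloads).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum LocationId {
    Process(usize),
    Cluster(usize),
}

/// Simplified forward reference that remembers where it was created.
struct ForwardRef {
    ident: String,
    expected_location: LocationId,
}

impl ForwardRef {
    /// Completes the reference, rejecting a stream that lives at a
    /// different location than the one recorded at creation time.
    fn complete(self, actual: LocationId) -> Result<String, String> {
        if actual == self.expected_location {
            Ok(self.ident)
        } else {
            Err(format!(
                "{} expected {:?} but was completed at {:?}",
                self.ident, self.expected_location, actual
            ))
        }
    }
}
```

Whether the real implementation asserts, returns an error, or uses the value differently is not visible in this portion of the diff.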
1 change: 1 addition & 0 deletions hydroflow_plus/src/lib.rs
@@ -1,4 +1,5 @@
 #![feature(box_patterns)]
+#![warn(unsafe_op_in_unsafe_fn)]

 stageleft::stageleft_no_entry_crate!();

12 changes: 7 additions & 5 deletions hydroflow_plus/src/location/cluster/mod.rs
@@ -20,14 +20,10 @@ pub struct Cluster<'a, C> {

 pub trait IsCluster {
     type Tag;
-    fn id(&self) -> usize;
 }

 impl<C> IsCluster for Cluster<'_, C> {
     type Tag = C;
-    fn id(&self) -> usize {
-        self.id
-    }
 }

 impl<'a, C> Cluster<'a, C> {
@@ -125,8 +121,14 @@ where
     where
         Self: Sized,
     {
+        let cluster_id = if let LocationId::Cluster(id) = ctx.root().id() {
+            id
+        } else {
+            unreachable!()
+        };
+
         let ident = syn::Ident::new(
-            &format!("__hydroflow_plus_cluster_self_id_{}", ctx.root().id()),
+            &format!("__hydroflow_plus_cluster_self_id_{}", cluster_id),
             Span::call_site(),
         );
         let root = get_this_crate();
26 changes: 23 additions & 3 deletions hydroflow_plus/src/location/mod.rs
@@ -60,7 +60,7 @@ pub fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
 }

 pub trait Location<'a>: Clone {
-    type Root;
+    type Root: Location<'a>;

     fn root(&self) -> Self::Root;

@@ -160,7 +160,16 @@ pub trait Location<'a>: Clone {
         )
     }

-    fn source_interval(
+    /// Generates a stream with values emitted at a fixed interval, with
+    /// each value being the current time (as an [`tokio::time::Instant`]).
+    ///
+    /// The clock source used is monotonic, so elements will be emitted in
+    /// increasing order.
+    ///
+    /// # Safety
+    /// Because this stream is generated by an OS timer, it will be
+    /// non-deterministic because each timestamp will be arbitrary.
+    unsafe fn source_interval(
         &self,
         interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
     ) -> Stream<tokio::time::Instant, Self, Unbounded>
@@ -172,7 +181,17 @@ pub trait Location<'a>: Clone {
         )))
     }

-    fn source_interval_delayed(
+    /// Generates a stream with values emitted at a fixed interval (with an
+    /// initial delay), with each value being the current time
+    /// (as an [`tokio::time::Instant`]).
+    ///
+    /// The clock source used is monotonic, so elements will be emitted in
+    /// increasing order.
+    ///
+    /// # Safety
+    /// Because this stream is generated by an OS timer, it will be
+    /// non-deterministic because each timestamp will be arbitrary.
+    unsafe fn source_interval_delayed(
         &self,
         delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
         interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
@@ -212,6 +231,7 @@ pub trait Location<'a>: Clone {
         (
             ForwardRef {
                 ident: ident.clone(),
+                expected_location: self.id(),
                 _phantom: PhantomData,
             },
             S::create_source(ident, self.clone()),
90 changes: 83 additions & 7 deletions hydroflow_plus/src/location/tick.rs
@@ -16,6 +16,38 @@ pub trait NoTick {}
 impl<T> NoTick for Process<'_, T> {}
 impl<T> NoTick for Cluster<'_, T> {}

+pub trait NoTimestamp {}
+impl<T> NoTimestamp for Process<'_, T> {}
+impl<T> NoTimestamp for Cluster<'_, T> {}
+impl<'a, L: Location<'a>> NoTimestamp for Tick<L> {}
+
+#[derive(Clone)]
+pub struct Timestamped<L> {
+    pub(crate) tick: Tick<L>,
+}
+
+impl<'a, L: Location<'a>> Location<'a> for Timestamped<L> {
+    type Root = L::Root;
+
+    fn root(&self) -> Self::Root {
+        self.tick.root()
+    }
+
+    fn id(&self) -> LocationId {
+        self.tick.id()
+    }
+
+    fn flow_state(&self) -> &FlowState {
+        self.tick.flow_state()
+    }
+
+    fn is_top_level() -> bool {
+        L::is_top_level()
+    }
+}
+
+impl<L> NoTick for Timestamped<L> {}
+
 /// Marks the stream as being inside the single global clock domain.
 #[derive(Clone)]
 pub struct Tick<L> {
@@ -53,13 +85,18 @@ impl<'a, L: Location<'a>> Tick<L> {
         batch_size: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
     ) -> Stream<(), Self, Bounded>
     where
-        L: NoTick,
+        L: NoTick + NoTimestamp,
     {
-        self.l
-            .spin()
-            .flat_map(q!(move |_| 0..batch_size))
-            .map(q!(|_| ()))
-            .tick_batch(self)
+        unsafe {
+            // SAFETY: at runtime, `spin` produces a single value per tick,
+            // so each batch is guaranteed to be the same size.
+            self.l
+                .spin()
+                .flat_map_ordered(q!(move |_| 0..batch_size))
+                .map(q!(|_| ()))
+                .timestamped(self)
+                .tick_batch()
+        }
     }

     pub fn singleton<T: Clone>(
@@ -69,7 +106,10 @@ impl<'a, L: Location<'a>> Tick<L> {
     where
         L: NoTick,
     {
-        self.outer().singleton(e).latest_tick(self)
+        unsafe {
+            // SAFETY: a top-level singleton produces the same value each tick
+            self.outer().singleton(e).timestamped(self).latest_tick()
+        }
     }

     pub fn singleton_first_tick<T: Clone>(
@@ -118,12 +158,46 @@ impl<'a, L: Location<'a>> Tick<L> {
         (
             ForwardRef {
                 ident: ident.clone(),
+                expected_location: self.id(),
                 _phantom: PhantomData,
             },
             S::create_source(ident, self.clone()),
         )
     }

+    pub fn forward_ref_timestamped<
+        S: CycleCollection<'a, ForwardRefMarker, Location = Timestamped<L>>,
+    >(
+        &self,
+    ) -> (ForwardRef<'a, S>, S) {
+        let next_id = {
+            let on_id = match self.l.id() {
+                LocationId::Process(id) => id,
+                LocationId::Cluster(id) => id,
+                LocationId::Tick(_, _) => panic!(),
+                LocationId::ExternalProcess(_) => panic!(),
+            };
+
+            let mut flow_state = self.flow_state().borrow_mut();
+            let next_id_entry = flow_state.cycle_counts.entry(on_id).or_default();
+
+            let id = *next_id_entry;
+            *next_id_entry += 1;
+            id
+        };
+
+        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
+
+        (
+            ForwardRef {
+                ident: ident.clone(),
+                expected_location: self.id(),
+                _phantom: PhantomData,
+            },
+            S::create_source(ident, Timestamped { tick: self.clone() }),
+        )
+    }
+
     pub fn cycle<S: CycleCollection<'a, TickCycleMarker, Location = Self> + DeferTick>(
         &self,
     ) -> (TickCycle<'a, S>, S)
@@ -151,6 +225,7 @@ impl<'a, L: Location<'a>> Tick<L> {
         (
             TickCycle {
                 ident: ident.clone(),
+                expected_location: self.id(),
                 _phantom: PhantomData,
             },
             S::create_source(ident, self.clone()),
@@ -187,6 +262,7 @@ impl<'a, L: Location<'a>> Tick<L> {
         (
             TickCycle {
                 ident: ident.clone(),
+                expected_location: self.id(),
                 _phantom: PhantomData,
             },
             S::create_source(ident, initial, self.clone()),
