Skip to content

Commit

Permalink
feat(hydroflow_plus)!: mark non-deterministic operators as unsafe
Browse files Browse the repository at this point in the history
  • Loading branch information
shadaj committed Nov 22, 2024
1 parent e20aaee commit 92a7be4
Show file tree
Hide file tree
Showing 20 changed files with 530 additions and 241 deletions.
23 changes: 21 additions & 2 deletions hydroflow_plus/src/location/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,16 @@ pub trait Location<'a>: Clone {
)
}

fn source_interval(
/// Generates a stream with values emitted at a fixed interval, with
/// each value being the current time (as a [`tokio::time::Instant`]).
///
/// The clock source used is monotonic, so elements will be emitted in
/// increasing order.
///
/// # Safety
/// Because this stream is generated by an OS timer, it will be
/// non-deterministic because each timestamp will be arbitrary.
unsafe fn source_interval(
&self,
interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
) -> Stream<tokio::time::Instant, Self, Unbounded>
Expand All @@ -172,7 +181,17 @@ pub trait Location<'a>: Clone {
)))
}

fn source_interval_delayed(
/// Generates a stream with values emitted at a fixed interval (with an
/// initial delay), with each value being the current time
/// (as a [`tokio::time::Instant`]).
///
/// The clock source used is monotonic, so elements will be emitted in
/// increasing order.
///
/// # Safety
/// Because this stream is generated by an OS timer, it will be
/// non-deterministic because each timestamp will be arbitrary.
unsafe fn source_interval_delayed(
&self,
delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
Expand Down
19 changes: 13 additions & 6 deletions hydroflow_plus/src/location/tick.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,15 @@ impl<'a, L: Location<'a>> Tick<L> {
where
L: NoTick,
{
self.l
.spin()
.flat_map(q!(move |_| 0..batch_size))
.map(q!(|_| ()))
.tick_batch(self)
unsafe {
// SAFETY: at runtime, `spin` produces a single value per tick,
// so each batch is guaranteed to be the same size.
self.l
.spin()
.flat_map_ordered(q!(move |_| 0..batch_size))
.map(q!(|_| ()))
.tick_batch(self)
}
}

pub fn singleton<T: Clone>(
Expand All @@ -69,7 +73,10 @@ impl<'a, L: Location<'a>> Tick<L> {
where
L: NoTick,
{
self.outer().singleton(e).latest_tick(self)
unsafe {
// SAFETY: a top-level singleton produces the same value each tick
self.outer().singleton(e).latest_tick(self)
}
}

pub fn singleton_first_tick<T: Clone>(
Expand Down
58 changes: 52 additions & 6 deletions hydroflow_plus/src/optional.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::builder::FLOW_USED_MESSAGE;
use crate::cycle::{CycleCollection, CycleComplete, DeferTick, ForwardRefMarker, TickCycleMarker};
use crate::ir::{HfPlusLeaf, HfPlusNode, HfPlusSource, TeeNode};
use crate::location::{check_matching_location, LocationId, NoTick};
use crate::stream::NoOrder;
use crate::{Bounded, Location, Singleton, Stream, Tick, Unbounded};

pub struct Optional<T, L, B> {
Expand Down Expand Up @@ -199,7 +200,7 @@ impl<'a, T, L: Location<'a>, B> Optional<T, L, B> {
)
}

pub fn flat_map<U, I: IntoIterator<Item = U>, F: Fn(T) -> I + 'a>(
pub fn flat_map_ordered<U, I: IntoIterator<Item = U>, F: Fn(T) -> I + 'a>(
self,
f: impl IntoQuotedMut<'a, F, L>,
) -> Stream<U, L, B> {
Expand All @@ -213,11 +214,32 @@ impl<'a, T, L: Location<'a>, B> Optional<T, L, B> {
)
}

pub fn flatten<U>(self) -> Stream<U, L, B>
/// Splices `f` at this optional's location, wraps the optional's IR node in an
/// [`HfPlusNode::FlatMap`], and returns the result as a [`Stream`] whose ordering
/// marker is [`NoOrder`].
pub fn flat_map_unordered<U, I: IntoIterator<Item = U>, F: Fn(T) -> I + 'a>(
    self,
    f: impl IntoQuotedMut<'a, F, L>,
) -> Stream<U, L, B, NoOrder> {
    // Splice the closure first: it only borrows the location, which is moved below.
    let spliced = f.splice_fn1_ctx(&self.location).into();
    let flat_mapped = HfPlusNode::FlatMap {
        f: spliced,
        input: Box::new(self.ir_node.into_inner()),
    };
    Stream::new(self.location, flat_mapped)
}

/// Flattens the iterable value held by this optional into a stream of its items.
///
/// This is a thin wrapper that delegates to `flat_map_ordered` with the identity
/// closure, so the returned [`Stream`] has the same type parameters that
/// `flat_map_ordered` produces.
pub fn flatten_ordered<U>(self) -> Stream<U, L, B>
where
T: IntoIterator<Item = U>,
{
self.flat_map_ordered(q!(|v| v))
}

pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder>
where
T: IntoIterator<Item = U>,
{
self.flat_map(q!(|v| v))
self.flat_map_unordered(q!(|v| v))
}

pub fn filter<F: Fn(&T) -> bool + 'a>(
Expand Down Expand Up @@ -356,19 +378,43 @@ impl<'a, T, L: Location<'a>> Optional<T, L, Bounded> {
}

impl<'a, T, L: Location<'a> + NoTick, B> Optional<T, L, B> {
pub fn latest_tick(self, tick: &Tick<L>) -> Optional<T, Tick<L>, Bounded> {
/// Snapshots this optional at the given tick, returning an optional that lives inside
/// that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
/// relevant data that contributed to the snapshot at tick `t`.
///
/// # Safety
/// The optional being snapshotted changes continuously and the snapshot can be taken
/// at an arbitrary point in time, so the value of the output optional is
/// non-deterministic.
pub unsafe fn latest_tick(self, tick: &Tick<L>) -> Optional<T, Tick<L>, Bounded> {
    let tick_location = tick.clone();
    let snapshot = HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner()));
    Optional::new(tick_location, snapshot)
}

pub fn tick_samples(self) -> Stream<T, L, Unbounded> {
/// Samples the optional eagerly, as fast as possible, and returns the snapshots as a
/// stream ordered by increasing prefixes of the data contributing to the optional.
///
/// Internally this creates a fresh tick on the optional's location, snapshots the
/// optional in that tick via `latest_tick`, and emits the result with `all_ticks`.
///
/// # Safety
/// Sampling happens at arbitrary points at runtime; because batching and arrival of
/// inputs are non-deterministic, the output stream is non-deterministic as well.
pub unsafe fn sample_eager(self) -> Stream<T, L, Unbounded> {
    let sampling_tick = self.location.tick();
    unsafe {
        // SAFETY: the non-determinism of `latest_tick` is exactly the non-determinism
        // documented in this function's own contract, so it is passed on to the caller.
        self.latest_tick(&sampling_tick).all_ticks()
    }
}

pub fn sample_every(
/// Given a time interval, returns a stream corresponding to snapshots of the optional
/// value taken at various points in time. Because the input optional may be
/// [`Unbounded`], there are no guarantees on what these snapshots are other than they
/// represent the value of the optional given some prefix of the streams leading up to
/// it.
///
/// # Safety
/// The output stream is non-deterministic in which elements are sampled, since this
/// is controlled by a clock.
pub unsafe fn sample_every(
self,
interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
) -> Stream<T, L, Unbounded> {
Expand Down
2 changes: 1 addition & 1 deletion hydroflow_plus/src/rewrites/persist_pullup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ mod tests {
let process = flow.process::<()>();

let tick = process.tick();
let before_tee = process.source_iter(q!(0..10)).tick_batch(&tick).persist();
let before_tee = unsafe { process.source_iter(q!(0..10)).tick_batch(&tick).persist() };

before_tee
.clone()
Expand Down
16 changes: 9 additions & 7 deletions hydroflow_plus/src/rewrites/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,15 @@ mod tests {
let counter_func = q!(|count: &mut i32, _| *count += 1);
let _ = database.add_commutative_tag(counter_func, &tick);

process
.source_iter(q!(vec![]))
.map(q!(|string: String| (string, ())))
.tick_batch(&tick)
.fold_keyed(q!(|| 0), counter_func)
.all_ticks()
.for_each(q!(|(string, count)| println!("{}: {}", string, count)));
unsafe {
process
.source_iter(q!(vec![]))
.map(q!(|string: String| (string, ())))
.tick_batch(&tick)
}
.fold_keyed(q!(|| 0), counter_func)
.all_ticks()
.for_each(q!(|(string, count)| println!("{}: {}", string, count)));

let built = flow
.optimize_with(|ir| properties_optimize(ir, &database))
Expand Down
46 changes: 42 additions & 4 deletions hydroflow_plus/src/singleton.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,21 @@ impl<'a, T, L: Location<'a>, B> Singleton<T, L, B> {
)
}

pub fn flat_map<U, I: IntoIterator<Item = U>, F: Fn(T) -> I + 'a>(
/// Splices `f` at this singleton's location, wraps the singleton's IR node in an
/// [`HfPlusNode::FlatMap`], and returns the result as a [`Stream`] at the same
/// location and with the same boundedness.
pub fn flat_map_ordered<U, I: IntoIterator<Item = U>, F: Fn(T) -> I + 'a>(
    self,
    f: impl IntoQuotedMut<'a, F, L>,
) -> Stream<U, L, B> {
    // Splice the closure first: it only borrows the location, which is moved below.
    let spliced = f.splice_fn1_ctx(&self.location).into();
    let flat_mapped = HfPlusNode::FlatMap {
        f: spliced,
        input: Box::new(self.ir_node.into_inner()),
    };
    Stream::new(self.location, flat_mapped)
}

pub fn flat_map_unordered<U, I: IntoIterator<Item = U>, F: Fn(T) -> I + 'a>(
self,
f: impl IntoQuotedMut<'a, F, L>,
) -> Stream<U, L, B> {
Expand Down Expand Up @@ -238,19 +252,43 @@ impl<'a, T, L: Location<'a>> Singleton<T, L, Bounded> {
}

impl<'a, T, L: Location<'a> + NoTick, B> Singleton<T, L, B> {
pub fn latest_tick(self, tick: &Tick<L>) -> Singleton<T, Tick<L>, Bounded> {
/// Snapshots this singleton at the given tick, returning a singleton that lives inside
/// that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
/// relevant data that contributed to the snapshot at tick `t`.
///
/// # Safety
/// The singleton being snapshotted changes continuously and the snapshot can be taken
/// at an arbitrary point in time, so the value of the output singleton is
/// non-deterministic.
pub unsafe fn latest_tick(self, tick: &Tick<L>) -> Singleton<T, Tick<L>, Bounded> {
    let tick_location = tick.clone();
    let snapshot = HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner()));
    Singleton::new(tick_location, snapshot)
}

pub fn tick_samples(self) -> Stream<T, L, Unbounded> {
/// Samples the singleton eagerly, as fast as possible, and returns the snapshots as a
/// stream ordered by increasing prefixes of the data contributing to the singleton.
///
/// Internally this creates a fresh tick on the singleton's location, snapshots the
/// singleton in that tick via `latest_tick`, and emits the result with `all_ticks`.
///
/// # Safety
/// Sampling happens at arbitrary points at runtime; because batching and arrival of
/// inputs are non-deterministic, the output stream is non-deterministic as well.
pub unsafe fn sample_eager(self) -> Stream<T, L, Unbounded> {
    let sampling_tick = self.location.tick();
    unsafe {
        // SAFETY: the non-determinism of `latest_tick` is exactly the non-determinism
        // documented in this function's own contract, so it is passed on to the caller.
        self.latest_tick(&sampling_tick).all_ticks()
    }
}

pub fn sample_every(
/// Given a time interval, returns a stream corresponding to snapshots of the singleton
/// value taken at various points in time. Because the input singleton may be
/// [`Unbounded`], there are no guarantees on what these snapshots are other than they
/// represent the value of the singleton given some prefix of the streams leading up to
/// it.
///
/// # Safety
/// The output stream is non-deterministic in which elements are sampled, since this
/// is controlled by a clock.
pub unsafe fn sample_every(
self,
interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
) -> Stream<T, L, Unbounded> {
Expand Down
Loading

0 comments on commit 92a7be4

Please sign in to comment.