Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(hydroflow_plus)!: mark non-deterministic operators as unsafe and introduce timestamped streams #1584

Merged
merged 1 commit into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,15 @@
}
}
],
"editor.semanticTokenColorCustomizations": {
"enabled": true,
"rules": {
"*.unsafe:rust": {
"foreground": "#ea1708",
"fontStyle": "bold"
}
}
},
MingweiSamuel marked this conversation as resolved.
Show resolved Hide resolved
"files.watcherExclude": {
"**/target": true
},
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ opt-level = "s"

[workspace.lints.rust]
unused_qualifications = "warn"
unsafe_op_in_unsafe_fn = "warn"

[workspace.lints.clippy]
allow_attributes = "warn"
Expand Down
1 change: 1 addition & 0 deletions hydro_deploy/hydro_cli/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#![expect(
unused_qualifications,
non_local_definitions,
unsafe_op_in_unsafe_fn,
reason = "for pyo3 generated code"
)]

Expand Down
10 changes: 6 additions & 4 deletions hydroflow_plus/src/cycle.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::location::Location;
use crate::location::{Location, LocationId};
use crate::staging_util::Invariant;

pub enum ForwardRefMarker {}
Expand All @@ -9,7 +9,7 @@ pub trait DeferTick {
}

pub trait CycleComplete<'a, T> {
fn complete(self, ident: syn::Ident);
fn complete(self, ident: syn::Ident, expected_location: LocationId);
}

pub trait CycleCollection<'a, T>: CycleComplete<'a, T> {
Expand All @@ -30,24 +30,26 @@ pub trait CycleCollectionWithInitial<'a, T>: CycleComplete<'a, T> {
/// See [`crate::FlowBuilder`] for an explainer on the type parameters.
pub struct ForwardRef<'a, S: CycleComplete<'a, ForwardRefMarker>> {
/// Identifier handed to `S::complete` when the reference is completed.
pub(crate) ident: syn::Ident,
/// Location id recorded when the reference was created; forwarded to
/// `S::complete` as its `expected_location` argument.
pub(crate) expected_location: LocationId,
/// Zero-sized marker tying the handle to `'a` and `S` without storing an `S`.
pub(crate) _phantom: Invariant<'a, S>,
}

impl<'a, S: CycleComplete<'a, ForwardRefMarker>> ForwardRef<'a, S> {
pub fn complete(self, stream: S) {
let ident = self.ident;
S::complete(stream, ident)
S::complete(stream, ident, self.expected_location)
}
}

/// Handle for a cycle that is completed via `complete_next_tick`, which
/// applies `defer_tick()` to the completing stream before calling
/// `S::complete`.
pub struct TickCycle<'a, S: CycleComplete<'a, TickCycleMarker> + DeferTick> {
/// Identifier handed to `S::complete` when the cycle is completed.
pub(crate) ident: syn::Ident,
/// Location id recorded when the cycle was created; forwarded to
/// `S::complete` as its `expected_location` argument.
pub(crate) expected_location: LocationId,
/// Zero-sized marker tying the handle to `'a` and `S` without storing an `S`.
pub(crate) _phantom: Invariant<'a, S>,
}

impl<'a, S: CycleComplete<'a, TickCycleMarker> + DeferTick> TickCycle<'a, S> {
pub fn complete_next_tick(self, stream: S) {
let ident = self.ident;
S::complete(stream.defer_tick(), ident)
S::complete(stream.defer_tick(), ident, self.expected_location)
}
}
12 changes: 7 additions & 5 deletions hydroflow_plus/src/location/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,10 @@ pub struct Cluster<'a, C> {

pub trait IsCluster {
type Tag;
fn id(&self) -> usize;
}

impl<C> IsCluster for Cluster<'_, C> {
type Tag = C;
fn id(&self) -> usize {
self.id
}
}

impl<'a, C> Cluster<'a, C> {
Expand Down Expand Up @@ -125,8 +121,14 @@ where
where
Self: Sized,
{
let cluster_id = if let LocationId::Cluster(id) = ctx.root().id() {
id
} else {
unreachable!()
};

let ident = syn::Ident::new(
&format!("__hydroflow_plus_cluster_self_id_{}", ctx.root().id()),
&format!("__hydroflow_plus_cluster_self_id_{}", cluster_id),
Span::call_site(),
);
let root = get_this_crate();
Expand Down
26 changes: 23 additions & 3 deletions hydroflow_plus/src/location/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
}

pub trait Location<'a>: Clone {
type Root;
type Root: Location<'a>;

fn root(&self) -> Self::Root;

Expand Down Expand Up @@ -160,7 +160,16 @@ pub trait Location<'a>: Clone {
)
}

fn source_interval(
/// Generates a stream with values emitted at a fixed interval, with
/// each value being the current time (as a [`tokio::time::Instant`]).
///
/// The clock source used is monotonic, so elements will be emitted in
/// increasing order.
///
/// # Safety
/// Because this stream is generated by an OS timer, it will be
/// non-deterministic because each timestamp will be arbitrary.
unsafe fn source_interval(
&self,
interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
) -> Stream<tokio::time::Instant, Self, Unbounded>
Expand All @@ -172,7 +181,17 @@ pub trait Location<'a>: Clone {
)))
}

fn source_interval_delayed(
/// Generates a stream with values emitted at a fixed interval (with an
/// initial delay), with each value being the current time
/// (as a [`tokio::time::Instant`]).
///
/// The clock source used is monotonic, so elements will be emitted in
/// increasing order.
///
/// # Safety
/// Because this stream is generated by an OS timer, it will be
/// non-deterministic because each timestamp will be arbitrary.
unsafe fn source_interval_delayed(
&self,
delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
Expand Down Expand Up @@ -212,6 +231,7 @@ pub trait Location<'a>: Clone {
(
ForwardRef {
ident: ident.clone(),
expected_location: self.id(),
_phantom: PhantomData,
},
S::create_source(ident, self.clone()),
Expand Down
97 changes: 92 additions & 5 deletions hydroflow_plus/src/location/tick.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::marker::PhantomData;

use proc_macro2::Span;
use sealed::sealed;
use stageleft::{q, QuotedWithContext};

use super::{Cluster, Location, LocationId, Process};
Expand All @@ -12,10 +13,50 @@ use crate::cycle::{
use crate::ir::{HfPlusNode, HfPlusSource};
use crate::{Bounded, Optional, Singleton, Stream};

/// Marker trait used as a bound (`L: NoTick`) on operations in this module.
///
/// It is implemented here for [`Process`] and [`Cluster`]; presumably it
/// identifies locations that are not themselves a `Tick` (TODO confirm).
/// The `#[sealed]` attribute keeps downstream crates from adding impls.
#[sealed]
pub trait NoTick {}
#[sealed]
impl<T> NoTick for Process<'_, T> {}
#[sealed]
impl<T> NoTick for Cluster<'_, T> {}

/// Marker trait used as a bound (`L: NoTimestamp`) on operations in this
/// module.
///
/// It is implemented here for [`Process`], [`Cluster`], and any `Tick<L>`;
/// presumably it identifies locations that are not wrapped in a timestamped
/// location (TODO confirm). The `#[sealed]` attribute keeps downstream
/// crates from adding impls.
#[sealed]
pub trait NoTimestamp {}
#[sealed]
impl<T> NoTimestamp for Process<'_, T> {}
#[sealed]
impl<T> NoTimestamp for Cluster<'_, T> {}
#[sealed]
impl<'a, L: Location<'a>> NoTimestamp for Tick<L> {}

/// A location that wraps a [`Tick`] over the underlying location `L`.
///
/// NOTE(review): judging by the name, values at this location are associated
/// with the wrapped tick without being batched into it — confirm against the
/// stream operators that produce and consume `Timestamped` locations.
#[derive(Clone)]
pub struct Timestamped<L> {
/// The tick this location is associated with.
pub(crate) tick: Tick<L>,
}

// `Timestamped<L>` forwards `root`, `id`, and `flow_state` to the wrapped
// tick, and takes its `Root` type and top-level status directly from `L`.
impl<'a, L: Location<'a>> Location<'a> for Timestamped<L> {
// Same root type as the underlying location `L`.
type Root = L::Root;

// Root of the wrapped tick.
fn root(&self) -> Self::Root {
self.tick.root()
}

// Same location id as the wrapped tick.
fn id(&self) -> LocationId {
self.tick.id()
}

// Same flow state as the wrapped tick.
fn flow_state(&self) -> &FlowState {
self.tick.flow_state()
}

// Answered by the underlying location `L`, not by `Tick<L>`.
fn is_top_level() -> bool {
L::is_top_level()
}
}

// A `Timestamped` location satisfies `NoTick` bounds for any `L`.
#[sealed]
impl<L> NoTick for Timestamped<L> {}

/// Marks the stream as being inside the single global clock domain.
#[derive(Clone)]
pub struct Tick<L> {
Expand Down Expand Up @@ -53,13 +94,20 @@ impl<'a, L: Location<'a>> Tick<L> {
batch_size: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
) -> Stream<(), Self, Bounded>
where
L: NoTick,
L: NoTick + NoTimestamp,
{
self.l
let out = self
.l
.spin()
.flat_map(q!(move |_| 0..batch_size))
.flat_map_ordered(q!(move |_| 0..batch_size))
.map(q!(|_| ()))
.tick_batch(self)
.timestamped(self);

unsafe {
// SAFETY: at runtime, `spin` produces a single value per tick,
// so each batch is guaranteed to be the same size.
out.tick_batch()
}
Comment on lines +99 to +110
Copy link
Member

@MingweiSamuel MingweiSamuel Nov 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now we can leave the rest of the unsafe blocks as-is, but as a convention it would be good to wrap only the unsafe method calls in `unsafe`; otherwise the whole call chain ends up inside the unsafe block and nothing distinguishes the unsafe call (the reader has to check each method called, or memorize which one is unsafe).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think I will try to write up a style guide soon on what to wrap in unsafe. Sometimes it's nice to include additional API calls if it is a "temporary"/"local" unsafety rather than one which propagates out the function call.

}

pub fn singleton<T: Clone>(
Expand All @@ -69,7 +117,10 @@ impl<'a, L: Location<'a>> Tick<L> {
where
L: NoTick,
{
self.outer().singleton(e).latest_tick(self)
unsafe {
// SAFETY: a top-level singleton produces the same value each tick
self.outer().singleton(e).timestamped(self).latest_tick()
}
}

pub fn singleton_first_tick<T: Clone>(
Expand Down Expand Up @@ -118,12 +169,46 @@ impl<'a, L: Location<'a>> Tick<L> {
(
ForwardRef {
ident: ident.clone(),
expected_location: self.id(),
_phantom: PhantomData,
},
S::create_source(ident, self.clone()),
)
}

/// Creates a forward reference whose placeholder collection lives at the
/// [`Timestamped`] location wrapping this tick.
///
/// Returns the [`ForwardRef`] handle used to complete the reference later,
/// paired with the placeholder collection built by `S::create_source`.
/// The cycle identifier is `cycle_<n>`, where `n` is the next value of the
/// per-location counter stored in the flow state's `cycle_counts`.
///
/// # Panics
/// Panics if the location underlying this tick reports a
/// `LocationId::Tick` or `LocationId::ExternalProcess` id.
pub fn forward_ref_timestamped<
    S: CycleCollection<'a, ForwardRefMarker, Location = Timestamped<L>>,
>(
    &self,
) -> (ForwardRef<'a, S>, S) {
    let next_id = {
        // Cycle counters are keyed by the raw id of the underlying
        // process/cluster; other location kinds are not supported here.
        let on_id = match self.l.id() {
            LocationId::Process(id) | LocationId::Cluster(id) => id,
            LocationId::Tick(_, _) => panic!(
                "forward_ref_timestamped: the location underlying a tick must be a process or cluster, found a tick"
            ),
            LocationId::ExternalProcess(_) => panic!(
                "forward_ref_timestamped: the location underlying a tick must be a process or cluster, found an external process"
            ),
        };

        // The mutable borrow of the flow state is confined to this block so
        // that it is released before `S::create_source` is called below.
        let mut flow_state = self.flow_state().borrow_mut();
        let next_id_entry = flow_state.cycle_counts.entry(on_id).or_default();

        let id = *next_id_entry;
        *next_id_entry += 1;
        id
    };

    let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());

    (
        ForwardRef {
            ident: ident.clone(),
            expected_location: self.id(),
            _phantom: PhantomData,
        },
        S::create_source(ident, Timestamped { tick: self.clone() }),
    )
}

pub fn cycle<S: CycleCollection<'a, TickCycleMarker, Location = Self> + DeferTick>(
&self,
) -> (TickCycle<'a, S>, S)
Expand Down Expand Up @@ -151,6 +236,7 @@ impl<'a, L: Location<'a>> Tick<L> {
(
TickCycle {
ident: ident.clone(),
expected_location: self.id(),
_phantom: PhantomData,
},
S::create_source(ident, self.clone()),
Expand Down Expand Up @@ -187,6 +273,7 @@ impl<'a, L: Location<'a>> Tick<L> {
(
TickCycle {
ident: ident.clone(),
expected_location: self.id(),
_phantom: PhantomData,
},
S::create_source(ident, initial, self.clone()),
Expand Down
Loading
Loading