From e5d39ac7bfa7c0117e0a0d1c7082a0b6adc3f6be Mon Sep 17 00:00:00 2001 From: Shadaj Laddad Date: Fri, 22 Nov 2024 21:04:46 -0800 Subject: [PATCH] feat(hydroflow_plus)!: mark non-deterministic operators as unsafe and introduce timestamped streams Big PR. First big change is we introduce a `Timestamped` location. This is a bit of a hybrid between top-level locations and `Tick` locations. The idea is that you choose where timestamps are generated, and then have a guarantee that everything after that will be atomically computed (useful for making sure we add payloads to the log before ack-ing). The contract is that an operator or module that takes a `Timestamped` input must still be deterministic regardless of the stamps on messages (which are hidden unless you `tick_batch`). But unlike a top-level stream (which has the same constraints), you have the atomicity guarantee. Right now the guarantee is trivial since we have one global tick for everything. But in the future when we want to apply @davidchuyaya's optimizations this will be helpful to know when there are causal dependencies on when data can be sent to others. Second change is we mark every non-deterministic operator (modulo explicit annotations such as `NoOrder`) with Rust's `unsafe` keyword. This makes it super clear where non-determinism is taking place. I've used this to put `unsafe` blocks throughout our example code and add `SAFETY` annotations that argue why the non-determinism is safe (or point out that we've explicitly documented / expect non-determinism). I also added `#![warn(unsafe_op_in_unsafe_fn)]` to the examples and the template, since this forces good hygiene of annotating sources of non-determinism even inside a module that is intentionally non-deterministic. Paxos changes are mostly refactors, and I verified that the performance is the same as before. 
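To give a flavor of the new API, here is a minimal sketch (illustrative only, not taken from this diff; `process` stands for any `Process` handle, and the per-batch counts printed here are intentionally batch-dependent):

    let tick = process.tick();
    unsafe {
        // SAFETY: batch boundaries are non-deterministic; we explicitly
        // accept that the per-batch counts below depend on batching.
        process
            .source_iter(q!(vec!["a", "b", "a"]))
            .map(q!(|s| (s, ())))
            .timestamped(&tick)
            .tick_batch()
    }
    .fold_keyed(q!(|| 0), q!(|count, _| *count += 1))
    .all_ticks()
    .for_each(q!(|(word, count)| println!("{}: {}", word, count)));

Since `tick_batch` is only reachable through a `Timestamped` stream and is marked `unsafe`, every site that introduces batching non-determinism carries an explicit `unsafe` block with a `SAFETY` comment, which is exactly the hygiene that `#![warn(unsafe_op_in_unsafe_fn)]` enforces inside `unsafe fn`s.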
--- .vscode/settings.json | 9 + Cargo.toml | 1 + hydro_deploy/hydro_cli/src/lib.rs | 1 + hydroflow_plus/src/cycle.rs | 10 +- hydroflow_plus/src/location/cluster/mod.rs | 12 +- hydroflow_plus/src/location/mod.rs | 26 +- hydroflow_plus/src/location/tick.rs | 97 ++++- hydroflow_plus/src/optional.rs | 167 ++++++-- hydroflow_plus/src/rewrites/persist_pullup.rs | 8 +- hydroflow_plus/src/rewrites/properties.rs | 17 +- hydroflow_plus/src/singleton.rs | 257 ++++++++++-- hydroflow_plus/src/stream.rs | 315 ++++++++++---- hydroflow_plus_test/src/cluster/compute_pi.rs | 22 +- hydroflow_plus_test/src/cluster/map_reduce.rs | 40 +- hydroflow_plus_test/src/cluster/paxos.rs | 393 ++++++++++-------- .../src/cluster/paxos_bench.rs | 168 +++++--- hydroflow_plus_test/src/cluster/paxos_kv.rs | 44 +- hydroflow_plus_test/src/cluster/quorum.rs | 110 ++++- .../src/cluster/request_response.rs | 17 +- .../src/cluster/simple_cluster.rs | 2 - ...cluster__paxos_bench__tests__paxos_ir.snap | 216 ++++------ hydroflow_plus_test/src/cluster/two_pc.rs | 63 +-- .../src/local/chat_app.rs | 24 +- .../src/local/compute_pi.rs | 34 +- .../src/local/count_elems.rs | 11 +- .../src/local/graph_reachability.rs | 13 +- .../src/local/negation.rs | 36 +- .../src/local/teed_join.rs | 8 +- stageleft/src/lib.rs | 1 - template/hydroflow_plus/Cargo.toml | 3 + 30 files changed, 1481 insertions(+), 644 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 80aead1799ae..d650a5fd0e7c 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -13,6 +13,15 @@ } } ], + "editor.semanticTokenColorCustomizations": { + "enabled": true, + "rules": { + "*.unsafe:rust": { + "foreground": "#ea1708", + "fontStyle": "bold" + } + } + }, "files.watcherExclude": { "**/target": true }, diff --git a/Cargo.toml b/Cargo.toml index ab8fef76bc85..224bad277975 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -57,6 +57,7 @@ opt-level = "s" [workspace.lints.rust] unused_qualifications = "warn" +unsafe_op_in_unsafe_fn = "warn" [workspace.lints.clippy] allow_attributes = "warn" diff --git a/hydro_deploy/hydro_cli/src/lib.rs b/hydro_deploy/hydro_cli/src/lib.rs index 2c1de84faff7..663abe4cc907 100644 --- a/hydro_deploy/hydro_cli/src/lib.rs +++ b/hydro_deploy/hydro_cli/src/lib.rs @@ -1,6 +1,7 @@ #![expect( unused_qualifications, non_local_definitions, + unsafe_op_in_unsafe_fn, reason = "for pyo3 generated code" )] diff --git a/hydroflow_plus/src/cycle.rs b/hydroflow_plus/src/cycle.rs index 4d61cc1d30ba..1ac45600f7b2 100644 --- a/hydroflow_plus/src/cycle.rs +++ b/hydroflow_plus/src/cycle.rs @@ -1,4 +1,4 @@ -use crate::location::Location; +use crate::location::{Location, LocationId}; use crate::staging_util::Invariant; pub enum ForwardRefMarker {} @@ -9,7 +9,7 @@ pub trait DeferTick { } pub trait CycleComplete<'a, T> { - fn complete(self, ident: syn::Ident); + fn complete(self, ident: syn::Ident, expected_location: LocationId); } pub trait CycleCollection<'a, T>: CycleComplete<'a, T> { @@ -30,24 +30,26 @@ pub trait CycleCollectionWithInitial<'a, T>: CycleComplete<'a, T> { /// See [`crate::FlowBuilder`] for an explainer on the type parameters. 
pub struct ForwardRef<'a, S: CycleComplete<'a, ForwardRefMarker>> { pub(crate) ident: syn::Ident, + pub(crate) expected_location: LocationId, pub(crate) _phantom: Invariant<'a, S>, } impl<'a, S: CycleComplete<'a, ForwardRefMarker>> ForwardRef<'a, S> { pub fn complete(self, stream: S) { let ident = self.ident; - S::complete(stream, ident) + S::complete(stream, ident, self.expected_location) } } pub struct TickCycle<'a, S: CycleComplete<'a, TickCycleMarker> + DeferTick> { pub(crate) ident: syn::Ident, + pub(crate) expected_location: LocationId, pub(crate) _phantom: Invariant<'a, S>, } impl<'a, S: CycleComplete<'a, TickCycleMarker> + DeferTick> TickCycle<'a, S> { pub fn complete_next_tick(self, stream: S) { let ident = self.ident; - S::complete(stream.defer_tick(), ident) + S::complete(stream.defer_tick(), ident, self.expected_location) } } diff --git a/hydroflow_plus/src/location/cluster/mod.rs b/hydroflow_plus/src/location/cluster/mod.rs index cfd83e195d8d..6d2b71cc3622 100644 --- a/hydroflow_plus/src/location/cluster/mod.rs +++ b/hydroflow_plus/src/location/cluster/mod.rs @@ -20,14 +20,10 @@ pub struct Cluster<'a, C> { pub trait IsCluster { type Tag; - fn id(&self) -> usize; } impl<C> IsCluster for Cluster<'_, C> { type Tag = C; - fn id(&self) -> usize { - self.id - } } impl<'a, C> Cluster<'a, C> { @@ -125,8 +121,14 @@ where where Self: Sized, { + let cluster_id = if let LocationId::Cluster(id) = ctx.root().id() { + id + } else { + unreachable!() + }; + let ident = syn::Ident::new( - &format!("__hydroflow_plus_cluster_self_id_{}", ctx.root().id()), + &format!("__hydroflow_plus_cluster_self_id_{}", cluster_id), Span::call_site(), ); let root = get_this_crate(); diff --git a/hydroflow_plus/src/location/mod.rs b/hydroflow_plus/src/location/mod.rs index 61693cb7c66f..70e39d36a353 100644 --- a/hydroflow_plus/src/location/mod.rs +++ b/hydroflow_plus/src/location/mod.rs @@ -60,7 +60,7 @@ pub fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) { } pub trait Location<'a>: Clone { - type Root; + type Root: Location<'a>; fn root(&self) -> Self::Root; @@ -160,7 +160,16 @@ pub trait Location<'a>: Clone { ) } - fn source_interval( + /// Generates a stream with values emitted at a fixed interval, with + /// each value being the current time (as a [`tokio::time::Instant`]). + /// + /// The clock source used is monotonic, so elements will be emitted in + /// increasing order. + /// + /// # Safety + /// Because this stream is generated by an OS timer, it will be + /// non-deterministic because each timestamp will be arbitrary. + unsafe fn source_interval( &self, interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a, ) -> Stream where { self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new( tokio::time::interval(interval) ))) } - fn source_interval_delayed( + /// Generates a stream with values emitted at a fixed interval (with an + /// initial delay), with each value being the current time + /// (as a [`tokio::time::Instant`]). + /// + /// The clock source used is monotonic, so elements will be emitted in + /// increasing order. + /// + /// # Safety + /// Because this stream is generated by an OS timer, it will be + /// non-deterministic because each timestamp will be arbitrary. 
+ unsafe fn source_interval_delayed( &self, delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a, interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a, @@ -212,6 +231,7 @@ pub trait Location<'a>: Clone { ( ForwardRef { ident: ident.clone(), + expected_location: self.id(), _phantom: PhantomData, }, S::create_source(ident, self.clone()), diff --git a/hydroflow_plus/src/location/tick.rs b/hydroflow_plus/src/location/tick.rs index 809a750c9bca..7d23bec7fe08 100644 --- a/hydroflow_plus/src/location/tick.rs +++ b/hydroflow_plus/src/location/tick.rs @@ -1,6 +1,7 @@ use std::marker::PhantomData; use proc_macro2::Span; +use sealed::sealed; use stageleft::{q, QuotedWithContext}; use super::{Cluster, Location, LocationId, Process}; @@ -12,10 +13,50 @@ use crate::cycle::{ use crate::ir::{HfPlusNode, HfPlusSource}; use crate::{Bounded, Optional, Singleton, Stream}; +#[sealed] pub trait NoTick {} +#[sealed] impl NoTick for Process<'_, T> {} +#[sealed] impl NoTick for Cluster<'_, T> {} +#[sealed] +pub trait NoTimestamp {} +#[sealed] +impl NoTimestamp for Process<'_, T> {} +#[sealed] +impl NoTimestamp for Cluster<'_, T> {} +#[sealed] +impl<'a, L: Location<'a>> NoTimestamp for Tick {} + +#[derive(Clone)] +pub struct Timestamped { + pub(crate) tick: Tick, +} + +impl<'a, L: Location<'a>> Location<'a> for Timestamped { + type Root = L::Root; + + fn root(&self) -> Self::Root { + self.tick.root() + } + + fn id(&self) -> LocationId { + self.tick.id() + } + + fn flow_state(&self) -> &FlowState { + self.tick.flow_state() + } + + fn is_top_level() -> bool { + L::is_top_level() + } +} + +#[sealed] +impl NoTick for Timestamped {} + /// Marks the stream as being inside the single global clock domain. #[derive(Clone)] pub struct Tick { @@ -53,13 +94,20 @@ impl<'a, L: Location<'a>> Tick { batch_size: impl QuotedWithContext<'a, usize, L> + Copy + 'a, ) -> Stream<(), Self, Bounded> where - L: NoTick, + L: NoTick + NoTimestamp, { - self.l + let out = self + .l .spin() - .flat_map(q!(move |_| 0..batch_size)) + .flat_map_ordered(q!(move |_| 0..batch_size)) .map(q!(|_| ())) - .tick_batch(self) + .timestamped(self); + + unsafe { + // SAFETY: at runtime, `spin` produces a single value per tick, + // so each batch is guaranteed to be the same size. 
+ out.tick_batch() + } } pub fn singleton( @@ -69,7 +117,10 @@ impl<'a, L: Location<'a>> Tick { where L: NoTick, { - self.outer().singleton(e).latest_tick(self) + unsafe { + // SAFETY: a top-level singleton produces the same value each tick + self.outer().singleton(e).timestamped(self).latest_tick() + } } pub fn singleton_first_tick( @@ -118,12 +169,46 @@ impl<'a, L: Location<'a>> Tick { ( ForwardRef { ident: ident.clone(), + expected_location: self.id(), _phantom: PhantomData, }, S::create_source(ident, self.clone()), ) } + pub fn forward_ref_timestamped< + S: CycleCollection<'a, ForwardRefMarker, Location = Timestamped>, + >( + &self, + ) -> (ForwardRef<'a, S>, S) { + let next_id = { + let on_id = match self.l.id() { + LocationId::Process(id) => id, + LocationId::Cluster(id) => id, + LocationId::Tick(_, _) => panic!(), + LocationId::ExternalProcess(_) => panic!(), + }; + + let mut flow_state = self.flow_state().borrow_mut(); + let next_id_entry = flow_state.cycle_counts.entry(on_id).or_default(); + + let id = *next_id_entry; + *next_id_entry += 1; + id + }; + + let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site()); + + ( + ForwardRef { + ident: ident.clone(), + expected_location: self.id(), + _phantom: PhantomData, + }, + S::create_source(ident, Timestamped { tick: self.clone() }), + ) + } + pub fn cycle + DeferTick>( &self, ) -> (TickCycle<'a, S>, S) @@ -151,6 +236,7 @@ impl<'a, L: Location<'a>> Tick { ( TickCycle { ident: ident.clone(), + expected_location: self.id(), _phantom: PhantomData, }, S::create_source(ident, self.clone()), @@ -187,6 +273,7 @@ impl<'a, L: Location<'a>> Tick { ( TickCycle { ident: ident.clone(), + expected_location: self.id(), _phantom: PhantomData, }, S::create_source(ident, initial, self.clone()), diff --git a/hydroflow_plus/src/optional.rs b/hydroflow_plus/src/optional.rs index bfde8c2ef5e1..6d422d14d5bd 100644 --- a/hydroflow_plus/src/optional.rs +++ b/hydroflow_plus/src/optional.rs @@ -9,7 +9,10 @@ use syn::parse_quote; use crate::builder::FLOW_USED_MESSAGE; use crate::cycle::{CycleCollection, CycleComplete, DeferTick, ForwardRefMarker, TickCycleMarker}; use crate::ir::{HfPlusLeaf, HfPlusNode, HfPlusSource, TeeNode}; +use crate::location::tick::{NoTimestamp, Timestamped}; use crate::location::{check_matching_location, LocationId, NoTick}; +use crate::singleton::ZipResult; +use crate::stream::NoOrder; use crate::{Bounded, Location, Singleton, Stream, Tick, Unbounded}; pub struct Optional { @@ -61,7 +64,12 @@ impl<'a, T, L: Location<'a>> CycleCollection<'a, TickCycleMarker> } impl<'a, T, L: Location<'a>> CycleComplete<'a, TickCycleMarker> for Optional, Bounded> { - fn complete(self, ident: syn::Ident) { + fn complete(self, ident: syn::Ident, expected_location: LocationId) { + assert_eq!( + self.location.id(), + expected_location, + "locations do not match" + ); self.location .flow_state() .borrow_mut() @@ -94,7 +102,12 @@ impl<'a, T, L: Location<'a>> CycleCollection<'a, ForwardRefMarker> } impl<'a, T, L: Location<'a>> CycleComplete<'a, ForwardRefMarker> for Optional, Bounded> { - fn complete(self, ident: syn::Ident) { + fn complete(self, ident: syn::Ident, expected_location: LocationId) { + assert_eq!( + self.location.id(), + expected_location, + "locations do not match" + ); self.location .flow_state() .borrow_mut() @@ -127,7 +140,12 @@ impl<'a, T, L: Location<'a> + NoTick, B> CycleCollection<'a, ForwardRefMarker> } impl<'a, T, L: Location<'a> + NoTick, B> CycleComplete<'a, ForwardRefMarker> for Optional { - fn complete(self, ident: 
syn::Ident) { + fn complete(self, ident: syn::Ident, expected_location: LocationId) { + assert_eq!( + self.location.id(), + expected_location, + "locations do not match" + ); self.location .flow_state() .borrow_mut() @@ -179,15 +197,6 @@ impl<'a, T: Clone, L: Location<'a>, B> Clone for Optional { } impl<'a, T, L: Location<'a>, B> Optional { - // TODO(shadaj): this is technically incorrect; we should only return the first element of the stream - pub fn into_stream(self) -> Stream { - if L::is_top_level() { - panic!("Converting an optional to a stream is not yet supported at the top level"); - } - - Stream::new(self.location, self.ir_node.into_inner()) - } - pub fn map U + 'a>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional { let f = f.splice_fn1_ctx(&self.location).into(); Optional::new( self.location, HfPlusNode::Map { f, input: Box::new(self.ir_node.into_inner()), }, ) } - pub fn flat_map, F: Fn(T) -> I + 'a>( self, f: impl IntoQuotedMut<'a, F, L>, ) -> Stream { + pub fn flat_map_ordered, F: Fn(T) -> I + 'a>( self, f: impl IntoQuotedMut<'a, F, L>, ) -> Stream { let f = f.splice_fn1_ctx(&self.location).into(); Stream::new( self.location, HfPlusNode::FlatMap { f, input: Box::new(self.ir_node.into_inner()), }, ) } - pub fn flatten(self) -> Stream + pub fn flat_map_unordered, F: Fn(T) -> I + 'a>( + self, + f: impl IntoQuotedMut<'a, F, L>, + ) -> Stream { + let f = f.splice_fn1_ctx(&self.location).into(); + Stream::new( + self.location, + HfPlusNode::FlatMap { + f, + input: Box::new(self.ir_node.into_inner()), + }, + ) + } + + pub fn flatten_ordered(self) -> Stream where T: IntoIterator, { - self.flat_map(q!(|v| v)) + self.flat_map_ordered(q!(|v| v)) + } + + pub fn flatten_unordered(self) -> Stream + where + T: IntoIterator, + { + self.flat_map_unordered(q!(|v| v)) } pub fn filter bool + 'a>( @@ -350,48 +380,123 @@ impl<'a, T, L: Location<'a>> Optional { self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0))) } - pub fn then(self, value: Singleton) -> Optional { + pub fn then(self, value: Singleton) -> Optional + where + Singleton: ZipResult< + 'a, + Optional<(), L, Bounded>, + Location = L, + Out = Optional<(U, ()), L, Bounded>, + >, + { value.continue_if(self) } + + pub fn into_stream(self) -> Stream { + if L::is_top_level() { + panic!("Converting an optional to a stream is not yet supported at the top level"); + } + + Stream::new(self.location, self.ir_node.into_inner()) + } } -impl<'a, T, L: Location<'a> + NoTick, B> Optional { - pub fn latest_tick(self, tick: &Tick) -> Optional, Bounded> { +impl<'a, T, L: Location<'a> + NoTick, B> Optional, B> { + /// Given a tick, returns an optional value corresponding to a snapshot of the optional + /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all + /// relevant data that contributed to the snapshot at tick `t`. + /// + /// # Safety + /// Because this picks a snapshot of an optional whose value is continuously changing, + /// the output optional has a non-deterministic value since the snapshot can be at an + /// arbitrary point in time. 
+ pub unsafe fn latest_tick(self) -> Optional, Bounded> { Optional::new( - tick.clone(), + self.location.tick, HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner())), ) } - pub fn tick_samples(self) -> Stream { + pub fn drop_timestamp(self) -> Optional { + Optional::new(self.location.tick.l, self.ir_node.into_inner()) + } +} + +impl<'a, T, L: Location<'a> + NoTick, B> Optional { + pub fn timestamped(self, tick: &Tick) -> Optional, B> { + Optional::new( + Timestamped { tick: tick.clone() }, + self.ir_node.into_inner(), + ) + } + + /// Eagerly samples the optional as fast as possible, returning a stream of snapshots + /// with order corresponding to increasing prefixes of data contributing to the optional. + /// + /// # Safety + /// At runtime, the optional will be arbitrarily sampled as fast as possible, but due + /// to non-deterministic batching and arrival of inputs, the output stream is + /// non-deterministic. + pub unsafe fn sample_eager(self) -> Stream { let tick = self.location.tick(); - self.latest_tick(&tick).all_ticks() + + unsafe { + // SAFETY: source of intentional non-determinism + self.timestamped(&tick) + .latest_tick() + .all_ticks() + .drop_timestamp() + } } - pub fn sample_every( + /// Given a time interval, returns a stream corresponding to snapshots of the optional + /// value taken at various points in time. Because the input optional may be + /// [`Unbounded`], there are no guarantees on what these snapshots are other than they + /// represent the value of the optional given some prefix of the streams leading up to + /// it. + /// + /// # Safety + /// The output stream is non-deterministic in which elements are sampled, since this + /// is controlled by a clock. + pub unsafe fn sample_every( self, interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a, - ) -> Stream { - let samples = self.location.source_interval(interval); + ) -> Stream + where + L: NoTimestamp, + { + let samples = unsafe { + // SAFETY: source of intentional non-determinism + self.location.source_interval(interval) + }; let tick = self.location.tick(); - self.latest_tick(&tick) - .continue_if(samples.tick_batch(&tick).first()) - .all_ticks() + unsafe { + // SAFETY: source of intentional non-determinism + self.timestamped(&tick) + .latest_tick() + .continue_if(samples.timestamped(&tick).tick_batch().first()) + .all_ticks() + .drop_timestamp() + } } } impl<'a, T, L: Location<'a>> Optional, Bounded> { - pub fn all_ticks(self) -> Stream { + pub fn all_ticks(self) -> Stream, Unbounded> { Stream::new( - self.location.outer().clone(), + Timestamped { + tick: self.location, + }, HfPlusNode::Persist(Box::new(self.ir_node.into_inner())), ) } - pub fn latest(self) -> Optional { + pub fn latest(self) -> Optional, Unbounded> { Optional::new( - self.location.outer().clone(), + Timestamped { + tick: self.location, + }, HfPlusNode::Persist(Box::new(self.ir_node.into_inner())), ) } diff --git a/hydroflow_plus/src/rewrites/persist_pullup.rs b/hydroflow_plus/src/rewrites/persist_pullup.rs index 21c2a130eb99..7176dfde8998 100644 --- a/hydroflow_plus/src/rewrites/persist_pullup.rs +++ b/hydroflow_plus/src/rewrites/persist_pullup.rs @@ -168,7 +168,13 @@ mod tests { let process = flow.process::<()>(); let tick = process.tick(); - let before_tee = process.source_iter(q!(0..10)).tick_batch(&tick).persist(); + let before_tee = unsafe { + process + .source_iter(q!(0..10)) + .timestamped(&tick) + .tick_batch() + .persist() + }; before_tee .clone() diff --git a/hydroflow_plus/src/rewrites/properties.rs 
b/hydroflow_plus/src/rewrites/properties.rs index f09b8a119266..2f0323ab7792 100644 --- a/hydroflow_plus/src/rewrites/properties.rs +++ b/hydroflow_plus/src/rewrites/properties.rs @@ -121,13 +121,16 @@ mod tests { let counter_func = q!(|count: &mut i32, _| *count += 1); let _ = database.add_commutative_tag(counter_func, &tick); - process - .source_iter(q!(vec![])) - .map(q!(|string: String| (string, ()))) - .tick_batch(&tick) - .fold_keyed(q!(|| 0), counter_func) - .all_ticks() - .for_each(q!(|(string, count)| println!("{}: {}", string, count))); + unsafe { + process + .source_iter(q!(vec![])) + .map(q!(|string: String| (string, ()))) + .timestamped(&tick) + .tick_batch() + } + .fold_keyed(q!(|| 0), counter_func) + .all_ticks() + .for_each(q!(|(string, count)| println!("{}: {}", string, count))); let built = flow .optimize_with(|ir| properties_optimize(ir, &database)) diff --git a/hydroflow_plus/src/singleton.rs b/hydroflow_plus/src/singleton.rs index f89957e51dc2..819b3c06dfc7 100644 --- a/hydroflow_plus/src/singleton.rs +++ b/hydroflow_plus/src/singleton.rs @@ -11,6 +11,7 @@ use crate::cycle::{ TickCycleMarker, }; use crate::ir::{HfPlusLeaf, HfPlusNode, TeeNode}; +use crate::location::tick::{NoTimestamp, Timestamped}; use crate::location::{check_matching_location, Location, LocationId, NoTick, Tick}; use crate::{Bounded, Optional, Stream, Unbounded}; @@ -68,7 +69,12 @@ impl<'a, T, L: Location<'a>> CycleCollectionWithInitial<'a, TickCycleMarker> } impl<'a, T, L: Location<'a>> CycleComplete<'a, TickCycleMarker> for Singleton, Bounded> { - fn complete(self, ident: syn::Ident) { + fn complete(self, ident: syn::Ident, expected_location: LocationId) { + assert_eq!( + self.location.id(), + expected_location, + "locations do not match" + ); self.location .flow_state() .borrow_mut() @@ -103,7 +109,12 @@ impl<'a, T, L: Location<'a>> CycleCollection<'a, ForwardRefMarker> impl<'a, T, L: Location<'a>> CycleComplete<'a, ForwardRefMarker> for Singleton, Bounded> { - fn complete(self, ident: syn::Ident) { + fn complete(self, ident: syn::Ident, expected_location: LocationId) { + assert_eq!( + self.location.id(), + expected_location, + "locations do not match" + ); self.location .flow_state() .borrow_mut() @@ -118,6 +129,46 @@ impl<'a, T, L: Location<'a>> CycleComplete<'a, ForwardRefMarker> } } +impl<'a, T, L: Location<'a> + NoTick, B> CycleCollection<'a, ForwardRefMarker> + for Singleton +{ + type Location = L; + + fn create_source(ident: syn::Ident, location: L) -> Self { + let location_id = location.id(); + Singleton::new( + location, + HfPlusNode::Persist(Box::new(HfPlusNode::CycleSource { + ident, + location_kind: location_id, + })), + ) + } +} + +impl<'a, T, L: Location<'a> + NoTick, B> CycleComplete<'a, ForwardRefMarker> + for Singleton +{ + fn complete(self, ident: syn::Ident, expected_location: LocationId) { + assert_eq!( + self.location.id(), + expected_location, + "locations do not match" + ); + self.location + .flow_state() + .borrow_mut() + .leaves + .as_mut() + .expect(FLOW_USED_MESSAGE) + .push(HfPlusLeaf::CycleSink { + ident, + location_kind: self.location_kind(), + input: Box::new(HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner()))), + }); + } +} + impl<'a, T: Clone, L: Location<'a>, B> Clone for Singleton { fn clone(&self) -> Self { if !matches!(self.ir_node.borrow().deref(), HfPlusNode::Tee { .. 
}) { @@ -143,11 +194,6 @@ impl<'a, T: Clone, L: Location<'a>, B> Clone for Singleton { } impl<'a, T, L: Location<'a>, B> Singleton { - // TODO(shadaj): this is technically incorrect; we should only return the first element of the stream - pub fn into_stream(self) -> Stream { - Stream::new(self.location, self.ir_node.into_inner()) - } - pub fn map U + 'a>(self, f: impl IntoQuotedMut<'a, F, L>) -> Singleton { let f = f.splice_fn1_ctx(&self.location).into(); Singleton::new( @@ -159,7 +205,21 @@ impl<'a, T, L: Location<'a>, B> Singleton { ) } - pub fn flat_map, F: Fn(T) -> I + 'a>( + pub fn flat_map_ordered, F: Fn(T) -> I + 'a>( + self, + f: impl IntoQuotedMut<'a, F, L>, + ) -> Stream { + let f = f.splice_fn1_ctx(&self.location).into(); + Stream::new( + self.location, + HfPlusNode::FlatMap { + f, + input: Box::new(self.ir_node.into_inner()), + }, + ) + } + + pub fn flat_map_unordered, F: Fn(T) -> I + 'a>( self, f: impl IntoQuotedMut<'a, F, L>, ) -> Stream { @@ -225,55 +285,128 @@ impl<'a, T, L: Location<'a>, B> Singleton { ) } } -} -impl<'a, T, L: Location<'a>> Singleton { - pub fn continue_if(self, signal: Optional) -> Optional { + pub fn continue_if(self, signal: Optional) -> Optional + where + Self: ZipResult< + 'a, + Optional<(), L, Bounded>, + Location = L, + Out = Optional<(T, ()), L, Bounded>, + >, + { self.zip(signal.map(q!(|_u| ()))).map(q!(|(d, _signal)| d)) } - pub fn continue_unless(self, other: Optional) -> Optional { + pub fn continue_unless(self, other: Optional) -> Optional + where + Singleton: ZipResult< + 'a, + Optional<(), L, Bounded>, + Location = L, + Out = Optional<(T, ()), L, Bounded>, + >, + { self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0))) } } -impl<'a, T, L: Location<'a> + NoTick, B> Singleton { - pub fn latest_tick(self, tick: &Tick) -> Singleton, Bounded> { +impl<'a, T, L: Location<'a> + NoTick, B> Singleton, B> { + /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton + /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all + /// relevant data that contributed to the snapshot at tick `t`. + /// + /// # Safety + /// Because this picks a snapshot of a singleton whose value is continuously changing, + /// the output singleton has a non-deterministic value since the snapshot can be at an + /// arbitrary point in time. + pub unsafe fn latest_tick(self) -> Singleton, Bounded> { Singleton::new( - tick.clone(), + self.location.tick, HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner())), ) } - pub fn tick_samples(self) -> Stream { + pub fn drop_timestamp(self) -> Optional { + Optional::new(self.location.tick.l, self.ir_node.into_inner()) + } +} + +impl<'a, T, L: Location<'a> + NoTick, B> Singleton { + pub fn timestamped(self, tick: &Tick) -> Singleton, B> { + Singleton::new( + Timestamped { tick: tick.clone() }, + self.ir_node.into_inner(), + ) + } + + /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots + /// with order corresponding to increasing prefixes of data contributing to the singleton. + /// + /// # Safety + /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due + /// to non-deterministic batching and arrival of inputs, the output stream is + /// non-deterministic. 
+ pub unsafe fn sample_eager(self) -> Stream { let tick = self.location.tick(); - self.latest_tick(&tick).all_ticks() + + unsafe { + // SAFETY: source of intentional non-determinism + self.timestamped(&tick) + .latest_tick() + .all_ticks() + .drop_timestamp() + } } - pub fn sample_every( + /// Given a time interval, returns a stream corresponding to snapshots of the singleton + /// value taken at various points in time. Because the input singleton may be + /// [`Unbounded`], there are no guarantees on what these snapshots are other than they + /// represent the value of the singleton given some prefix of the streams leading up to + /// it. + /// + /// # Safety + /// The output stream is non-deterministic in which elements are sampled, since this + /// is controlled by a clock. + pub unsafe fn sample_every( self, interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a, - ) -> Stream { - let samples = self.location.source_interval(interval); + ) -> Stream + where + L: NoTimestamp, + { + let samples = unsafe { + // SAFETY: source of intentional non-determinism + self.location.source_interval(interval) + }; let tick = self.location.tick(); - self.latest_tick(&tick) - .continue_if(samples.tick_batch(&tick).first()) - .all_ticks() + unsafe { + // SAFETY: source of intentional non-determinism + self.timestamped(&tick) + .latest_tick() + .continue_if(samples.timestamped(&tick).tick_batch().first()) + .all_ticks() + .drop_timestamp() + } } } impl<'a, T, L: Location<'a>> Singleton, Bounded> { - pub fn all_ticks(self) -> Stream { + pub fn all_ticks(self) -> Stream, Unbounded> { Stream::new( - self.location.outer().clone(), + Timestamped { + tick: self.location, + }, HfPlusNode::Persist(Box::new(self.ir_node.into_inner())), ) } - pub fn latest(self) -> Singleton { + pub fn latest(self) -> Singleton, Unbounded> { Singleton::new( - self.location.outer().clone(), + Timestamped { + tick: self.location, + }, HfPlusNode::Persist(Box::new(self.ir_node.into_inner())), ) } @@ -298,6 +431,10 @@ impl<'a, T, L: Location<'a>> Singleton, Bounded> { HfPlusNode::Delta(Box::new(self.ir_node.into_inner())), ) } + + pub fn into_stream(self) -> Stream, Bounded> { + Stream::new(self.location, self.ir_node.into_inner()) + } } pub trait ZipResult<'a, Other> { @@ -310,36 +447,78 @@ pub trait ZipResult<'a, Other> { fn make(location: Self::Location, ir_node: HfPlusNode) -> Self::Out; } -impl<'a, T, U: Clone, L: Location<'a>, B> ZipResult<'a, Singleton> for Singleton { - type Out = Singleton<(T, U), L, B>; - type Location = L; +impl<'a, T, U: Clone, L: Location<'a>, B> ZipResult<'a, Singleton, B>> + for Singleton, B> +{ + type Out = Singleton<(T, U), Timestamped, B>; + type Location = Timestamped; - fn other_location(other: &Singleton) -> L { + fn other_location(other: &Singleton, B>) -> Timestamped { other.location.clone() } - fn other_ir_node(other: Singleton) -> HfPlusNode { + fn other_ir_node(other: Singleton, B>) -> HfPlusNode { other.ir_node.into_inner() } - fn make(location: L, ir_node: HfPlusNode) -> Self::Out { + fn make(location: Timestamped, ir_node: HfPlusNode) -> Self::Out { Singleton::new(location, ir_node) } } -impl<'a, T, U: Clone, L: Location<'a>, B> ZipResult<'a, Optional> for Singleton { - type Out = Optional<(T, U), L, B>; - type Location = L; +impl<'a, T, U: Clone, L: Location<'a>, B> ZipResult<'a, Optional, B>> + for Singleton, B> +{ + type Out = Optional<(T, U), Timestamped, B>; + type Location = Timestamped; + + fn other_location(other: &Optional, B>) -> Timestamped { + 
other.location.clone() + } + + fn other_ir_node(other: Optional, B>) -> HfPlusNode { + other.ir_node.into_inner() + } + + fn make(location: Timestamped, ir_node: HfPlusNode) -> Self::Out { + Optional::new(location, ir_node) + } +} + +impl<'a, T, U: Clone, L: Location<'a>, B> ZipResult<'a, Singleton, B>> + for Singleton, B> +{ + type Out = Singleton<(T, U), Tick, B>; + type Location = Tick; + + fn other_location(other: &Singleton, B>) -> Tick { + other.location.clone() + } + + fn other_ir_node(other: Singleton, B>) -> HfPlusNode { + other.ir_node.into_inner() + } + + fn make(location: Tick, ir_node: HfPlusNode) -> Self::Out { + Singleton::new(location, ir_node) + } +} + +impl<'a, T, U: Clone, L: Location<'a>, B> ZipResult<'a, Optional, B>> + for Singleton, B> +{ + type Out = Optional<(T, U), Tick, B>; + type Location = Tick; - fn other_location(other: &Optional) -> L { + fn other_location(other: &Optional, B>) -> Tick { other.location.clone() } - fn other_ir_node(other: Optional) -> HfPlusNode { + fn other_ir_node(other: Optional, B>) -> HfPlusNode { other.ir_node.into_inner() } - fn make(location: L, ir_node: HfPlusNode) -> Self::Out { + fn make(location: Tick, ir_node: HfPlusNode) -> Self::Out { Optional::new(location, ir_node) } } diff --git a/hydroflow_plus/src/stream.rs b/hydroflow_plus/src/stream.rs index 2da21bf8e47f..06db898941a1 100644 --- a/hydroflow_plus/src/stream.rs +++ b/hydroflow_plus/src/stream.rs @@ -11,12 +11,14 @@ use serde::de::DeserializeOwned; use serde::Serialize; use stageleft::{q, IntoQuotedMut, QuotedWithContext}; use syn::parse_quote; +use tokio::time::Instant; use crate::builder::FLOW_USED_MESSAGE; use crate::cycle::{CycleCollection, CycleComplete, DeferTick, ForwardRefMarker, TickCycleMarker}; use crate::ir::{DebugInstantiate, HfPlusLeaf, HfPlusNode, TeeNode}; use crate::location::cluster::CLUSTER_SELF_ID; use crate::location::external_process::{ExternalBincodeStream, ExternalBytesPort}; +use crate::location::tick::{NoTimestamp, Timestamped}; use crate::location::{ check_matching_location, CanSend, ExternalProcess, Location, LocationId, NoTick, Tick, }; @@ -115,7 +117,12 @@ impl<'a, T, L: Location<'a>, Order> CycleCollection<'a, TickCycleMarker> impl<'a, T, L: Location<'a>, Order> CycleComplete<'a, TickCycleMarker> for Stream, Bounded, Order> { - fn complete(self, ident: syn::Ident) { + fn complete(self, ident: syn::Ident, expected_location: LocationId) { + assert_eq!( + self.location.id(), + expected_location, + "locations do not match" + ); self.location .flow_state() .borrow_mut() @@ -150,7 +157,12 @@ impl<'a, T, L: Location<'a> + NoTick, B, Order> CycleCollection<'a, ForwardRefMa impl<'a, T, L: Location<'a> + NoTick, B, Order> CycleComplete<'a, ForwardRefMarker> for Stream { - fn complete(self, ident: syn::Ident) { + fn complete(self, ident: syn::Ident, expected_location: LocationId) { + assert_eq!( + self.location.id(), + expected_location, + "locations do not match" + ); self.location .flow_state() .borrow_mut() @@ -221,7 +233,7 @@ impl<'a, T, L: Location<'a>, B, Order> Stream { self.map(q!(|d| d.clone())) } - pub fn flat_map, F: Fn(T) -> I + 'a>( + pub fn flat_map_ordered, F: Fn(T) -> I + 'a>( self, f: impl IntoQuotedMut<'a, F, L>, ) -> Stream { @@ -235,11 +247,32 @@ impl<'a, T, L: Location<'a>, B, Order> Stream { ) } - pub fn flatten(self) -> Stream + pub fn flat_map_unordered, F: Fn(T) -> I + 'a>( + self, + f: impl IntoQuotedMut<'a, F, L>, + ) -> Stream { + let f = f.splice_fn1_ctx(&self.location).into(); + Stream::new( + self.location, + 
HfPlusNode::FlatMap { + f, + input: Box::new(self.ir_node.into_inner()), + }, + ) + } + + pub fn flatten_ordered(self) -> Stream + where + T: IntoIterator, + { + self.flat_map_ordered(q!(|d| d)) + } + + pub fn flatten_unordered(self) -> Stream where T: IntoIterator, { - self.flat_map(q!(|d| d)) + self.flat_map_unordered(q!(|d| d)) } pub fn filter bool + 'a>( @@ -366,7 +399,15 @@ impl<'a, T, L: Location<'a>, B, Order> Stream { } } - pub fn assume_ordering(self) -> Stream { + /// Explicitly "casts" the stream to a type with a different ordering + /// guarantee. Useful in unsafe code where the ordering cannot be proven + /// by the type system. + /// + /// # Safety + /// This function is used as an escape hatch, and any mistakes in the + /// provided ordering guarantee will propagate into the guarantees + /// for the rest of the program. + pub unsafe fn assume_ordering(self) -> Stream { Stream::new(self.location, self.ir_node.into_inner()) } } @@ -558,15 +599,21 @@ impl<'a, T, L: Location<'a>> Stream { } } -impl<'a, T, L: Location<'a> + NoTick> Stream { +impl<'a, T, L: Location<'a> + NoTick + NoTimestamp> Stream { pub fn union( self, other: Stream, ) -> Stream { let tick = self.location.tick(); - self.tick_batch(&tick) - .union(other.tick_batch(&tick)) - .all_ticks() + unsafe { + // SAFETY: Because the inputs and outputs are unordered, + // we can interleave batches from both streams. + self.timestamped(&tick) + .tick_batch() + .union(other.timestamped(&tick).tick_batch()) + .all_ticks() + .drop_timestamp() + } } } @@ -704,32 +751,112 @@ impl<'a, K: Eq + Hash, V, L: Location<'a>, Order> Stream<(K, V), Tick, Bounde } } -impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream { - pub fn tick_batch(self, tick: &Tick) -> Stream, Bounded, Order> { +impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream, B, Order> { + /// Given a tick, returns a stream corresponding to a batch of elements for that tick. + /// These batches are guaranteed to be contiguous across ticks and preserve the order + /// of the input. + /// + /// # Safety + /// The batch boundaries are non-deterministic and may change across executions. + pub unsafe fn tick_batch(self) -> Stream, Bounded, Order> { Stream::new( - tick.clone(), + self.location.tick, HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner())), ) } - pub fn tick_prefix(self, tick: &Tick) -> Stream, Bounded, Order> - where - T: Clone, - { - self.tick_batch(tick).persist() + pub fn drop_timestamp(self) -> Stream { + Stream::new(self.location.tick.l, self.ir_node.into_inner()) } - pub fn sample_every( + pub fn timestamp_source(&self) -> Tick { + self.location.tick.clone() + } +} + +impl<'a, T, L: Location<'a> + NoTick + NoTimestamp, B, Order> Stream { + pub fn timestamped(self, tick: &Tick) -> Stream, B, Order> { + Stream::new( + Timestamped { tick: tick.clone() }, + self.ir_node.into_inner(), + ) + } + + /// Given a time interval, returns a stream corresponding to samples taken from the + /// stream roughly at that interval. The output will have elements in the same order + /// as the input, but with arbitrary elements skipped between samples. There is also + /// no guarantee on the exact timing of the samples. + /// + /// # Safety + /// The output stream is non-deterministic in which elements are sampled, since this + /// is controlled by a clock. 
+ pub unsafe fn sample_every( self, interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a, ) -> Stream { - let samples = self.location.source_interval(interval); + let samples = unsafe { + // SAFETY: source of intentional non-determinism + self.location.source_interval(interval) + }; + let tick = self.location.tick(); - self.tick_batch(&tick) - .continue_if(samples.tick_batch(&tick).first()) - .all_ticks() + unsafe { + // SAFETY: source of intentional non-determinism + self.timestamped(&tick) + .tick_batch() + .continue_if(samples.timestamped(&tick).tick_batch().first()) + .all_ticks() + .drop_timestamp() + } } + /// Given a timeout duration, returns an [`Optional`] which will have a value if the + /// stream has not emitted a value since that duration. + /// + /// # Safety + /// Timeout relies on non-deterministic sampling of the stream, so depending on when + /// samples take place, timeouts may be non-deterministically generated or missed, + /// and the notification of the timeout may be delayed as well. There is also no + /// guarantee on how long the [`Optional`] will have a value after the timeout is + /// detected based on when the next sample is taken. + pub unsafe fn timeout( + self, + duration: impl QuotedWithContext<'a, std::time::Duration, Tick> + Copy + 'a, + ) -> Optional<(), L, Unbounded> + where + Order: MinOrder, + { + let tick = self.location.tick(); + + let latest_received = self.fold_commutative( + q!(|| None), + q!(|latest, _| { + // Note: May want to check received ballot against our own? + *latest = Some(Instant::now()); + }), + ); + + unsafe { + // SAFETY: Non-deterministic delay in detecting a timeout is expected. + latest_received.timestamped(&tick).latest_tick() + } + .filter_map(q!(move |latest_received| { + if let Some(latest_received) = latest_received { + if Instant::now().duration_since(latest_received) > duration { + Some(()) + } else { + None + } + } else { + Some(()) + } + })) + .latest() + .drop_timestamp() + } +} + +impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream { pub fn for_each(self, f: impl IntoQuotedMut<'a, F, L>) { let f = f.splice_fn1_ctx(&self.location).into(); self.location @@ -762,9 +889,11 @@ impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream { } impl<'a, T, L: Location<'a>, Order> Stream, Bounded, Order> { - pub fn all_ticks(self) -> Stream { + pub fn all_ticks(self) -> Stream, Unbounded, Order> { Stream::new( - self.location.outer().clone(), + Timestamped { + tick: self.location.clone(), + }, HfPlusNode::Persist(Box::new(self.ir_node.into_inner())), ) } @@ -848,12 +977,17 @@ impl<'a, T, C1, B, Order> Stream, B, Order> { Order: MinOrder< as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder>, { - self.map(q!(move |b| ( - ClusterId::from_raw(CLUSTER_SELF_ID.raw_id), - b.clone() - ))) - .send_bincode_interleaved(other) - .assume_ordering() // this is safe because we are mapping clusters 1:1 + let sent = self + .map(q!(move |b| ( + ClusterId::from_raw(CLUSTER_SELF_ID.raw_id), + b.clone() + ))) + .send_bincode_interleaved(other); + + unsafe { + // SAFETY: this is safe because we are mapping clusters 1:1 + sent.assume_ordering() + } } } @@ -863,9 +997,12 @@ impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream { other: &Process<'a, P2>, ) -> Stream, Unbounded, Order> where - L: CanSend<'a, Process<'a, P2>, In = T, Out = T>, + L::Root: CanSend<'a, Process<'a, P2>, In = T, Out = T>, T: Clone + Serialize + DeserializeOwned, - Order: MinOrder, Min = Order>, + Order: MinOrder< + >>::OutStrongestOrder, + Min = Order, + >, { 
self.send_bincode::, T>(other) } @@ -873,20 +1010,20 @@ impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream { pub fn send_bincode, CoreType>( self, other: &L2, - ) -> Stream, L2, Unbounded, Order::Min> + ) -> Stream<>::Out, L2, Unbounded, Order::Min> where - L: CanSend<'a, L2, In = T>, + L::Root: CanSend<'a, L2, In = T>, CoreType: Serialize + DeserializeOwned, - Order: MinOrder>, + Order: MinOrder<>::OutStrongestOrder>, { - let serialize_pipeline = Some(serialize_bincode::(L::is_demux())); + let serialize_pipeline = Some(serialize_bincode::(L::Root::is_demux())); - let deserialize_pipeline = Some(deserialize_bincode::(L::tagged_type())); + let deserialize_pipeline = Some(deserialize_bincode::(L::Root::tagged_type())); Stream::new( other.clone(), HfPlusNode::Network { - from_location: self.location_kind(), + from_location: self.location.root().id(), from_key: None, to_location: other.id(), to_key: None, @@ -921,7 +1058,7 @@ impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream { leaves.push(HfPlusLeaf::ForEach { f: dummy_f.into(), input: Box::new(HfPlusNode::Network { - from_location: self.location_kind(), + from_location: self.location.root().id(), from_key: None, to_location: other.id(), to_key: Some(external_key), @@ -942,22 +1079,22 @@ impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream { pub fn send_bytes>( self, other: &L2, - ) -> Stream, L2, Unbounded, Order::Min> + ) -> Stream<>::Out, L2, Unbounded, Order::Min> where - L: CanSend<'a, L2, In = T>, - Order: MinOrder>, + L::Root: CanSend<'a, L2, In = T>, + Order: MinOrder<>::OutStrongestOrder>, { let root = get_this_crate(); Stream::new( other.clone(), HfPlusNode::Network { - from_location: self.location_kind(), + from_location: self.location.root().id(), from_key: None, to_location: other.id(), to_key: None, serialize_pipeline: None, instantiate_fn: DebugInstantiate::Building(), - deserialize_pipeline: if let Some(c_type) = L::tagged_type() { + deserialize_pipeline: if let Some(c_type) = L::Root::tagged_type() { Some( parse_quote!(map(|(id, b)| (#root::ClusterId<#c_type>::from_raw(id), b.unwrap().freeze()))), ) @@ -971,7 +1108,7 @@ impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream { pub fn send_bytes_external(self, other: &ExternalProcess) -> ExternalBytesPort where - L: CanSend<'a, ExternalProcess<'a, L2>, In = T, Out = Bytes>, + L::Root: CanSend<'a, ExternalProcess<'a, L2>, In = T, Out = Bytes>, { let mut flow_state_borrow = self.location.flow_state().borrow_mut(); let external_key = flow_state_borrow.next_external_out; @@ -984,7 +1121,7 @@ impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream { leaves.push(HfPlusLeaf::ForEach { f: dummy_f.into(), input: Box::new(HfPlusNode::Network { - from_location: self.location_kind(), + from_location: self.location.root().id(), from_key: None, to_location: other.id(), to_key: Some(external_key), @@ -1006,9 +1143,9 @@ impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream { other: &L2, ) -> Stream where - L: CanSend<'a, L2, In = T, Out = (Tag, CoreType)>, + L::Root: CanSend<'a, L2, In = T, Out = (Tag, CoreType)>, CoreType: Serialize + DeserializeOwned, - Order: MinOrder>, + Order: MinOrder<>::OutStrongestOrder>, { self.send_bincode::(other).map(q!(|(_, b)| b)) } @@ -1018,24 +1155,30 @@ impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream { other: &L2, ) -> Stream where - L: CanSend<'a, L2, In = T, Out = (Tag, Bytes)>, - Order: MinOrder>, + L::Root: CanSend<'a, L2, In = T, Out = (Tag, Bytes)>, + Order: MinOrder<>::OutStrongestOrder>, { self.send_bytes::(other).map(q!(|(_, b)| b)) } 
+ #[expect(clippy::type_complexity, reason = "ordering semantics for broadcast")] pub fn broadcast_bincode( self, other: &Cluster<'a, C2>, - ) -> Stream, Cluster<'a, C2>, Unbounded, Order::Min> + ) -> Stream< + >>::Out, + Cluster<'a, C2>, + Unbounded, + Order::Min, + > where - L: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T)>, + L::Root: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T)>, T: Clone + Serialize + DeserializeOwned, - Order: MinOrder>, + Order: MinOrder<>>::OutStrongestOrder>, { let ids = other.members(); - self.flat_map(q!(|b| ids.iter().map(move |id| ( + self.flat_map_ordered(q!(|b| ids.iter().map(move |id| ( ::std::clone::Clone::clone(id), ::std::clone::Clone::clone(&b) )))) @@ -1047,25 +1190,31 @@ impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream { other: &Cluster<'a, C2>, ) -> Stream, Unbounded, Order::Min> where - L: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T), Out = (Tag, T)> + 'a, + L::Root: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T), Out = (Tag, T)> + 'a, T: Clone + Serialize + DeserializeOwned, - Order: MinOrder>, + Order: MinOrder<>>::OutStrongestOrder>, { self.broadcast_bincode(other).map(q!(|(_, b)| b)) } + #[expect(clippy::type_complexity, reason = "ordering semantics for broadcast")] pub fn broadcast_bytes( self, other: &Cluster<'a, C2>, - ) -> Stream, Cluster<'a, C2>, Unbounded, Order::Min> + ) -> Stream< + >>::Out, + Cluster<'a, C2>, + Unbounded, + Order::Min, + > where - L: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T)> + 'a, + L::Root: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T)> + 'a, T: Clone, - Order: MinOrder>, + Order: MinOrder<>>::OutStrongestOrder>, { let ids = other.members(); - self.flat_map(q!(|b| ids.iter().map(move |id| ( + self.flat_map_ordered(q!(|b| ids.iter().map(move |id| ( ::std::clone::Clone::clone(id), ::std::clone::Clone::clone(&b) )))) @@ -1077,10 +1226,10 @@ impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream { other: &Cluster<'a, C2>, ) -> Stream, Unbounded, Order::Min> where - L: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T), Out = (Tag, Bytes)> + L::Root: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T), Out = (Tag, Bytes)> + 'a, T: Clone, - Order: MinOrder>, + Order: MinOrder<>>::OutStrongestOrder>, { self.broadcast_bytes(other).map(q!(|(_, b)| b)) } @@ -1092,15 +1241,18 @@ impl<'a, T, L: Location<'a> + NoTick, B> Stream { self, other: &Cluster<'a, C2>, ) -> Stream< - L::Out, + >>::Out, Cluster<'a, C2>, Unbounded, - >>::Min, + >>::OutStrongestOrder, + >>::Min, > where - L: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T)>, + L::Root: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T)>, T: Clone + Serialize + DeserializeOwned, - TotalOrder: MinOrder>, + TotalOrder: + MinOrder<>>::OutStrongestOrder>, { let ids = other.members(); @@ -1116,12 +1268,15 @@ impl<'a, T, L: Location<'a> + NoTick, B> Stream { T, Cluster<'a, C2>, Unbounded, - >>::Min, + >>::OutStrongestOrder, + >>::Min, > where - L: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T), Out = (Tag, T)> + 'a, + L::Root: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T), Out = (Tag, T)> + 'a, T: Clone + Serialize + DeserializeOwned, - TotalOrder: MinOrder>, + TotalOrder: + MinOrder<>>::OutStrongestOrder>, { self.round_robin_bincode(other).map(q!(|(_, b)| b)) } @@ -1130,15 +1285,18 @@ impl<'a, T, L: Location<'a> + NoTick, B> Stream { self, other: &Cluster<'a, C2>, ) -> Stream< - L::Out, + >>::Out, Cluster<'a, C2>, Unbounded, - >>::Min, + >>::OutStrongestOrder, + >>::Min, > where - L: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T)> + 'a, + L::Root: 
CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T)> + 'a, T: Clone, - TotalOrder: MinOrder>, + TotalOrder: + MinOrder<>>::OutStrongestOrder>, { let ids = other.members(); @@ -1154,13 +1312,16 @@ impl<'a, T, L: Location<'a> + NoTick, B> Stream { Bytes, Cluster<'a, C2>, Unbounded, - >>::Min, + >>::OutStrongestOrder, + >>::Min, > where - L: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T), Out = (Tag, Bytes)> + L::Root: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T), Out = (Tag, Bytes)> + 'a, T: Clone, - TotalOrder: MinOrder>, + TotalOrder: + MinOrder<>>::OutStrongestOrder>, { self.round_robin_bytes(other).map(q!(|(_, b)| b)) } diff --git a/hydroflow_plus_test/src/cluster/compute_pi.rs b/hydroflow_plus_test/src/cluster/compute_pi.rs index 00bf90b5348a..0eff334e7160 100644 --- a/hydroflow_plus_test/src/cluster/compute_pi.rs +++ b/hydroflow_plus_test/src/cluster/compute_pi.rs @@ -29,21 +29,25 @@ pub fn compute_pi<'a>( ) .all_ticks(); - trials + let estimate = trials .send_bincode_interleaved(&process) .reduce_commutative(q!(|(inside, total), (inside_batch, total_batch)| { *inside += inside_batch; *total += total_batch; - })) - .sample_every(q!(Duration::from_secs(1))) - .for_each(q!(|(inside, total)| { - println!( - "pi: {} ({} trials)", - 4.0 * inside as f64 / total as f64, - total - ); })); + unsafe { + // SAFETY: intentional non-determinism + estimate.sample_every(q!(Duration::from_secs(1))) + } + .for_each(q!(|(inside, total)| { + println!( + "pi: {} ({} trials)", + 4.0 * inside as f64 / total as f64, + total + ); + })); + (cluster, process) } diff --git a/hydroflow_plus_test/src/cluster/map_reduce.rs b/hydroflow_plus_test/src/cluster/map_reduce.rs index a46e1d0d401a..42d72a221881 100644 --- a/hydroflow_plus_test/src/cluster/map_reduce.rs +++ b/hydroflow_plus_test/src/cluster/map_reduce.rs @@ -11,22 +11,32 @@ pub fn map_reduce<'a>(flow: &FlowBuilder<'a>) -> (Process<'a, Leader>, Cluster<' .source_iter(q!(vec!["abc", "abc", "xyz", "abc"])) .map(q!(|s| s.to_string())); - words + let partitioned_words = words .round_robin_bincode(&cluster) - .map(q!(|string| (string, ()))) - .tick_batch(&cluster.tick()) - .fold_keyed(q!(|| 0), q!(|count, _| *count += 1)) - .inspect(q!(|(string, count)| println!( - "partition count: {} - {}", - string, count - ))) - .all_ticks() - .send_bincode_interleaved(&process) - .tick_batch(&process.tick()) - .persist() - .reduce_keyed_commutative(q!(|total, count| *total += count)) - .all_ticks() - .for_each(q!(|(string, count)| println!("{}: {}", string, count))); + .map(q!(|string| (string, ()))); + + let batches = unsafe { + // SAFETY: addition is associative so we can batch reduce + partitioned_words.timestamped(&cluster.tick()).tick_batch() + } + .fold_keyed(q!(|| 0), q!(|count, _| *count += 1)) + .inspect(q!(|(string, count)| println!( + "partition count: {} - {}", + string, count + ))) + .all_ticks() + .send_bincode_interleaved(&process); + + unsafe { + // SAFETY: addition is associative so we can batch reduce + batches + .timestamped(&process.tick()) + .tick_batch() + .persist() + .reduce_keyed_commutative(q!(|total, count| *total += count)) + } + .all_ticks() + .for_each(q!(|(string, count)| println!("{}: {}", string, count))); (process, cluster) } diff --git a/hydroflow_plus_test/src/cluster/paxos.rs b/hydroflow_plus_test/src/cluster/paxos.rs index 875d3626dd7f..e06628c28841 100644 --- a/hydroflow_plus_test/src/cluster/paxos.rs +++ b/hydroflow_plus_test/src/cluster/paxos.rs @@ -4,12 +4,12 @@ use std::hash::Hash; use std::time::Duration; use hydroflow_plus::*; 
+use location::tick::Timestamped; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use stream::NoOrder; -use tokio::time::Instant; -use super::quorum::collect_quorum; +use super::quorum::{collect_quorum, collect_quorum_with_response}; use super::request_response::join_responses; pub struct Proposer {} @@ -51,12 +51,28 @@ struct P2a
<P> { value: Option<P>
, // might be a re-committed hole } +/// Implements the core Paxos algorithm, which uses a cluster of proposers and acceptors +/// to sequence payloads being sent to the proposers. +/// +/// Proposers that currently are the leader will work with acceptors to sequence incoming +/// payloads, but may drop payloads if they are not the leader or lose leadership during +/// the commit process. +/// +/// Returns a stream of ballots, where new values are emitted when a new leader is elected, +/// and a stream of sequenced payloads with an index and optional payload (in the case of +/// holes in the log). +/// +/// # Safety +/// When the leader is stable, the algorithm will commit incoming payloads to the leader +/// in deterministic order. However, when the leader is changing, payloads may be +/// non-deterministically dropped. The stream of ballots is also non-deterministic because +/// leaders are elected in a non-deterministic process. #[expect( clippy::too_many_arguments, clippy::type_complexity, reason = "internal paxos code // TODO" )] -pub fn paxos_core<'a, P: PaxosPayload, R>( +pub unsafe fn paxos_core<'a, P: PaxosPayload, R>( proposers: &Cluster<'a, Proposer>, acceptors: &Cluster<'a, Acceptor>, r_to_acceptors_checkpoint: Stream< @@ -90,43 +106,64 @@ pub fn paxos_core<'a, P: PaxosPayload, R>( let (a_log_complete_cycle, a_log_forward_reference) = acceptor_tick.forward_ref::>(); - let (p_ballot, p_is_leader, p_relevant_p1bs, a_max_ballot) = leader_election( - proposers, - acceptors, - &proposer_tick, - &acceptor_tick, - f, - i_am_leader_send_timeout, - i_am_leader_check_timeout, - i_am_leader_check_timeout_delay_multiplier, - sequencing_max_ballot_forward_reference, - a_log_forward_reference, - ); + let (p_ballot, p_is_leader, p_relevant_p1bs, a_max_ballot) = unsafe { + // SAFETY: The primary non-determinism exposed by the leader election algorithm lies in which leader + // is elected, which affects both the ballot at each proposer and the leader flag. But using a stale ballot + // or leader flag will only lead to failure in sequencing rather than committing the wrong value. Because + // ballots are non-deterministic, the acceptor max ballot is also non-deterministic, although we are + // guaranteed that the max ballot will match the current ballot of a proposer who believes they are the leader. + leader_election( + proposers, + acceptors, + &proposer_tick, + &acceptor_tick, + f, + i_am_leader_send_timeout, + i_am_leader_check_timeout, + i_am_leader_check_timeout_delay_multiplier, + sequencing_max_ballot_forward_reference, + a_log_forward_reference, + ) + }; let just_became_leader = p_is_leader .clone() .continue_unless(p_is_leader.clone().defer_tick()); - let (p_to_replicas, a_log, sequencing_max_ballots) = sequence_payload( - proposers, - acceptors, - &proposer_tick, - &acceptor_tick, - c_to_proposers, - r_to_acceptors_checkpoint, - p_ballot.clone(), - p_is_leader, - p_relevant_p1bs, - f, - a_max_ballot, - ); - - a_log_complete_cycle.complete(a_log); + let (p_to_replicas, a_log, sequencing_max_ballots) = unsafe { + // SAFETY: The relevant p1bs are non-deterministic because they come from an arbitrary quorum, but because + // we use a quorum, if we remain the leader there are no missing committed values when we combine the logs. + // The remaining non-determinism is in when incoming payloads are batched versus the leader flag and state + // of acceptors, which in the worst case will lead to dropped payloads as documented. 
+ sequence_payload( + proposers, + acceptors, + &proposer_tick, + &acceptor_tick, + c_to_proposers, + r_to_acceptors_checkpoint, + p_ballot.clone(), + p_is_leader, + p_relevant_p1bs, + f, + a_max_ballot, + ) + }; + + a_log_complete_cycle.complete(unsafe { + // SAFETY: We will always write payloads to the log before acknowledging them to the proposers, + // which guarantees that if the leader changes the quorum overlap between sequencing and leader + // election will include the committed value. + a_log.latest_tick() + }); sequencing_max_ballot_complete_cycle.complete(sequencing_max_ballots); ( // Only tell the clients once when leader election concludes - just_became_leader.then(p_ballot).all_ticks(), + just_became_leader + .then(p_ballot) + .all_ticks() + .drop_timestamp(), p_to_replicas, ) } @@ -136,7 +173,7 @@ pub fn paxos_core<'a, P: PaxosPayload, R>( clippy::too_many_arguments, reason = "internal paxos code // TODO" )] -fn leader_election<'a, L: Clone + Debug + Serialize + DeserializeOwned>( +unsafe fn leader_election<'a, L: Clone + Debug + Serialize + DeserializeOwned>( proposers: &Cluster<'a, Proposer>, acceptors: &Cluster<'a, Acceptor>, proposer_tick: &Tick>, @@ -172,20 +209,27 @@ fn leader_election<'a, L: Clone + Debug + Serialize + DeserializeOwned>( proposer_id: ClusterId::from_raw(0) }))); - let (p_ballot, p_has_largest_ballot) = p_ballot_calc( - proposer_tick, - p_received_max_ballot.latest_tick(proposer_tick), - ); - - let (p_to_proposers_i_am_leader, p_trigger_election) = p_leader_heartbeat( - proposers, - proposer_tick, - p_is_leader_forward_ref, - p_ballot.clone(), - i_am_leader_send_timeout, - i_am_leader_check_timeout, - i_am_leader_check_timeout_delay_multiplier, - ); + let (p_ballot, p_has_largest_ballot) = p_ballot_calc(proposer_tick, unsafe { + // SAFETY: A stale max ballot might result in us failing to become the leader, but which proposer + // becomes the leader is non-deterministic anyway. + p_received_max_ballot + .timestamped(proposer_tick) + .latest_tick() + }); + + let (p_to_proposers_i_am_leader, p_trigger_election) = unsafe { + // SAFETY: non-determinism in heartbeats may lead to additional leader election attempts, which + // is propagated to the non-determinism of which leader is elected. + p_leader_heartbeat( + proposers, + proposer_tick, + p_is_leader_forward_ref, + p_ballot.clone(), + i_am_leader_send_timeout, + i_am_leader_check_timeout, + i_am_leader_check_timeout_delay_multiplier, + ) + }; p_to_proposers_i_am_leader_complete_cycle.complete(p_to_proposers_i_am_leader); @@ -195,8 +239,17 @@ fn leader_election<'a, L: Clone + Debug + Serialize + DeserializeOwned>( .inspect(q!(|_| println!("Proposer leader expired, sending P1a"))) .broadcast_bincode_interleaved(acceptors); - let (a_max_ballot, a_to_proposers_p1b) = - acceptor_p1(acceptor_tick, p_to_acceptors_p1a, a_log, proposers); + let (a_max_ballot, a_to_proposers_p1b) = acceptor_p1( + acceptor_tick, + unsafe { + // SAFETY: Non-deterministic batching may result in different payloads being rejected + // by an acceptor if the payload is batched with another payload with larger ballot. + // But as documented, payloads may be non-deterministically dropped during leader election. 
+            p_to_acceptors_p1a.timestamped(acceptor_tick).tick_batch()
+        },
+        a_log,
+        proposers,
+    );
 
     let (p_is_leader, p_accepted_values, fail_ballots) = p_p1b(
         proposer_tick,
@@ -206,7 +259,7 @@ fn leader_election<'a, L: Clone + Debug + Serialize + DeserializeOwned>(
         f,
     );
     p_is_leader_complete_cycle.complete(p_is_leader.clone());
-    p1b_fail_complete.complete(fail_ballots.all_ticks());
+    p1b_fail_complete.complete(fail_ballots.drop_timestamp());
 
     (p_ballot, p_is_leader, p_accepted_values, a_max_ballot)
 }
@@ -257,35 +310,8 @@ fn p_ballot_calc<'a>(
     (p_ballot, p_has_largest_ballot)
 }
 
-fn p_leader_expired<'a>(
-    proposer_tick: &Tick>,
-    p_to_proposers_i_am_leader: Stream, Unbounded, NoOrder>,
-    p_is_leader: Optional<(), Tick>, Bounded>,
-    i_am_leader_check_timeout: u64, // How often to check if heartbeat expired
-) -> Optional, Tick>, Bounded> {
-    let p_latest_received_i_am_leader = p_to_proposers_i_am_leader.clone().fold_commutative(
-        q!(|| None),
-        q!(|latest, _| {
-            // Note: May want to check received ballot against our own?
-            *latest = Some(Instant::now());
-        }),
-    );
-
-    p_latest_received_i_am_leader
-        .latest_tick(proposer_tick)
-        .continue_unless(p_is_leader)
-        .filter(q!(move |latest_received_i_am_leader| {
-            if let Some(latest_received_i_am_leader) = latest_received_i_am_leader {
-                (Instant::now().duration_since(*latest_received_i_am_leader))
-                    > Duration::from_secs(i_am_leader_check_timeout)
-            } else {
-                true
-            }
-        }))
-}
-
 #[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
-fn p_leader_heartbeat<'a>(
+unsafe fn p_leader_heartbeat<'a>(
     proposers: &Cluster<'a, Proposer>,
     proposer_tick: &Tick>,
     p_is_leader: Optional<(), Tick>, Bounded>,
@@ -295,49 +321,66 @@ fn p_leader_heartbeat<'a>(
     i_am_leader_check_timeout_delay_multiplier: usize, /* Initial delay, multiplied by proposer pid, to stagger proposers checking for timeouts */
 ) -> (
     Stream, Unbounded, NoOrder>,
-    Optional, Tick>, Bounded>,
+    Optional<(), Tick>, Bounded>,
 ) {
-    let p_to_proposers_i_am_leader = p_is_leader
-        .clone()
-        .then(p_ballot)
-        .latest()
-        .sample_every(q!(Duration::from_secs(i_am_leader_send_timeout)))
-        .broadcast_bincode_interleaved(proposers);
+    let p_to_proposers_i_am_leader = unsafe {
+        // SAFETY: Delays in heartbeats may lead to leader election attempts even
+        // if the leader is alive. This will result in the previous leader receiving
+        // larger ballots from its peers and it will drop its leadership.
+        p_is_leader
+            .clone()
+            .then(p_ballot)
+            .latest()
+            .drop_timestamp()
+            .sample_every(q!(Duration::from_secs(i_am_leader_send_timeout)))
+    }
+    .broadcast_bincode_interleaved(proposers);
 
-    let p_leader_expired = p_leader_expired(
-        proposer_tick,
-        p_to_proposers_i_am_leader.clone(),
-        p_is_leader,
-        i_am_leader_check_timeout,
-    );
+    let p_leader_expired = unsafe {
+        // SAFETY: Delayed timeouts only affect which leader wins re-election. If the leadership flag
+        // is gained after the timeout, we correctly ignore the timeout. If the flag is lost after
+        // the timeout, we correctly attempt to become the leader.
+        p_to_proposers_i_am_leader
+            .clone()
+            .timeout(q!(Duration::from_secs(i_am_leader_check_timeout)))
+            .timestamped(proposer_tick)
+            .latest_tick()
+            .continue_unless(p_is_leader)
+    };
 
     // Add random delay depending on node ID so not everyone sends p1a at the same time
-    let p_trigger_election = p_leader_expired.continue_if(
-        proposers
-            .source_interval_delayed(
-                q!(Duration::from_secs(
-                    (CLUSTER_SELF_ID.raw_id * i_am_leader_check_timeout_delay_multiplier as u32)
-                        .into()
-                )),
-                q!(Duration::from_secs(i_am_leader_check_timeout)),
-            )
-            .tick_batch(proposer_tick)
-            .first(),
-    );
+    let p_trigger_election = unsafe {
+        // SAFETY: If the leader "un-expires" due to non-deterministic delay, we return
+        // to a stable leader state. If the leader remains expired, non-deterministic
+        // delay is propagated to the non-determinism of which leader is elected.
+        p_leader_expired.continue_if(
+            proposers
+                .source_interval_delayed(
+                    q!(Duration::from_secs(
+                        (CLUSTER_SELF_ID.raw_id
+                            * i_am_leader_check_timeout_delay_multiplier as u32)
+                            .into()
+                    )),
+                    q!(Duration::from_secs(i_am_leader_check_timeout)),
+                )
+                .timestamped(proposer_tick)
+                .tick_batch()
+                .first(),
+        )
+    };
 
     (p_to_proposers_i_am_leader, p_trigger_election)
 }
 
 #[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
 fn acceptor_p1<'a, L: Serialize + DeserializeOwned + Clone>(
     acceptor_tick: &Tick>,
-    p_to_acceptors_p1a: Stream, Unbounded, NoOrder>,
+    p_to_acceptors_p1a: Stream>, Bounded, NoOrder>,
     a_log: Singleton>, Bounded>,
     proposers: &Cluster<'a, Proposer>,
 ) -> (
     Singleton>, Bounded>,
     Stream<(Ballot, Result), Cluster<'a, Proposer>, Unbounded, NoOrder>,
 ) {
-    let p_to_acceptors_p1a = p_to_acceptors_p1a.tick_batch(acceptor_tick);
     let a_max_ballot = p_to_acceptors_p1a
         .clone()
         .inspect(q!(|p1a| println!("Acceptor received P1a: {:?}", p1a)))
@@ -385,32 +428,36 @@ fn p_p1b<'a, P: Clone + Serialize + DeserializeOwned>(
 ) -> (
     Optional<(), Tick>, Bounded>,
     Stream>, Bounded, NoOrder>,
-    Stream>, Bounded, NoOrder>,
+    Stream>, Unbounded, NoOrder>,
 ) {
-    let (quorums, fails) = collect_quorum(
-        proposer_tick,
-        a_to_proposers_p1b.tick_batch(proposer_tick),
+    let (quorums, fails) = collect_quorum_with_response(
+        a_to_proposers_p1b.timestamped(proposer_tick),
         f + 1,
         2 * f + 1,
     );
 
-    let p_received_quorum_of_p1bs = quorums
-        .persist()
-        .fold_keyed_commutative(
-            q!(|| vec![]),
-            q!(|logs, log| {
-                logs.push(log);
-            }),
-        )
-        .max_by_key(q!(|t| t.0))
-        .zip(p_ballot.clone())
-        .filter_map(q!(
-            move |((quorum_ballot, quorum_accepted), my_ballot)| if quorum_ballot == my_ballot {
-                Some(quorum_accepted)
-            } else {
-                None
-            }
-        ));
+    let p_received_quorum_of_p1bs = unsafe {
+        // SAFETY: All the values for a quorum will be emitted in a single batch,
+        // so we will not split up the quorum.
+        quorums.tick_batch()
+    }
+    .persist()
+    .fold_keyed_commutative(
+        q!(|| vec![]),
+        q!(|logs, log| {
+            // even though this is non-commutative, we use `flatten_unordered` later
+            logs.push(log);
+        }),
+    )
+    .max_by_key(q!(|t| t.0))
+    .zip(p_ballot.clone())
+    .filter_map(q!(
+        move |((quorum_ballot, quorum_accepted), my_ballot)| if quorum_ballot == my_ballot {
+            Some(quorum_accepted)
+        } else {
+            None
+        }
+    ));
 
     let p_is_leader = p_received_quorum_of_p1bs
         .clone()
@@ -419,8 +466,8 @@ fn p_p1b<'a, P: Clone + Serialize + DeserializeOwned>(
 
     (
         p_is_leader,
-        // The fold was not commutative, so this is unordered.
-        p_received_quorum_of_p1bs.flatten().assume_ordering(),
+        // we used an unordered accumulator, so the flattened stream has no defined order
+        p_received_quorum_of_p1bs.flatten_unordered(),
         fails.map(q!(|(_, ballot)| ballot)),
     )
 }
@@ -440,7 +487,7 @@ fn recommit_after_leader_election<'a, P: PaxosPayload>(
     Optional>, Bounded>,
 ) {
     let p_p1b_highest_entries_and_count = accepted_logs
-        .flatten() // Convert HashMap log back to stream
+        .flatten_unordered() // Convert HashMap log back to stream
        .fold_keyed_commutative::<(usize, Option>), _, _>(q!(|| (0, None)), q!(|curr_entry, new_entry| {
            if let Some(curr_entry_payload) = &mut curr_entry.1 {
                let same_values = new_entry.value == curr_entry_payload.value;
@@ -486,7 +533,7 @@ fn recommit_after_leader_election<'a, P: PaxosPayload>(
         .map(q!(|(slot, _)| slot));
     let p_log_holes = p_max_slot
         .clone()
-        .flat_map(q!(|max_slot| 0..max_slot))
+        .flat_map_ordered(q!(|max_slot| 0..max_slot))
         .filter_not_in(p_proposed_slots)
         .cross_singleton(p_ballot.clone())
         .map(q!(|(slot, ballot)| P2a {
@@ -503,7 +550,7 @@
     clippy::too_many_arguments,
     reason = "internal paxos code // TODO"
 )]
-fn sequence_payload<'a, P: PaxosPayload, R>(
+unsafe fn sequence_payload<'a, P: PaxosPayload, R>(
     proposers: &Cluster<'a, Proposer>,
     acceptors: &Cluster<'a, Acceptor>,
     proposer_tick: &Tick>,
@@ -530,18 +577,23 @@ fn sequence_payload<'a, P: PaxosPayload, R>(
     a_max_ballot: Singleton>, Bounded>,
 ) -> (
     Stream<(usize, Option

), Cluster<'a, Proposer>, Unbounded, NoOrder>,
-    Singleton>, Tick>, Bounded>,
+    Singleton>, Timestamped>, Unbounded>,
     Stream, Unbounded, NoOrder>,
 ) {
     let (p_log_to_recommit, p_max_slot) =
         recommit_after_leader_election(p_relevant_p1bs, p_ballot.clone(), f);
 
-    let indexed_payloads = index_payloads(
-        proposer_tick,
-        p_max_slot,
-        c_to_proposers,
-        p_is_leader.clone(),
-    );
+    let indexed_payloads = index_payloads(proposer_tick, p_max_slot, unsafe {
+        // SAFETY: We batch payloads so that we can compute the correct slot based on
+        // the base slot. In the case of a leader re-election, the base slot is updated, which
+        // affects the computed payload slots. This non-determinism can lead to non-determinism
+        // in which payloads are committed when the leader is changing, which is documented at
+        // the function level.
+        c_to_proposers
+            .timestamped(proposer_tick)
+            .tick_batch()
+            .continue_if(p_is_leader.clone())
+    });
 
     let payloads_to_send = indexed_payloads
         .cross_singleton(p_ballot.clone())
@@ -550,14 +602,14 @@ fn sequence_payload<'a, P: PaxosPayload, R>(
             Some(payload)
         )))
         .union(p_log_to_recommit.map(q!(|p2a| ((p2a.slot, p2a.ballot), p2a.value))))
-        .continue_if(p_is_leader);
+        .continue_if(p_is_leader)
+        .all_ticks();
 
     let (a_log, a_to_proposers_p2b) = acceptor_p2(
         acceptor_tick,
         a_max_ballot.clone(),
         payloads_to_send
             .clone()
-            .all_ticks()
             .map(q!(|((slot, ballot), value)| P2a {
                 ballot,
                 slot,
@@ -571,23 +623,23 @@ fn sequence_payload<'a, P: PaxosPayload, R>(
     // TODO: only persist if we are the leader
     let (quorums, fails) = collect_quorum(
-        proposer_tick,
-        a_to_proposers_p2b.clone().tick_batch(proposer_tick),
+        a_to_proposers_p2b.timestamped(proposer_tick),
         f + 1,
         2 * f + 1,
     );
 
-    let p_to_replicas = join_responses(
-        proposer_tick,
-        quorums.keys().map(q!(|k| (k, ()))),
-        payloads_to_send,
-    )
-    .all_ticks();
+    let p_to_replicas = join_responses(proposer_tick, quorums.map(q!(|k| (k, ()))), unsafe {
+        // SAFETY: The metadata will always be generated before we get a quorum
+        // because `payloads_to_send` is used to send the payloads to acceptors.
+        payloads_to_send.tick_batch()
+    });
 
     (
-        p_to_replicas.map(q!(|((slot, _ballot), (value, _))| (slot, value))),
+        p_to_replicas
+            .map(q!(|((slot, _ballot), (value, _))| (slot, value)))
+            .drop_timestamp(),
         a_log.map(q!(|(_ckpnt, log)| log)),
-        fails.map(q!(|(_, ballot)| ballot)).all_ticks(),
+        fails.map(q!(|(_, ballot)| ballot)).drop_timestamp(),
     )
 }
 
@@ -601,8 +653,7 @@ enum CheckpointOrP2a

{
 fn index_payloads<'a, P: PaxosPayload>(
     proposer_tick: &Tick>,
     p_max_slot: Optional>, Bounded>,
-    c_to_proposers: Stream, Unbounded>,
-    p_is_leader: Optional<(), Tick>, Bounded>,
+    c_to_proposers: Stream>, Bounded>,
 ) -> Stream<(usize, P), Tick>, Bounded> {
     let (p_next_slot_complete_cycle, p_next_slot) =
         proposer_tick.cycle_with_initial::>(proposer_tick.singleton(q!(0)));
@@ -611,8 +662,6 @@ fn index_payloads<'a, P: PaxosPayload>(
     let base_slot = p_next_slot_after_reconciling_p1bs.unwrap_or(p_next_slot);
 
     let p_indexed_payloads = c_to_proposers
-        .tick_batch(proposer_tick)
-        .continue_if(p_is_leader)
         .enumerate()
         .cross_singleton(base_slot.clone())
         .map(q!(|((index, payload), base_slot)| (
@@ -645,19 +694,37 @@ fn acceptor_p2<'a, P: PaxosPayload, R>(
     proposers: &Cluster<'a, Proposer>,
     f: usize,
 ) -> (
-    Singleton<(Option, HashMap>), Tick>, Bounded>,
+    Singleton<
+        (Option, HashMap>),
+        Timestamped>,
+        Unbounded,
+    >,
     Stream<((usize, Ballot), Result<(), Ballot>), Cluster<'a, Proposer>, Unbounded, NoOrder>,
 ) {
-    let p_to_acceptors_p2a_batch = p_to_acceptors_p2a.tick_batch(acceptor_tick);
+    let p_to_acceptors_p2a_batch = unsafe {
+        // SAFETY: we use batches to ensure that the log is updated before sending
+        // a confirmation to the proposer. Because we use `persist()` on these
+        // messages before folding into the log, non-deterministic batch boundaries
+        // will not affect the eventual log state.
+        p_to_acceptors_p2a.timestamped(acceptor_tick).tick_batch()
+    };
 
     // Get the latest checkpoint sequence per replica
-    let a_checkpoint_largest_seqs = r_to_acceptors_checkpoint
-        .tick_prefix(acceptor_tick)
-        .reduce_keyed_commutative(q!(|curr_seq, seq| {
-            if seq > *curr_seq {
-                *curr_seq = seq;
-            }
-        }));
+    let a_checkpoint_largest_seqs = unsafe {
+        // SAFETY: if a checkpoint is delayed, its effect is that the log may contain slots
+        // that do not need to be saved (because the data is at all replicas). This affects
+        // the logs that will be collected during a leader re-election, but eventually the
+        // same checkpoint will arrive at acceptors and those slots will eventually be deleted.
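+        // (A delayed checkpoint therefore only costs extra log retention; slots are
+        // only dropped once a checkpoint shows the data is already at all replicas.)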
+ r_to_acceptors_checkpoint + .timestamped(acceptor_tick) + .tick_batch() + } + .persist() + .reduce_keyed_commutative(q!(|curr_seq, seq| { + if seq > *curr_seq { + *curr_seq = seq; + } + })); let a_checkpoints_quorum_reached = a_checkpoint_largest_seqs.clone().count().filter_map(q!( move |num_received| if num_received == f + 1 { Some(true) @@ -686,7 +753,7 @@ fn acceptor_p2<'a, P: PaxosPayload, R>( )); let a_log = a_p2as_to_place_in_log .union(a_new_checkpoint.into_stream()) - .persist() + .all_ticks() .fold_commutative( q!(|| (None, HashMap::new())), q!(|(prev_checkpoint, log), checkpoint_or_p2a| { diff --git a/hydroflow_plus_test/src/cluster/paxos_bench.rs b/hydroflow_plus_test/src/cluster/paxos_bench.rs index f8b8d9625d09..89ac8583b1ff 100644 --- a/hydroflow_plus_test/src/cluster/paxos_bench.rs +++ b/hydroflow_plus_test/src/cluster/paxos_bench.rs @@ -44,41 +44,70 @@ pub fn paxos_bench<'a>( ballot ))) .max() - .map(q!(|ballot: Ballot| ballot.proposer_id)) - .latest_tick(&client_tick); + .map(q!(|ballot: Ballot| ballot.proposer_id)); - let leader_changed = cur_leader_id.clone().delta().map(q!(|_| ())).all_ticks(); + let leader_changed = unsafe { + // SAFETY: we are okay if we miss a transient leader ID, because we + // will eventually get the latest one and can restart requests then + cur_leader_id + .clone() + .timestamped(&client_tick) + .latest_tick() + .delta() + .map(q!(|_| ())) + .all_ticks() + .drop_timestamp() + }; bench_client( &clients, leader_changed, |c_to_proposers| { - let (new_leader_elected, processed_payloads) = paxos_kv( - &proposers, - &acceptors, - &replicas, + let to_proposers = unsafe { + // SAFETY: the risk here is that we send a batch of requests + // with a stale leader ID, but because the leader ID comes from the + // network there is no way to guarantee that it is up to date + + // TODO(shadaj): we should retry if we get an error due to sending + // to a stale leader c_to_proposers - .tick_batch(&client_tick) - .cross_singleton(cur_leader_id) + .timestamped(&client_tick) + .tick_batch() + .cross_singleton(cur_leader_id.timestamped(&client_tick).latest_tick()) .all_ticks() - .map(q!(move |((key, value), leader_id)| (leader_id, KvPayload { - key, - // we use our ID as part of the value and use that so the replica only notifies us - value: ( - CLUSTER_SELF_ID, - value - ) - }))) - .send_bincode_interleaved(&proposers) - // clients "own" certain keys, so interleaving elements from clients will not affect - // the order of writes to the same key - .assume_ordering(), - f, - i_am_leader_send_timeout, - i_am_leader_check_timeout, - i_am_leader_check_timeout_delay_multiplier, - checkpoint_frequency, - ); + } + .map(q!(move |((key, value), leader_id)| ( + leader_id, + KvPayload { + key, + // we use our ID as part of the value and use that so the replica only notifies us + value: (CLUSTER_SELF_ID, value) + } + ))) + .send_bincode_interleaved(&proposers); + + let to_proposers = unsafe { + // SAFETY: clients "own" certain keys, so interleaving elements from clients will not affect + // the order of writes to the same key + to_proposers.assume_ordering() + }; + + let (new_leader_elected, processed_payloads) = unsafe { + // SAFETY: Non-deterministic leader notifications are handled in `to_proposers`. We do not + // care about the order in which key writes are processed, which is the non-determinism in + // `processed_payloads`. 
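+            // (Both sources of non-determinism are also called out in the `# Safety`
+            // section of `paxos_kv` itself.)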
+            paxos_kv(
+                &proposers,
+                &acceptors,
+                &replicas,
+                to_proposers,
+                f,
+                i_am_leader_send_timeout,
+                i_am_leader_check_timeout,
+                i_am_leader_check_timeout_delay_multiplier,
+                checkpoint_frequency,
+            )
+        };
 
             new_leader_elected_complete
                 .complete(new_leader_elected.broadcast_bincode_interleaved(&clients));
@@ -91,14 +120,13 @@ pub fn paxos_bench<'a>(
                 .send_bincode_interleaved(&clients);
 
             // we only mark a transaction as committed when all replicas have applied it
-            let (c_quorum_payloads, _) = collect_quorum::<_, _, _, _, ()>(
-                &client_tick,
-                c_received_payloads.tick_batch(&client_tick),
+            let (c_quorum_payloads, _) = collect_quorum::<_, _, _, ()>(
+                c_received_payloads.timestamped(&client_tick),
                 f + 1,
                 f + 1,
             );
 
-            c_quorum_payloads.keys().all_ticks()
+            c_quorum_payloads.drop_timestamp()
         },
         num_clients_per_node,
         median_latency_window_size,
@@ -120,9 +148,17 @@ fn bench_client<'a>(
     // r_to_clients_payload_applied.clone().inspect(q!(|payload: &(u32, ReplicaPayload)| println!("Client received payload: {:?}", payload)));
 
     // Whenever the leader changes, make all clients send a message
-    let restart_this_tick = trigger_restart.tick_batch(&client_tick).last();
+    let restart_this_tick = unsafe {
+        // SAFETY: non-deterministic delay in restarting requests
+        // is okay because once requests are restarted, statistics should reach
+        // steady state regardless of when the restart happens
+        trigger_restart
+            .timestamped(&client_tick)
+            .tick_batch()
+            .last()
+    };
 
-    let c_new_payloads_when_restart = restart_this_tick.clone().flat_map(q!(move |_| (0
+    let c_new_payloads_when_restart = restart_this_tick.clone().flat_map_ordered(q!(move |_| (0
         ..num_clients_per_node)
         .map(move |i| (
             (CLUSTER_SELF_ID.raw_id * (num_clients_per_node as u32)) + i as u32,
@@ -131,19 +167,30 @@ fn bench_client<'a>(
     let (c_to_proposers_complete_cycle, c_to_proposers) =
         clients.forward_ref::>();
-    let c_received_quorum_payloads = transaction_cycle(c_to_proposers).tick_batch(&client_tick);
+    let c_received_quorum_payloads = unsafe {
+        // SAFETY: because the transaction processor is required to handle arbitrary reordering
+        // across *different* keys, we are safe because delaying a transaction result for a key
+        // will only affect when the next request for that key is emitted with respect to other
+        // keys
+        transaction_cycle(c_to_proposers)
+            .timestamped(&client_tick)
+            .tick_batch()
+    };
 
     // Whenever all replicas confirm that a payload was committed, send another payload
     let c_new_payloads_when_committed = c_received_quorum_payloads
         .clone()
         .map(q!(|payload| (payload.0, payload.1 + 1)));
     c_to_proposers_complete_cycle.complete(
-        // we don't send a new write for the same key until the previous one is committed,
-        // so writes to the same key are ordered
         c_new_payloads_when_restart
-            .union(c_new_payloads_when_committed)
+            .chain(unsafe {
+                // SAFETY: we don't send a new write for the same key until the previous one is committed,
+                // so this contains only a single write per key, and we don't care about order
+                // across keys
+                c_new_payloads_when_committed.assume_ordering()
+            })
             .all_ticks()
-            .assume_ordering(),
+            .drop_timestamp(),
     );
 
     // Track statistics
@@ -151,7 +198,7 @@ fn bench_client<'a>(
         client_tick.cycle::>();
     let c_new_timers_when_leader_elected = restart_this_tick
         .map(q!(|_| SystemTime::now()))
-        .flat_map(q!(
+        .flat_map_ordered(q!(
             move |now| (0..num_clients_per_node).map(move |virtual_id| (virtual_id, now))
         ));
     let c_updated_timers = c_received_quorum_payloads
@@ -168,10 +215,14 @@ fn bench_client<'a>(
         }));
c_timers_complete_cycle.complete_next_tick(c_new_timers); - let c_stats_output_timer = clients - .source_interval(q!(Duration::from_secs(1))) - .tick_batch(&client_tick) - .first(); + let c_stats_output_timer = unsafe { + // SAFETY: intentionally sampling statistics + clients + .source_interval(q!(Duration::from_secs(1))) + .timestamped(&client_tick) + .tick_batch() + } + .first(); let c_latency_reset = c_stats_output_timer.clone().map(q!(|_| None)).defer_tick(); @@ -182,7 +233,7 @@ fn bench_client<'a>( ))) .union(c_latency_reset.into_stream()) .all_ticks() - .flatten() + .flatten_ordered() .fold_commutative( // Create window with ring buffer using vec + wraparound index // TODO: Would be nice if I could use vec![] instead, but that doesn't work in HF+ with RuntimeData *median_latency_window_size @@ -230,21 +281,22 @@ fn bench_client<'a>( }), ); - c_latencies - .zip(c_throughput) - .latest_tick(&client_tick) - .continue_if(c_stats_output_timer) - .all_ticks() - .for_each(q!(move |(latencies, throughput)| { - let mut latencies_mut = latencies.borrow_mut(); - if latencies_mut.len() > 0 { - let middle_idx = latencies_mut.len() / 2; - let (_, median, _) = latencies_mut.select_nth_unstable(middle_idx); - println!("Median latency: {}ms", (*median) as f64 / 1000.0); - } + unsafe { + // SAFETY: intentionally sampling statistics + c_latencies.zip(c_throughput).latest_tick() + } + .continue_if(c_stats_output_timer) + .all_ticks() + .for_each(q!(move |(latencies, throughput)| { + let mut latencies_mut = latencies.borrow_mut(); + if latencies_mut.len() > 0 { + let middle_idx = latencies_mut.len() / 2; + let (_, median, _) = latencies_mut.select_nth_unstable(middle_idx); + println!("Median latency: {}ms", (*median) as f64 / 1000.0); + } - println!("Throughput: {} requests/s", throughput); - })); + println!("Throughput: {} requests/s", throughput); + })); // End track statistics } diff --git a/hydroflow_plus_test/src/cluster/paxos_kv.rs b/hydroflow_plus_test/src/cluster/paxos_kv.rs index 83d1ff0bfd88..3683abc2d061 100644 --- a/hydroflow_plus_test/src/cluster/paxos_kv.rs +++ b/hydroflow_plus_test/src/cluster/paxos_kv.rs @@ -42,12 +42,17 @@ impl PartialOrd for SequencedKv { } } +/// Sets up a linearizable key-value store using Paxos. +/// +/// # Safety +/// Notifications for leader election are non-deterministic. When the leader is changing, +/// writes may be dropped by the old leader. #[expect( clippy::type_complexity, clippy::too_many_arguments, reason = "internal paxos code // TODO" )] -pub fn paxos_kv<'a, K: KvKey, V: KvValue>( +pub unsafe fn paxos_kv<'a, K: KvKey, V: KvValue>( proposers: &Cluster<'a, Proposer>, acceptors: &Cluster<'a, Acceptor>, replicas: &Cluster<'a, Replica>, @@ -64,16 +69,19 @@ pub fn paxos_kv<'a, K: KvKey, V: KvValue>( let (r_to_acceptors_checkpoint_complete_cycle, r_to_acceptors_checkpoint) = replicas.forward_ref::>(); - let (p_to_clients_new_leader_elected, p_to_replicas) = paxos_core( - proposers, - acceptors, - r_to_acceptors_checkpoint.broadcast_bincode(acceptors), - c_to_proposers, - f, - i_am_leader_send_timeout, - i_am_leader_check_timeout, - i_am_leader_check_timeout_delay_multiplier, - ); + let (p_to_clients_new_leader_elected, p_to_replicas) = unsafe { + // SAFETY: Leader election non-determinism and non-deterministic dropping of writes is documented. 
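+        // (See the `# Safety` section on `paxos_core` for the precise contract when
+        // the leader is changing.)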
+ paxos_core( + proposers, + acceptors, + r_to_acceptors_checkpoint.broadcast_bincode(acceptors), + c_to_proposers, + f, + i_am_leader_send_timeout, + i_am_leader_check_timeout, + i_am_leader_check_timeout_delay_multiplier, + ) + }; let (r_to_acceptors_checkpoint_new, r_new_processed_payloads) = replica( replicas, @@ -102,8 +110,13 @@ pub fn replica<'a, K: KvKey, V: KvValue>( let (r_buffered_payloads_complete_cycle, r_buffered_payloads) = replica_tick.cycle(); // p_to_replicas.inspect(q!(|payload: ReplicaPayload| println!("Replica received payload: {:?}", payload))); - let r_sorted_payloads = p_to_replicas - .tick_batch(&replica_tick) + let r_sorted_payloads = unsafe { + // SAFETY: because we fill slots one-by-one, we can safely batch + // because non-determinism is resolved when we sort by slots + p_to_replicas + .timestamped(&replica_tick) + .tick_batch() + } .union(r_buffered_payloads) // Combine with all payloads that we've received and not processed yet .sort(); // Create a cycle since we'll use this seq before we define it @@ -183,5 +196,8 @@ pub fn replica<'a, K: KvKey, V: KvValue>( let r_to_clients = r_processable_payloads .filter_map(q!(|payload| payload.kv)) .all_ticks(); - (r_checkpoint_seq_new.all_ticks(), r_to_clients) + ( + r_checkpoint_seq_new.all_ticks().drop_timestamp(), + r_to_clients.drop_timestamp(), + ) } diff --git a/hydroflow_plus_test/src/cluster/quorum.rs b/hydroflow_plus_test/src/cluster/quorum.rs index 7990cba7c233..b0fa2c8c3695 100644 --- a/hydroflow_plus_test/src/cluster/quorum.rs +++ b/hydroflow_plus_test/src/cluster/quorum.rs @@ -1,10 +1,11 @@ use std::hash::Hash; use hydroflow_plus::*; +use location::tick::Timestamped; use location::NoTick; #[expect(clippy::type_complexity, reason = "internal paxos code // TODO")] -pub fn collect_quorum< +pub fn collect_quorum_with_response< 'a, L: Location<'a> + NoTick, Order, @@ -12,17 +13,21 @@ pub fn collect_quorum< V: Clone, E: Clone, >( - tick: &Tick, - responses: Stream<(K, Result), Tick, Bounded, Order>, + responses: Stream<(K, Result), Timestamped, Unbounded, Order>, min: usize, max: usize, ) -> ( - Stream<(K, V), Tick, Bounded, Order>, - Stream<(K, E), Tick, Bounded, Order>, + Stream<(K, V), Timestamped, Unbounded, Order>, + Stream<(K, E), Timestamped, Unbounded, Order>, ) { + let tick = responses.timestamp_source(); let (not_all_complete_cycle, not_all) = tick.cycle::>(); - let current_responses = not_all.union(responses.clone()); + let current_responses = not_all.union(unsafe { + // SAFETY: we always persist values that have not reached quorum, so even + // with arbitrary batching we always produce deterministic quorum results + responses.clone().tick_batch() + }); let count_per_key = current_responses.clone().fold_keyed_commutative( q!(move || (0, 0)), @@ -73,11 +78,8 @@ pub fn collect_quorum< min_but_not_max_complete_cycle .complete_next_tick(reached_min_count.filter_not_in(received_from_all.clone())); - not_all_complete_cycle.complete_next_tick( - current_responses - .clone() - .anti_join(received_from_all.clone()), - ); + not_all_complete_cycle + .complete_next_tick(current_responses.clone().anti_join(received_from_all)); current_responses .anti_join(not_reached_min_count) @@ -85,10 +87,90 @@ pub fn collect_quorum< }; ( - just_reached_quorum.filter_map(q!(move |(key, res)| match res { - Ok(v) => Some((key, v)), - Err(_) => None, + just_reached_quorum + .filter_map(q!(move |(key, res)| match res { + Ok(v) => Some((key, v)), + Err(_) => None, + })) + .all_ticks(), + responses.filter_map(q!(move |(key, res)| 
match res {
+            Ok(_) => None,
+            Err(e) => Some((key, e)),
+        })),
+    )
+}
+
+#[expect(clippy::type_complexity, reason = "quorum types are complex")]
+pub fn collect_quorum<'a, L: Location<'a> + NoTick, Order, K: Clone + Eq + Hash, E: Clone>(
+    responses: Stream<(K, Result<(), E>), Timestamped, Unbounded, Order>,
+    min: usize,
+    max: usize,
+) -> (
+    Stream, Unbounded, Order>,
+    Stream<(K, E), Timestamped, Unbounded, Order>,
+) {
+    let tick = responses.timestamp_source();
+    let (not_all_complete_cycle, not_all) = tick.cycle::>();
+
+    let current_responses = not_all.union(unsafe {
+        // SAFETY: we always persist values that have not reached quorum, so even
+        // with arbitrary batching we always produce deterministic quorum results
+        responses.clone().tick_batch()
+    });
+
+    let count_per_key = current_responses.clone().fold_keyed_commutative(
+        q!(move || (0, 0)),
+        q!(move |accum, value| {
+            if value.is_ok() {
+                accum.0 += 1;
+            } else {
+                accum.1 += 1;
+            }
+        }),
+    );
+
+    let reached_min_count =
+        count_per_key
+            .clone()
+            .filter_map(q!(move |(key, (success, _error))| if success >= min {
+                Some(key)
+            } else {
+                None
+            }));
+
+    let just_reached_quorum = if max == min {
+        not_all_complete_cycle.complete_next_tick(
+            current_responses
+                .clone()
+                .anti_join(reached_min_count.clone()),
+        );
+
+        reached_min_count
+    } else {
+        let (min_but_not_max_complete_cycle, min_but_not_max) = tick.cycle();
+
+        let received_from_all =
+            count_per_key.filter_map(q!(
+                move |(key, (success, error))| if (success + error) >= max {
+                    Some(key)
+                } else {
+                    None
+                }
+            ));
+
+        min_but_not_max_complete_cycle.complete_next_tick(
+            reached_min_count
+                .clone()
+                .filter_not_in(received_from_all.clone()),
+        );
+
+        not_all_complete_cycle.complete_next_tick(current_responses.anti_join(received_from_all));
+
+        reached_min_count.filter_not_in(min_but_not_max)
+    };
+
+    (
+        just_reached_quorum.all_ticks(),
         responses.filter_map(q!(move |(key, res)| match res {
             Ok(_) => None,
             Err(e) => Some((key, e)),
diff --git a/hydroflow_plus_test/src/cluster/request_response.rs b/hydroflow_plus_test/src/cluster/request_response.rs
index 4c8e71af1207..945baa0e456c 100644
--- a/hydroflow_plus_test/src/cluster/request_response.rs
+++ b/hydroflow_plus_test/src/cluster/request_response.rs
@@ -1,18 +1,21 @@
 use std::hash::Hash;
 
 use hydroflow_plus::*;
+use location::tick::Timestamped;
 use location::NoTick;
 use stream::NoOrder;
 
-type JoinResponses = Stream<(K, (M, V)), Tick, Bounded, NoOrder>;
+type JoinResponses = Stream<(K, (M, V)), Timestamped, Unbounded, NoOrder>;
 
 /// Given an incoming stream of request-response responses, joins with metadata generated
 /// at request time that is stored in-memory.
 ///
-/// Only one response element should be produced with a given key, same for the metadata stream.
+/// The metadata must be generated in the same tick as the response or in an earlier
+/// tick, typically at request time. Only one response element should be produced for a
+/// given key; the same holds for the metadata stream.
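+///
+/// Note that a response whose metadata is not yet available in its tick is dropped
+/// rather than buffered, which is why this ordering requirement matters.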
pub fn join_responses<'a, K: Clone + Eq + Hash, M: Clone, V: Clone, L: Location<'a> + NoTick>( tick: &Tick, - responses: Stream<(K, V), Tick, Bounded, NoOrder>, + responses: Stream<(K, V), Timestamped, Unbounded, NoOrder>, metadata: Stream<(K, M), Tick, Bounded, NoOrder>, ) -> JoinResponses { let (remaining_to_join_complete_cycle, remaining_to_join) = @@ -20,6 +23,12 @@ pub fn join_responses<'a, K: Clone + Eq + Hash, M: Clone, V: Clone, L: Location< let remaining_and_new: Stream<(K, M), Tick, Bounded, _> = remaining_to_join.union(metadata); + let responses = unsafe { + // SAFETY: because we persist the metadata, delays resulting from + // batching boundaries do not affect the output contents. + responses.tick_batch() + }; + // TODO(shadaj): we should have a "split-join" operator // that returns both join and anti-join without cloning let joined_this_tick = @@ -31,5 +40,5 @@ pub fn join_responses<'a, K: Clone + Eq + Hash, M: Clone, V: Clone, L: Location< remaining_to_join_complete_cycle .complete_next_tick(remaining_and_new.anti_join(responses.map(q!(|(key, _)| key)))); - joined_this_tick + joined_this_tick.all_ticks() } diff --git a/hydroflow_plus_test/src/cluster/simple_cluster.rs b/hydroflow_plus_test/src/cluster/simple_cluster.rs index 0a65ae3e81af..64b7e5045f9b 100644 --- a/hydroflow_plus_test/src/cluster/simple_cluster.rs +++ b/hydroflow_plus_test/src/cluster/simple_cluster.rs @@ -35,12 +35,10 @@ pub fn simple_cluster<'a>(flow: &FlowBuilder<'a>) -> (Process<'a, ()>, Cluster<' ids.cross_product(numbers) .map(q!(|(id, n)| (id, (id, n)))) .send_bincode(&cluster) - .tick_batch(&cluster.tick()) .inspect(q!(move |n| println!( "cluster received: {:?} (self cluster id: {})", n, CLUSTER_SELF_ID ))) - .all_ticks() .send_bincode(&process) .for_each(q!(|(id, d)| println!("node received: ({}, {:?})", id, d))); diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap index 392e8bfc87fc..b960be7d85a7 100644 --- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap +++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap @@ -321,39 +321,39 @@ expression: built.ir() inner: , }, Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < core :: option :: Option < tokio :: time :: Instant > , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), + f: stageleft :: runtime_support :: fn1_type_hint :: < () , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (core :: option :: Option < tokio :: time :: Instant > , ()) , core :: option :: Option < tokio :: time :: Instant > > ({ use hydroflow_plus :: __staged :: optional :: * ; | (d , _signal) | d }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (() , ()) , () > ({ use hydroflow_plus :: __staged :: optional :: * ; | (d , _signal) | d }), input: CrossSingleton( - Filter { - f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < core :: option :: Option < tokio :: time :: Instant > , bool > ({ use crate :: __staged :: cluster :: paxos :: * ; let i_am_leader_check_timeout__free = 1u64 ; move | latest_received_i_am_leader | { if let Some (latest_received_i_am_leader) = latest_received_i_am_leader { (Instant :: now () . 
duration_since (* latest_received_i_am_leader)) > Duration :: from_secs (i_am_leader_check_timeout__free) } else { true } } }), - input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (core :: option :: Option < tokio :: time :: Instant > , ()) , core :: option :: Option < tokio :: time :: Instant > > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), - input: CrossSingleton( - Fold { - init: stageleft :: runtime_support :: fn0_type_hint :: < core :: option :: Option < tokio :: time :: Instant > > ({ use crate :: __staged :: cluster :: paxos :: * ; | | None }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < core :: option :: Option < tokio :: time :: Instant > , hydroflow_plus_test :: cluster :: paxos :: Ballot , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | latest , _ | { * latest = Some (Instant :: now ()) ; } }), + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (() , ()) , () > ({ use hydroflow_plus :: __staged :: optional :: * ; | (d , _signal) | d }), + input: CrossSingleton( + FilterMap { + f: stageleft :: runtime_support :: fn1_type_hint :: < core :: option :: Option < tokio :: time :: Instant > , core :: option :: Option < () > > ({ use hydroflow_plus :: __staged :: stream :: * ; let duration__free = { use crate :: __staged :: cluster :: paxos :: * ; let i_am_leader_check_timeout__free = 1u64 ; Duration :: from_secs (i_am_leader_check_timeout__free) } ; move | latest_received | { if let Some (latest_received) = latest_received { if Instant :: now () . duration_since (latest_received) > duration__free { Some (()) } else { None } } else { Some (()) } } }), + input: Fold { + init: stageleft :: runtime_support :: fn0_type_hint :: < core :: option :: Option < tokio :: time :: Instant > > ({ use hydroflow_plus :: __staged :: stream :: * ; | | None }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < core :: option :: Option < tokio :: time :: Instant > , hydroflow_plus_test :: cluster :: paxos :: Ballot , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | latest , _ | { * latest = Some (Instant :: now ()) ; } }), input: Persist( Tee { inner: , }, ), }, - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < usize , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), - input: Filter { - f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < usize , bool > ({ use hydroflow_plus :: __staged :: singleton :: * ; | c | * c == 0 }), - input: Fold { - init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , () , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), - input: Tee { - inner: , - }, + }, + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < usize , () > ({ use hydroflow_plus :: __staged :: optional :: * ; | _u | () }), + input: Filter { + f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < usize , bool > ({ use hydroflow_plus :: __staged :: optional :: * ; | c | * c == 0 }), + input: Fold { + init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , () , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), + input: Tee { + inner: , }, }, }, - ), - }, + 
}, + ), }, Map { f: stageleft :: runtime_support :: fn1_type_hint :: < tokio :: time :: Instant , () > ({ use hydroflow_plus :: __staged :: optional :: * ; | _u | () }), @@ -740,27 +740,27 @@ expression: built.ir() ), input: DeferTick( Difference( - FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , usize)) , core :: option :: Option < (usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) > > ({ use crate :: __staged :: cluster :: quorum :: * ; let min__free = 2usize ; move | (key , (success , _error)) | if success >= min__free { Some (key) } else { None } }), - input: Tee { - inner: : FoldKeyed { - init: stageleft :: runtime_support :: fn0_type_hint :: < (usize , usize) > ({ use crate :: __staged :: cluster :: quorum :: * ; move | | (0 , 0) }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (usize , usize) , core :: result :: Result < () , hydroflow_plus_test :: cluster :: paxos :: Ballot > , () > ({ use crate :: __staged :: cluster :: quorum :: * ; move | accum , value | { if value . is_ok () { accum . 0 += 1 ; } else { accum . 1 += 1 ; } } }), - input: Tee { - inner: : Chain( - CycleSource { - ident: Ident { - sym: cycle_8, - }, - location_kind: Tick( - 2, - Cluster( - 0, + Tee { + inner: : FilterMap { + f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , usize)) , core :: option :: Option < (usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) > > ({ use crate :: __staged :: cluster :: quorum :: * ; let min__free = 2usize ; move | (key , (success , _error)) | if success >= min__free { Some (key) } else { None } }), + input: Tee { + inner: : FoldKeyed { + init: stageleft :: runtime_support :: fn0_type_hint :: < (usize , usize) > ({ use crate :: __staged :: cluster :: quorum :: * ; move | | (0 , 0) }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (usize , usize) , core :: result :: Result < () , hydroflow_plus_test :: cluster :: paxos :: Ballot > , () > ({ use crate :: __staged :: cluster :: quorum :: * ; move | accum , value | { if value . is_ok () { accum . 0 += 1 ; } else { accum . 
1 += 1 ; } } }), + input: Tee { + inner: : Chain( + CycleSource { + ident: Ident { + sym: cycle_8, + }, + location_kind: Tick( + 2, + Cluster( + 0, + ), ), - ), - }, - Tee { - inner: : Tee { + }, + Tee { inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: result :: Result < () , hydroflow_plus_test :: cluster :: paxos :: Ballot >)) , ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: result :: Result < () , hydroflow_plus_test :: cluster :: paxos :: Ballot >) > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), input: Network { @@ -910,8 +910,8 @@ expression: built.ir() }, }, }, - }, - ), + ), + }, }, }, }, @@ -920,7 +920,7 @@ expression: built.ir() inner: : FilterMap { f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , usize)) , core :: option :: Option < (usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) > > ({ use crate :: __staged :: cluster :: quorum :: * ; let max__free = 3usize ; move | (key , (success , error)) | if (success + error) >= max__free { Some (key) } else { None } }), input: Tee { - inner: , + inner: , }, }, }, @@ -940,7 +940,7 @@ expression: built.ir() input: DeferTick( AntiJoin( Tee { - inner: , + inner: , }, Tee { inner: , @@ -983,40 +983,22 @@ expression: built.ir() input: Tee { inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , ()) > ({ use crate :: __staged :: cluster :: paxos :: * ; | k | (k , ()) }), - input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , ()) , (usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) > ({ use hydroflow_plus :: __staged :: stream :: * ; | (k , _) | k }), - input: FoldKeyed { - init: stageleft :: runtime_support :: fn0_type_hint :: < () > ({ use hydroflow_plus :: __staged :: stream :: * ; | | () }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | _ , _ | { } }), - input: FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: result :: Result < () , hydroflow_plus_test :: cluster :: paxos :: Ballot >) , core :: option :: Option < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , ()) > > ({ use crate :: __staged :: cluster :: quorum :: * ; move | (key , res) | match res { Ok (v) => Some ((key , v)) , Err (_) => None , } }), - input: AntiJoin( - AntiJoin( - Tee { - inner: , - }, - FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , usize)) , core :: option :: Option < (usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) > > ({ use crate :: __staged :: cluster :: quorum :: * ; let min__free = 2usize ; move | (key , (success , _error)) | if success < min__free { Some (key) } else { None } }), - input: Tee { - inner: , - }, - }, - ), - CycleSource { - ident: Ident { - sym: cycle_9, - }, - location_kind: Tick( - 2, - Cluster( - 0, - ), - ), - }, - ), + input: Difference( + Tee { + inner: , + }, + CycleSource { + 
ident: Ident { + sym: cycle_9, }, + location_kind: Tick( + 2, + Cluster( + 0, + ), + ), }, - }, + ), }, }, }, @@ -1146,7 +1128,7 @@ expression: built.ir() input: FilterMap { f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: result :: Result < () , hydroflow_plus_test :: cluster :: paxos :: Ballot >) , core :: option :: Option < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , hydroflow_plus_test :: cluster :: paxos :: Ballot) > > ({ use crate :: __staged :: cluster :: quorum :: * ; move | (key , res) | match res { Ok (_) => None , Err (e) => Some ((key , e)) , } }), input: Tee { - inner: , + inner: , }, }, }, @@ -1156,7 +1138,7 @@ expression: built.ir() sym: cycle_1, }, location_kind: Tick( - 7, + 8, Cluster( 3, ), @@ -1229,7 +1211,7 @@ expression: built.ir() sym: cycle_1, }, location_kind: Tick( - 7, + 8, Cluster( 3, ), @@ -1256,7 +1238,7 @@ expression: built.ir() sym: cycle_2, }, location_kind: Tick( - 7, + 8, Cluster( 3, ), @@ -1288,7 +1270,7 @@ expression: built.ir() sym: cycle_2, }, location_kind: Tick( - 7, + 8, Cluster( 3, ), @@ -1328,7 +1310,7 @@ expression: built.ir() sym: cycle_3, }, location_kind: Tick( - 7, + 8, Cluster( 3, ), @@ -1349,7 +1331,7 @@ expression: built.ir() sym: cycle_3, }, location_kind: Tick( - 7, + 8, Cluster( 3, ), @@ -1539,14 +1521,16 @@ expression: built.ir() }, ), }, - FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((u32 , u32) , (usize , usize)) , core :: option :: Option < (u32 , u32) > > ({ use crate :: __staged :: cluster :: quorum :: * ; let min__free = 2usize ; move | (key , (success , _error)) | if success >= min__free { Some (key) } else { None } }), - input: Tee { - inner: : FoldKeyed { - init: stageleft :: runtime_support :: fn0_type_hint :: < (usize , usize) > ({ use crate :: __staged :: cluster :: quorum :: * ; move | | (0 , 0) }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (usize , usize) , core :: result :: Result < () , () > , () > ({ use crate :: __staged :: cluster :: quorum :: * ; move | accum , value | { if value . is_ok () { accum . 0 += 1 ; } else { accum . 1 += 1 ; } } }), - input: Tee { - inner: , + Tee { + inner: : FilterMap { + f: stageleft :: runtime_support :: fn1_type_hint :: < ((u32 , u32) , (usize , usize)) , core :: option :: Option < (u32 , u32) > > ({ use crate :: __staged :: cluster :: quorum :: * ; let min__free = 2usize ; move | (key , (success , _error)) | if success >= min__free { Some (key) } else { None } }), + input: Tee { + inner: : FoldKeyed { + init: stageleft :: runtime_support :: fn0_type_hint :: < (usize , usize) > ({ use crate :: __staged :: cluster :: quorum :: * ; move | | (0 , 0) }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (usize , usize) , core :: result :: Result < () , () > , () > ({ use crate :: __staged :: cluster :: quorum :: * ; move | accum , value | { if value . is_ok () { accum . 0 += 1 ; } else { accum . 1 += 1 ; } } }), + input: Tee { + inner: , + }, }, }, }, @@ -1565,7 +1549,7 @@ expression: built.ir() FlatMap { f: stageleft :: runtime_support :: fn1_type_hint :: < () , std :: iter :: Map < std :: ops :: Range < usize > , _ > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let CLUSTER_SELF_ID__free = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_bench :: Client > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; let num_clients_per_node__free = 1usize ; move | _ | (0 .. 
num_clients_per_node__free) . map (move | i | ((CLUSTER_SELF_ID__free . raw_id * (num_clients_per_node__free as u32)) + i as u32 , 0)) }), input: Tee { - inner: : Reduce { + inner: : Reduce { f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | * curr = new }), input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | () }), @@ -1581,26 +1565,8 @@ expression: built.ir() Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , u32) , (u32 , u32) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | payload | (payload . 0 , payload . 1 + 1) }), input: Tee { - inner: : Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((u32 , u32) , ()) , (u32 , u32) > ({ use hydroflow_plus :: __staged :: stream :: * ; | (k , _) | k }), - input: FoldKeyed { - init: stageleft :: runtime_support :: fn0_type_hint :: < () > ({ use hydroflow_plus :: __staged :: stream :: * ; | | () }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | _ , _ | { } }), - input: FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((u32 , u32) , core :: result :: Result < () , () >) , core :: option :: Option < ((u32 , u32) , ()) > > ({ use crate :: __staged :: cluster :: quorum :: * ; move | (key , res) | match res { Ok (v) => Some ((key , v)) , Err (_) => None , } }), - input: AntiJoin( - Tee { - inner: , - }, - FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((u32 , u32) , (usize , usize)) , core :: option :: Option < (u32 , u32) > > ({ use crate :: __staged :: cluster :: quorum :: * ; let min__free = 2usize ; move | (key , (success , _error)) | if success < min__free { Some (key) } else { None } }), - input: Tee { - inner: , - }, - }, - ), - }, - }, + inner: : Tee { + inner: , }, }, }, @@ -1622,7 +1588,7 @@ expression: built.ir() input: Chain( Chain( Tee { - inner: : CycleSource { + inner: : CycleSource { ident: Ident { sym: cycle_3, }, @@ -1639,16 +1605,16 @@ expression: built.ir() input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < () , std :: time :: SystemTime > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | SystemTime :: now () }), input: Tee { - inner: , + inner: , }, }, }, ), Tee { - inner: : Map { + inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , u32) , (usize , std :: time :: SystemTime) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (key , _prev_count) | (key as usize , SystemTime :: now ()) }), input: Tee { - inner: , + inner: , }, }, }, @@ -1675,10 +1641,10 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , (std :: time :: SystemTime , std :: time :: SystemTime)) , core :: option :: Option < u128 > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (_virtual_id , (prev_time , curr_time)) | Some (curr_time . duration_since (prev_time) . unwrap () . 
as_micros ()) }), input: Join( Tee { - inner: , + inner: , }, Tee { - inner: , + inner: , }, ), }, @@ -1686,7 +1652,7 @@ expression: built.ir() Map { f: stageleft :: runtime_support :: fn1_type_hint :: < tokio :: time :: Instant , core :: option :: Option < u128 > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | None }), input: Tee { - inner: : Source { + inner: : Source { source: Stream( { use hydroflow_plus :: __staged :: location :: * ; let interval__free = { use crate :: __staged :: cluster :: paxos_bench :: * ; Duration :: from_secs (1) } ; tokio_stream :: wrappers :: IntervalStream :: new (tokio :: time :: interval (interval__free)) }, ), @@ -1716,7 +1682,7 @@ expression: built.ir() init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , (u32 , u32) , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: Tee { - inner: , + inner: , }, }, Map { @@ -1727,7 +1693,7 @@ expression: built.ir() init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , tokio :: time :: Instant , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: Tee { - inner: , + inner: , }, }, }, @@ -1739,7 +1705,7 @@ expression: built.ir() Map { f: stageleft :: runtime_support :: fn1_type_hint :: < tokio :: time :: Instant , (usize , bool) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | (0 , true) }), input: Tee { - inner: , + inner: , }, }, ), @@ -1750,7 +1716,7 @@ expression: built.ir() Map { f: stageleft :: runtime_support :: fn1_type_hint :: < tokio :: time :: Instant , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), input: Tee { - inner: , + inner: , }, }, ), diff --git a/hydroflow_plus_test/src/cluster/two_pc.rs b/hydroflow_plus_test/src/cluster/two_pc.rs index dd89438e0b08..4a5f8220cb60 100644 --- a/hydroflow_plus_test/src/cluster/two_pc.rs +++ b/hydroflow_plus_test/src/cluster/two_pc.rs @@ -1,5 +1,7 @@ use hydroflow_plus::*; +use super::quorum::collect_quorum; + // if the variable start with p, that means current work is at the participant side. if start with c, at coordinator side. // @@ -32,7 +34,7 @@ pub fn two_pc<'a>( let c_receive_client_transactions = client_transaction.send_bincode(&coordinator); c_receive_client_transactions .clone() - .inspect(q!(|t| println!( + .for_each(q!(|t| println!( "receive transaction {}, ready to broadcast", t ))); @@ -40,47 +42,46 @@ pub fn two_pc<'a>( // broadcast prepare message to participants. let p_receive_prepare = c_receive_client_transactions.broadcast_bincode(&participants); - // assume all participants reply commit - let p_ready_to_commit = p_receive_prepare.map(q!(|t| (t, String::from("commit")))); + // participant 1 aborts transaction 1 + let p_ready_to_commit = p_receive_prepare.map(q!(move |t| ( + t, + if t == 1 && CLUSTER_SELF_ID.raw_id == 1 { + "abort".to_string() + } else { + "commit".to_string() + } + ))); let c_received_reply = p_ready_to_commit.send_bincode(&coordinator); // c_received_reply.clone().inspect(q!(|(id, (t, reply))| println!("participant {id} said {reply} for transaction {t}"))); // collect votes from participant. - // aborted transactions. 
- let c_participant_voted_abort = c_received_reply - .clone() - .filter(q!(|(_id, (_t, reply))| reply == "abort")) - .map(q!(|(id, (t, _reply))| (t, id))); - let p_receive_abort = c_participant_voted_abort.broadcast_bincode(&participants); - p_receive_abort.clone().inspect(q!(|(t, id)| println!( - "{} vote abort for transaction {}", - id, t - ))); + let coordinator_tick = coordinator.tick(); + let (c_all_commit, c_participant_voted_abort) = collect_quorum( + c_received_reply + .map(q!(|(id, (t, reply))| ( + t, + if reply == "commit" { Ok(()) } else { Err(id) } + ))) + .timestamped(&coordinator_tick), + num_participants as usize, + num_participants as usize, + ); + + let p_receive_abort = c_participant_voted_abort + // TODO(shadaj): if multiple participants vote abort we should deduplicate + .inspect(q!(|(t, id)| println!( + "{} vote abort for transaction {}", + id, t + ))) + .broadcast_bincode(&participants); let c_receive_ack = p_receive_abort.send_bincode(&coordinator); c_receive_ack.for_each(q!(|(id, (t, _))| println!( "Coordinator receive participant {} abort for transaction {}", id, t ))); - // committed transactions - let c_participant_voted_commit = c_received_reply - .filter(q!(|(_id, (_t, reply))| reply == "commit")) - .map(q!(|(id, (t, _reply))| (t, id))) - // fold_keyed: 1 input stream of type (K, V1), 1 output stream of type (K, V2). - // The output will have one tuple for each distinct K, with an accumulated value of type V2. - .tick_batch(&coordinator.tick()).fold_keyed_commutative(q!(|| 0), q!(|old: &mut u32, _| *old += 1)).filter_map(q!(move |(t, count)| { - // here I set the participant to 3. If want more or less participant, fix line 26 of examples/broadcast.rs - if count == num_participants { - Some(t) - } else { - None - } - })); - // broadcast commit transactions to participants. 
-    let p_receive_commit = c_participant_voted_commit
-        .all_ticks()
-        .broadcast_bincode(&participants);
+    let p_receive_commit = c_all_commit.broadcast_bincode(&participants);
     // p_receive_commit.clone().for_each(q!(|t| println!("commit for transaction {}", t)));
 
     let c_receive_ack = p_receive_commit.send_bincode(&coordinator);
diff --git a/hydroflow_plus_test_local/src/local/chat_app.rs b/hydroflow_plus_test_local/src/local/chat_app.rs
index 566b5effbe03..26c53a528ae6 100644
--- a/hydroflow_plus_test_local/src/local/chat_app.rs
+++ b/hydroflow_plus_test_local/src/local/chat_app.rs
@@ -14,15 +14,27 @@ pub fn chat_app<'a>(
     let process = flow.process::<()>();
     let tick = process.tick();
 
-    let users = process
-        .source_stream(users_stream)
-        .tick_batch(&tick)
-        .persist();
+    let users = unsafe {
+        // SAFETY: intentionally non-deterministic to not send messages
+        // to users that joined after the message was sent
+        process
+            .source_stream(users_stream)
+            .timestamped(&tick)
+            .tick_batch()
+    }
+    .persist();
     let messages = process.source_stream(messages);
     let messages = if replay_messages {
-        messages.tick_batch(&tick).persist()
+        unsafe {
+            // SAFETY: see above
+            messages.timestamped(&tick).tick_batch()
+        }
+        .persist()
     } else {
-        messages.tick_batch(&tick)
+        unsafe {
+            // SAFETY: see above
+            messages.timestamped(&tick).tick_batch()
+        }
     };
 
     // do this after the persist to test pullup
diff --git a/hydroflow_plus_test_local/src/local/compute_pi.rs b/hydroflow_plus_test_local/src/local/compute_pi.rs
index 131c529111bf..b790f8d931f5 100644
--- a/hydroflow_plus_test_local/src/local/compute_pi.rs
+++ b/hydroflow_plus_test_local/src/local/compute_pi.rs
@@ -21,21 +21,25 @@ pub fn compute_pi<'a>(flow: &FlowBuilder<'a>, batch_size: RuntimeData) ->
                 *total += 1;
             }),
         )
-        .all_ticks();
-
-    trials
-        .reduce(q!(|(inside, total), (inside_batch, total_batch)| {
-            *inside += inside_batch;
-            *total += total_batch;
-        }))
-        .sample_every(q!(Duration::from_secs(1)))
-        .for_each(q!(|(inside, total)| {
-            println!(
-                "pi: {} ({} trials)",
-                4.0 * inside as f64 / total as f64,
-                total
-            );
-        }));
+        .all_ticks()
+        .drop_timestamp();
+
+    let estimate = trials.reduce(q!(|(inside, total), (inside_batch, total_batch)| {
+        *inside += inside_batch;
+        *total += total_batch;
+    }));
+
+    unsafe {
+        // SAFETY: intentional non-determinism
+        estimate.sample_every(q!(Duration::from_secs(1)))
+    }
+    .for_each(q!(|(inside, total)| {
+        println!(
+            "pi: {} ({} trials)",
+            4.0 * inside as f64 / total as f64,
+            total
+        );
+    }));
 
     process
 }
diff --git a/hydroflow_plus_test_local/src/local/count_elems.rs b/hydroflow_plus_test_local/src/local/count_elems.rs
index ee670a6bfd02..528bc24db2a0 100644
--- a/hydroflow_plus_test_local/src/local/count_elems.rs
+++ b/hydroflow_plus_test_local/src/local/count_elems.rs
@@ -12,11 +12,12 @@ pub fn count_elems_generic<'a, T: 'a>(
     let tick = process.tick();
 
     let source = process.source_stream(input_stream);
-    let count = source
-        .map(q!(|_| 1))
-        .tick_batch(&tick)
-        .fold(q!(|| 0), q!(|a, b| *a += b))
-        .all_ticks();
+    let count = unsafe {
+        // SAFETY: intentionally using ticks
+        source.map(q!(|_| 1)).timestamped(&tick).tick_batch()
+    }
+    .fold(q!(|| 0), q!(|a, b| *a += b))
+    .all_ticks();
 
     count.for_each(q!(|v| {
         output.send(v).unwrap();
diff --git a/hydroflow_plus_test_local/src/local/graph_reachability.rs b/hydroflow_plus_test_local/src/local/graph_reachability.rs
index 0d50a88f51ae..84a6b4b0799b 100644
--- a/hydroflow_plus_test_local/src/local/graph_reachability.rs
+++
@@ -19,11 +19,20 @@ pub fn graph_reachability<'a>(
     let reachability_tick = process.tick();
     let (set_reached_cycle, reached_cycle) = reachability_tick.cycle::>();
 
-    let reached = roots.tick_batch(&reachability_tick).union(reached_cycle);
+    let reached = unsafe {
+        // SAFETY: roots can be inserted on any tick because we are fixpointing
+        roots
+            .timestamped(&reachability_tick)
+            .tick_batch()
+            .union(reached_cycle)
+    };
     let reachable = reached
         .clone()
         .map(q!(|r| (r, ())))
-        .join(edges.tick_batch(&reachability_tick).persist())
+        .join(unsafe {
+            // SAFETY: edges can be inserted on any tick because we are fixpointing
+            edges.timestamped(&reachability_tick).tick_batch().persist()
+        })
         .map(q!(|(_from, (_, to))| to));
 
     set_reached_cycle.complete_next_tick(reached.clone().union(reachable));
diff --git a/hydroflow_plus_test_local/src/local/negation.rs b/hydroflow_plus_test_local/src/local/negation.rs
index a0924f70a93b..2999cfdf8cae 100644
--- a/hydroflow_plus_test_local/src/local/negation.rs
+++ b/hydroflow_plus_test_local/src/local/negation.rs
@@ -12,12 +12,24 @@ pub fn test_difference<'a>(
     let process = flow.process::<()>();
     let tick = process.tick();
 
-    let mut source = process.source_iter(q!(0..5)).tick_batch(&tick);
+    let mut source = unsafe {
+        // SAFETY: intentionally using ticks
+        process
+            .source_iter(q!(0..5))
+            .timestamped(&tick)
+            .tick_batch()
+    };
     if persist1 {
         source = source.persist();
     }
 
-    let mut source2 = process.source_iter(q!(3..6)).tick_batch(&tick);
+    let mut source2 = unsafe {
+        // SAFETY: intentionally using ticks
+        process
+            .source_iter(q!(3..6))
+            .timestamped(&tick)
+            .tick_batch()
+    };
     if persist2 {
         source2 = source2.persist();
     }
@@ -39,15 +51,25 @@ pub fn test_anti_join<'a>(
     let process = flow.process::<()>();
     let tick = process.tick();
 
-    let mut source = process
-        .source_iter(q!(0..5))
-        .map(q!(|v| (v, v)))
-        .tick_batch(&tick);
+    let mut source = unsafe {
+        // SAFETY: intentionally using ticks
+        process
+            .source_iter(q!(0..5))
+            .map(q!(|v| (v, v)))
+            .timestamped(&tick)
+            .tick_batch()
+    };
     if persist1 {
         source = source.persist();
     }
 
-    let mut source2 = process.source_iter(q!(3..6)).tick_batch(&tick);
+    let mut source2 = unsafe {
+        // SAFETY: intentionally using ticks
+        process
+            .source_iter(q!(3..6))
+            .timestamped(&tick)
+            .tick_batch()
+    };
     if persist2 {
         source2 = source2.persist();
     }
diff --git a/hydroflow_plus_test_local/src/local/teed_join.rs b/hydroflow_plus_test_local/src/local/teed_join.rs
index 36e575f4109e..96ab6789243a 100644
--- a/hydroflow_plus_test_local/src/local/teed_join.rs
+++ b/hydroflow_plus_test_local/src/local/teed_join.rs
@@ -19,7 +19,13 @@ pub fn teed_join<'a, S: Stream + Unpin + 'a>(
     let node_one = flow.process::();
     let n0_tick = node_zero.tick();
 
-    let source = node_zero.source_stream(input_stream).tick_batch(&n0_tick);
+    let source = unsafe {
+        // SAFETY: intentionally using ticks
+        node_zero
+            .source_stream(input_stream)
+            .timestamped(&n0_tick)
+            .tick_batch()
+    };
 
     let map1 = source.clone().map(q!(|v| (v + 1, ())));
     let map2 = source.map(q!(|v| (v - 1, ())));
diff --git a/stageleft/src/lib.rs b/stageleft/src/lib.rs
index d38026335275..ee1cdabaad1a 100644
--- a/stageleft/src/lib.rs
+++ b/stageleft/src/lib.rs
@@ -469,7 +469,6 @@ impl<'a, T: 'a, Ctx> QuotedWithContext<'a, T, Ctx> for RuntimeData {}
 
 impl Copy for RuntimeData {}
 
-// TODO(shadaj): relax this to allow for non-copy types
 impl Clone for RuntimeData {
     fn clone(&self) -> Self {
         *self
diff --git a/template/hydroflow_plus/Cargo.toml b/template/hydroflow_plus/Cargo.toml
index 2c521b6155d6..1544bce2112e 100644
--- a/template/hydroflow_plus/Cargo.toml
+++ b/template/hydroflow_plus/Cargo.toml
@@ -23,3 +23,6 @@ hydroflow_plus = { git = "{{ hydroflow_git | default: 'https://github.com/hydro-
   "deploy",
 ] }
 tokio-stream = { version = "0.1.3", default-features = false }
+
+[lints.rust]
+unsafe_op_in_unsafe_fn = "warn"
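
Usage note: every migrated call site above follows the same shape, so new code can copy it directly. The sketch below is illustrative only; the function name `counter_sketch` is made up, and it assumes the `use hydroflow_plus::*;` prelude that the example files above rely on:

    use hydroflow_plus::*;

    pub fn counter_sketch<'a>(flow: &FlowBuilder<'a>) {
        let process = flow.process::<()>();
        let tick = process.tick();

        let batch = unsafe {
            // SAFETY: intentionally using ticks; which elements fall into
            // which batch is non-deterministic, so per-tick outputs are too
            process
                .source_iter(q!(0..5))
                .timestamped(&tick)
                .tick_batch()
        };

        // count the elements that arrived in each tick batch,
        // then emit one count per tick
        batch
            .fold(q!(|| 0), q!(|a, b| *a += b))
            .all_ticks()
            .for_each(q!(|v| println!("elements this tick: {}", v)));
    }

Compared to the old API, `tick_batch` no longer takes the tick as an argument: the stream is first moved into tick time with `timestamped(&tick)`, and the surrounding `unsafe` block (with its `SAFETY` comment) forces the caller to spell out why the resulting non-determinism is acceptable. The `collect_quorum` helper used in two_pc.rs follows the same convention: as shown above, it takes a `Timestamped` stream of `(key, Result<...>)` pairs plus minimum and maximum reply counts, and splits it into a stream of keys that reached quorum and a stream of failures.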