From 487e8fa7ad44e712e0e8fc824496388312fc1f0c Mon Sep 17 00:00:00 2001 From: David Chu Date: Thu, 9 Jan 2025 17:29:40 +0800 Subject: [PATCH] fix(paxos): Proposers don't re-commit entries before the latest checkpoint (#1615) Paxos proposers assign no-ops to holes in the log after receiving a quorum of p1bs. Acceptors delete log entries before the latest checkpoint, which means they will show up as holes in the proposer. The proposer should not propose any operations before the most recent checkpoint in a quorum of p1bs. --- hydro_test/src/cluster/paxos.rs | 60 ++-- ...cluster__paxos_bench__tests__paxos_ir.snap | 312 ++++++++++-------- 2 files changed, 206 insertions(+), 166 deletions(-) diff --git a/hydro_test/src/cluster/paxos.rs b/hydro_test/src/cluster/paxos.rs index 0aeee3ee45b..93f4f0bb06f 100644 --- a/hydro_test/src/cluster/paxos.rs +++ b/hydro_test/src/cluster/paxos.rs @@ -180,11 +180,11 @@ unsafe fn leader_election<'a, L: Clone + Debug + Serialize + DeserializeOwned>( i_am_leader_check_timeout: u64, i_am_leader_check_timeout_delay_multiplier: usize, p_received_p2b_ballots: Stream, Unbounded, NoOrder>, - a_log: Singleton>, Bounded>, + a_log: Singleton<(Option, L), Tick>, Bounded>, ) -> ( Singleton>, Bounded>, Optional<(), Tick>, Bounded>, - Stream>, Bounded, NoOrder>, + Stream<(Option, L), Tick>, Bounded, NoOrder>, Singleton>, Bounded>, ) { let (p1b_fail_complete, p1b_fail) = @@ -372,11 +372,11 @@ unsafe fn p_leader_heartbeat<'a>( fn acceptor_p1<'a, L: Serialize + DeserializeOwned + Clone>( acceptor_tick: &Tick>, p_to_acceptors_p1a: Stream>, Bounded, NoOrder>, - a_log: Singleton>, Bounded>, + a_log: Singleton<(Option, L), Tick>, Bounded>, proposers: &Cluster<'a, Proposer>, ) -> ( Singleton>, Bounded>, - Stream<(Ballot, Result), Cluster<'a, Proposer>, Unbounded, NoOrder>, + Stream<(Ballot, Result<(Option, L), Ballot>), Cluster<'a, Proposer>, Unbounded, NoOrder>, ) { let a_max_ballot = p_to_acceptors_p1a .clone() @@ -414,7 +414,7 @@ fn acceptor_p1<'a, L: Serialize + DeserializeOwned + Clone>( fn p_p1b<'a, P: Clone + Serialize + DeserializeOwned>( proposer_tick: &Tick>, a_to_proposers_p1b: Stream< - (Ballot, Result), + (Ballot, Result<(Option, P), Ballot>), Cluster<'a, Proposer>, Unbounded, NoOrder, @@ -424,7 +424,7 @@ fn p_p1b<'a, P: Clone + Serialize + DeserializeOwned>( f: usize, ) -> ( Optional<(), Tick>, Bounded>, - Stream>, Bounded, NoOrder>, + Stream<(Option, P), Tick>, Bounded, NoOrder>, Stream>, Unbounded, NoOrder>, ) { let (quorums, fails) = collect_quorum_with_response( @@ -472,7 +472,7 @@ fn p_p1b<'a, P: Clone + Serialize + DeserializeOwned>( #[expect(clippy::type_complexity, reason = "internal paxos code // TODO")] fn recommit_after_leader_election<'a, P: PaxosPayload>( accepted_logs: Stream< - HashMap>, + (Option, HashMap>), Tick>, Bounded, NoOrder, @@ -483,7 +483,12 @@ fn recommit_after_leader_election<'a, P: PaxosPayload>( Stream, Tick>, Bounded, NoOrder>, Optional>, Bounded>, ) { + let p_p1b_max_checkpoint = accepted_logs + .clone() + .map(q!(|(checkpoint, _log)| checkpoint)) + .max(); let p_p1b_highest_entries_and_count = accepted_logs + .map(q!(|(_checkpoint, log)| log)) .flatten_unordered() // Convert HashMap log back to stream .fold_keyed_commutative::<(usize, Option>), _, _>(q!(|| (0, None)), q!(|curr_entry, new_entry| { if let Some(curr_entry_payload) = &mut curr_entry.1 { @@ -510,16 +515,20 @@ fn recommit_after_leader_election<'a, P: PaxosPayload>( let p_log_to_try_commit = p_p1b_highest_entries_and_count .clone() .cross_singleton(p_ballot.clone()) - .filter_map(q!(move |((slot, (count, entry)), ballot)| { - if count <= f { - Some(P2a { - ballot, - slot, - value: entry.value, - }) - } else { - None + .cross_singleton(p_p1b_max_checkpoint.clone()) + .filter_map(q!(move |(((slot, (count, entry)), ballot), checkpoint)| { + if count > f { + return None; + } else if let Some(checkpoint) = checkpoint { + if slot <= checkpoint { + return None; + } } + Some(P2a { + ballot, + slot, + value: entry.value, + }) })); let p_max_slot = p_p1b_highest_entries_and_count .clone() @@ -530,7 +539,14 @@ fn recommit_after_leader_election<'a, P: PaxosPayload>( .map(q!(|(slot, _)| slot)); let p_log_holes = p_max_slot .clone() - .flat_map_ordered(q!(|max_slot| 0..max_slot)) + .zip(p_p1b_max_checkpoint) + .flat_map_ordered(q!(|(max_slot, checkpoint)| { + if let Some(checkpoint) = checkpoint { + (checkpoint + 1)..max_slot + } else { + 0..max_slot + } + })) .filter_not_in(p_proposed_slots) .cross_singleton(p_ballot.clone()) .map(q!(|(slot, ballot)| P2a { @@ -564,7 +580,7 @@ unsafe fn sequence_payload<'a, P: PaxosPayload, R>( p_is_leader: Optional<(), Tick>, Bounded>, p_relevant_p1bs: Stream< - HashMap>, + (Option, HashMap>), Tick>, Bounded, NoOrder, @@ -574,7 +590,11 @@ unsafe fn sequence_payload<'a, P: PaxosPayload, R>( a_max_ballot: Singleton>, Bounded>, ) -> ( Stream<(usize, Option

), Cluster<'a, Proposer>, Unbounded, NoOrder>, - Singleton>, Timestamped>, Unbounded>, + Singleton< + (Option, HashMap>), + Timestamped>, + Unbounded, + >, Stream, Unbounded, NoOrder>, ) { let (p_log_to_recommit, p_max_slot) = @@ -635,7 +655,7 @@ unsafe fn sequence_payload<'a, P: PaxosPayload, R>( p_to_replicas .map(q!(|((slot, _ballot), (value, _))| (slot, value))) .drop_timestamp(), - a_log.map(q!(|(_ckpnt, log)| log)), + a_log, fails.map(q!(|(_, ballot)| ballot)).drop_timestamp(), ) } diff --git a/hydro_test/src/cluster/snapshots/hydro_test__cluster__paxos_bench__tests__paxos_ir.snap b/hydro_test/src/cluster/snapshots/hydro_test__cluster__paxos_bench__tests__paxos_ir.snap index ae6f6068a1f..d53ab94f81b 100644 --- a/hydro_test/src/cluster/snapshots/hydro_test__cluster__paxos_bench__tests__paxos_ir.snap +++ b/hydro_test/src/cluster/snapshots/hydro_test__cluster__paxos_bench__tests__paxos_ir.snap @@ -210,7 +210,7 @@ expression: built.ir() input: Tee { inner: : FoldKeyed { init: stageleft :: runtime_support :: fn0_type_hint :: < (usize , usize) > ({ use hydro_std :: __staged :: quorum :: * ; move | | (0 , 0) }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (usize , usize) , core :: result :: Result < std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > > , hydro_test :: cluster :: paxos :: Ballot > , () > ({ use hydro_std :: __staged :: quorum :: * ; move | accum , value | { if value . is_ok () { accum . 0 += 1 ; } else { accum . 1 += 1 ; } } }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (usize , usize) , core :: result :: Result < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > >) , hydro_test :: cluster :: paxos :: Ballot > , () > ({ use hydro_std :: __staged :: quorum :: * ; move | accum , value | { if value . is_ok () { accum . 0 += 1 ; } else { accum . 1 += 1 ; } } }), input: Tee { inner: : Chain( CycleSource { @@ -226,9 +226,9 @@ expression: built.ir() }, Tee { inner: : Inspect { - f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydro_test :: cluster :: paxos :: Ballot , core :: result :: Result < std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > > , hydro_test :: cluster :: paxos :: Ballot >) , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1b | println ! ("Proposer received P1b: {:?}" , p1b) }), + f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydro_test :: cluster :: paxos :: Ballot , core :: result :: Result < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > >) , hydro_test :: cluster :: paxos :: Ballot >) , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1b | println ! ("Proposer received P1b: {:?}" , p1b) }), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos :: Acceptor > , (hydro_test :: cluster :: paxos :: Ballot , core :: result :: Result < std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > > , hydro_test :: cluster :: paxos :: Ballot >)) , (hydro_test :: cluster :: paxos :: Ballot , core :: result :: Result < std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > > , hydro_test :: cluster :: paxos :: Ballot >) > ({ use hydro_lang :: __staged :: stream :: * ; | (_ , b) | b }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos :: Acceptor > , (hydro_test :: cluster :: paxos :: Ballot , core :: result :: Result < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > >) , hydro_test :: cluster :: paxos :: Ballot >)) , (hydro_test :: cluster :: paxos :: Ballot , core :: result :: Result < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > >) , hydro_test :: cluster :: paxos :: Ballot >) > ({ use hydro_lang :: __staged :: stream :: * ; | (_ , b) | b }), input: Network { from_location: Cluster( 1, @@ -239,14 +239,14 @@ expression: built.ir() ), to_key: None, serialize_fn: Some( - | (id , data) : (hydro_lang :: ClusterId < _ > , (hydro_test :: cluster :: paxos :: Ballot , core :: result :: Result < std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > > , hydro_test :: cluster :: paxos :: Ballot >)) | { (id . raw_id , hydro_lang :: runtime_support :: bincode :: serialize :: < (hydro_test :: cluster :: paxos :: Ballot , core :: result :: Result < std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > > , hydro_test :: cluster :: paxos :: Ballot >) > (& data) . unwrap () . into ()) }, + | (id , data) : (hydro_lang :: ClusterId < _ > , (hydro_test :: cluster :: paxos :: Ballot , core :: result :: Result < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > >) , hydro_test :: cluster :: paxos :: Ballot >)) | { (id . raw_id , hydro_lang :: runtime_support :: bincode :: serialize :: < (hydro_test :: cluster :: paxos :: Ballot , core :: result :: Result < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > >) , hydro_test :: cluster :: paxos :: Ballot >) > (& data) . unwrap () . into ()) }, ), instantiate_fn: , deserialize_fn: Some( - | res | { let (id , b) = res . unwrap () ; (hydro_lang :: ClusterId :: < hydro_test :: cluster :: paxos :: Acceptor > :: from_raw (id) , hydro_lang :: runtime_support :: bincode :: deserialize :: < (hydro_test :: cluster :: paxos :: Ballot , core :: result :: Result < std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > > , hydro_test :: cluster :: paxos :: Ballot >) > (& b) . unwrap ()) }, + | res | { let (id , b) = res . unwrap () ; (hydro_lang :: ClusterId :: < hydro_test :: cluster :: paxos :: Acceptor > :: from_raw (id) , hydro_lang :: runtime_support :: bincode :: deserialize :: < (hydro_test :: cluster :: paxos :: Ballot , core :: result :: Result < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > >) , hydro_test :: cluster :: paxos :: Ballot >) > (& b) . unwrap ()) }, ), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((hydro_test :: cluster :: paxos :: Ballot , hydro_test :: cluster :: paxos :: Ballot) , std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > >) , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos :: Proposer > , (hydro_test :: cluster :: paxos :: Ballot , core :: result :: Result < std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > > , hydro_test :: cluster :: paxos :: Ballot >)) > ({ use crate :: __staged :: cluster :: paxos :: * ; | ((ballot , max_ballot) , log) | (ballot . proposer_id , (ballot , if ballot == max_ballot { Ok (log) } else { Err (max_ballot) })) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < ((hydro_test :: cluster :: paxos :: Ballot , hydro_test :: cluster :: paxos :: Ballot) , (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > >)) , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos :: Proposer > , (hydro_test :: cluster :: paxos :: Ballot , core :: result :: Result < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > >) , hydro_test :: cluster :: paxos :: Ballot >)) > ({ use crate :: __staged :: cluster :: paxos :: * ; | ((ballot , max_ballot) , log) | (ballot . proposer_id , (ballot , if ballot == max_ballot { Ok (log) } else { Err (max_ballot) })) }), input: CrossSingleton( CrossSingleton( Tee { @@ -429,19 +429,19 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < (() , ()) , () > ({ use hydro_lang :: __staged :: optional :: * ; | (d , _signal) | d }), input: CrossSingleton( Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < std :: vec :: Vec < std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > > > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | _ | () }), + f: stageleft :: runtime_support :: fn1_type_hint :: < std :: vec :: Vec < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > >) > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | _ | () }), input: Tee { inner: : FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((hydro_test :: cluster :: paxos :: Ballot , std :: vec :: Vec < std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > > >) , hydro_test :: cluster :: paxos :: Ballot) , core :: option :: Option < std :: vec :: Vec < std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; move | ((quorum_ballot , quorum_accepted) , my_ballot) | if quorum_ballot == my_ballot { Some (quorum_accepted) } else { None } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < ((hydro_test :: cluster :: paxos :: Ballot , std :: vec :: Vec < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > >) >) , hydro_test :: cluster :: paxos :: Ballot) , core :: option :: Option < std :: vec :: Vec < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > >) > > > ({ use crate :: __staged :: cluster :: paxos :: * ; move | ((quorum_ballot , quorum_accepted) , my_ballot) | if quorum_ballot == my_ballot { Some (quorum_accepted) } else { None } }), input: CrossSingleton( Reduce { - f: { let key_fn = stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydro_test :: cluster :: paxos :: Ballot , std :: vec :: Vec < std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > > >) , hydro_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; | t | t . 0 }) ; move | curr , new | { if key_fn (& new) > key_fn (& * curr) { * curr = new ; } } }, + f: { let key_fn = stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydro_test :: cluster :: paxos :: Ballot , std :: vec :: Vec < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > >) >) , hydro_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; | t | t . 0 }) ; move | curr , new | { if key_fn (& new) > key_fn (& * curr) { * curr = new ; } } }, input: FoldKeyed { - init: stageleft :: runtime_support :: fn0_type_hint :: < std :: vec :: Vec < std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | | vec ! [] }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < std :: vec :: Vec < std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > > > , std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | logs , log | { logs . push (log) ; } }), + init: stageleft :: runtime_support :: fn0_type_hint :: < std :: vec :: Vec < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > >) > > ({ use crate :: __staged :: cluster :: paxos :: * ; | | vec ! [] }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < std :: vec :: Vec < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > >) > , (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > >) , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | logs , log | { logs . push (log) ; } }), input: Persist( FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydro_test :: cluster :: paxos :: Ballot , core :: result :: Result < std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > > , hydro_test :: cluster :: paxos :: Ballot >) , core :: option :: Option < (hydro_test :: cluster :: paxos :: Ballot , std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > >) > > ({ use hydro_std :: __staged :: quorum :: * ; move | (key , res) | match res { Ok (v) => Some ((key , v)) , Err (_) => None , } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydro_test :: cluster :: paxos :: Ballot , core :: result :: Result < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > >) , hydro_test :: cluster :: paxos :: Ballot >) , core :: option :: Option < (hydro_test :: cluster :: paxos :: Ballot , (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > >)) > > ({ use hydro_std :: __staged :: quorum :: * ; move | (key , res) | match res { Ok (v) => Some ((key , v)) , Err (_) => None , } }), input: AntiJoin( AntiJoin( Tee { @@ -510,7 +510,7 @@ expression: built.ir() input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydro_test :: cluster :: paxos :: Ballot , hydro_test :: cluster :: paxos :: Ballot) , hydro_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; | (_ , ballot) | ballot }), input: FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydro_test :: cluster :: paxos :: Ballot , core :: result :: Result < std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > > , hydro_test :: cluster :: paxos :: Ballot >) , core :: option :: Option < (hydro_test :: cluster :: paxos :: Ballot , hydro_test :: cluster :: paxos :: Ballot) > > ({ use hydro_std :: __staged :: quorum :: * ; move | (key , res) | match res { Ok (_) => None , Err (e) => Some ((key , e)) , } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydro_test :: cluster :: paxos :: Ballot , core :: result :: Result < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > >) , hydro_test :: cluster :: paxos :: Ballot >) , core :: option :: Option < (hydro_test :: cluster :: paxos :: Ballot , hydro_test :: cluster :: paxos :: Ballot) > > ({ use hydro_std :: __staged :: quorum :: * ; move | (key , res) | match res { Ok (_) => None , Err (e) => Some ((key , e)) , } }), input: Tee { inner: , }, @@ -624,10 +624,15 @@ expression: built.ir() acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (usize , core :: option :: Option < hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > >) , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | curr_entry , new_entry | { if let Some (curr_entry_payload) = & mut curr_entry . 1 { let same_values = new_entry . value == curr_entry_payload . value ; let higher_ballot = new_entry . ballot > curr_entry_payload . ballot ; if same_values { curr_entry . 0 += 1 ; } if higher_ballot { curr_entry_payload . ballot = new_entry . ballot ; if ! same_values { curr_entry . 0 = 1 ; curr_entry_payload . value = new_entry . value ; } } } else { * curr_entry = (1 , Some (new_entry)) ; } } }), input: FlatMap { f: stageleft :: runtime_support :: fn1_type_hint :: < std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > > , std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > > > ({ use hydro_lang :: __staged :: stream :: * ; | d | d }), - input: FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < std :: vec :: Vec < std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > > > , std :: vec :: Vec < std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > > > > ({ use hydro_lang :: __staged :: optional :: * ; | v | v }), + input: Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > >) , std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | (_checkpoint , log) | log }), input: Tee { - inner: , + inner: : FlatMap { + f: stageleft :: runtime_support :: fn1_type_hint :: < std :: vec :: Vec < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > >) > , std :: vec :: Vec < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > >) > > ({ use hydro_lang :: __staged :: optional :: * ; | v | v }), + input: Tee { + inner: , + }, + }, }, }, }, @@ -688,14 +693,14 @@ expression: built.ir() input: DeferTick( Difference( Tee { - inner: : FilterMap { + inner: : FilterMap { f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydro_test :: cluster :: paxos :: Ballot) , (usize , usize)) , core :: option :: Option < (usize , hydro_test :: cluster :: paxos :: Ballot) > > ({ use hydro_std :: __staged :: quorum :: * ; let min__free = 2usize ; move | (key , (success , _error)) | if success >= min__free { Some (key) } else { None } }), input: Tee { - inner: : FoldKeyed { + inner: : FoldKeyed { init: stageleft :: runtime_support :: fn0_type_hint :: < (usize , usize) > ({ use hydro_std :: __staged :: quorum :: * ; move | | (0 , 0) }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (usize , usize) , core :: result :: Result < () , hydro_test :: cluster :: paxos :: Ballot > , () > ({ use hydro_std :: __staged :: quorum :: * ; move | accum , value | { if value . is_ok () { accum . 0 += 1 ; } else { accum . 1 += 1 ; } } }), input: Tee { - inner: : Chain( + inner: : Chain( CycleSource { ident: Ident { sym: cycle_8, @@ -708,7 +713,7 @@ expression: built.ir() ), }, Tee { - inner: : Map { + inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos :: Acceptor > , ((usize , hydro_test :: cluster :: paxos :: Ballot) , core :: result :: Result < () , hydro_test :: cluster :: paxos :: Ballot >)) , ((usize , hydro_test :: cluster :: paxos :: Ballot) , core :: result :: Result < () , hydro_test :: cluster :: paxos :: Ballot >) > ({ use hydro_lang :: __staged :: stream :: * ; | (_ , b) | b }), input: Network { from_location: Cluster( @@ -730,7 +735,7 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < (hydro_test :: cluster :: paxos :: P2a < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > , hydro_test :: cluster :: paxos :: Ballot) , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos :: Proposer > , ((usize , hydro_test :: cluster :: paxos :: Ballot) , core :: result :: Result < () , hydro_test :: cluster :: paxos :: Ballot >)) > ({ use crate :: __staged :: cluster :: paxos :: * ; | (p2a , max_ballot) | (p2a . ballot . proposer_id , ((p2a . slot , p2a . ballot) , if p2a . ballot == max_ballot { Ok (()) } else { Err (max_ballot) })) }), input: CrossSingleton( Tee { - inner: : Map { + inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos :: Proposer > , hydro_test :: cluster :: paxos :: P2a < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > >) , hydro_test :: cluster :: paxos :: P2a < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > > ({ use hydro_lang :: __staged :: stream :: * ; | (_ , b) | b }), input: Network { from_location: Cluster( @@ -753,7 +758,7 @@ expression: built.ir() input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydro_test :: cluster :: paxos :: Ballot) , core :: option :: Option < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > >) , hydro_test :: cluster :: paxos :: P2a < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | ((slot , ballot) , value) | P2a { ballot , slot , value } }), input: Tee { - inner: : Map { + inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (((usize , hydro_test :: cluster :: paxos :: Ballot) , core :: option :: Option < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > >) , ()) , ((usize , hydro_test :: cluster :: paxos :: Ballot) , core :: option :: Option < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > >) > ({ use hydro_lang :: __staged :: stream :: * ; | (d , _signal) | d }), input: CrossSingleton( Chain( @@ -772,13 +777,26 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < hydro_test :: cluster :: paxos :: P2a < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > , ((usize , hydro_test :: cluster :: paxos :: Ballot) , core :: option :: Option < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | p2a | ((p2a . slot , p2a . ballot) , p2a . value) }), input: Chain( FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , (usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > >)) , hydro_test :: cluster :: paxos :: Ballot) , core :: option :: Option < hydro_test :: cluster :: paxos :: P2a < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f__free = 1usize ; move | ((slot , (count , entry)) , ballot) | { if count <= f__free { Some (P2a { ballot , slot , value : entry . value , }) } else { None } } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (((usize , (usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > >)) , hydro_test :: cluster :: paxos :: Ballot) , core :: option :: Option < usize >) , core :: option :: Option < hydro_test :: cluster :: paxos :: P2a < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f__free = 1usize ; move | (((slot , (count , entry)) , ballot) , checkpoint) | { if count > f__free { return None ; } else if let Some (checkpoint) = checkpoint { if slot <= checkpoint { return None ; } } Some (P2a { ballot , slot , value : entry . value , }) } }), input: CrossSingleton( + CrossSingleton( + Tee { + inner: , + }, + Tee { + inner: , + }, + ), Tee { - inner: , - }, - Tee { - inner: , + inner: : Reduce { + f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < core :: option :: Option < usize > , core :: option :: Option < usize > , () > ({ use hydro_lang :: __staged :: stream :: * ; | curr , new | { if new > * curr { * curr = new ; } } }), + input: Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > >) , core :: option :: Option < usize > > ({ use crate :: __staged :: cluster :: paxos :: * ; | (checkpoint , _log) | checkpoint }), + input: Tee { + inner: , + }, + }, + }, }, ), }, @@ -787,10 +805,15 @@ expression: built.ir() input: CrossSingleton( Difference( FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < usize , std :: ops :: Range < usize > > ({ use crate :: __staged :: cluster :: paxos :: * ; | max_slot | 0 .. max_slot }), - input: Tee { - inner: , - }, + f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , core :: option :: Option < usize >) , std :: ops :: Range < usize > > ({ use crate :: __staged :: cluster :: paxos :: * ; | (max_slot , checkpoint) | { if let Some (checkpoint) = checkpoint { (checkpoint + 1) .. max_slot } else { 0 .. max_slot } } }), + input: CrossSingleton( + Tee { + inner: , + }, + Tee { + inner: , + }, + ), }, Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , (usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > >)) , usize > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , _) | slot }), @@ -836,10 +859,10 @@ expression: built.ir() }, }, Tee { - inner: : FilterMap { + inner: : FilterMap { f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydro_test :: cluster :: paxos :: Ballot) , (usize , usize)) , core :: option :: Option < (usize , hydro_test :: cluster :: paxos :: Ballot) > > ({ use hydro_std :: __staged :: quorum :: * ; let max__free = 3usize ; move | (key , (success , error)) | if (success + error) >= max__free { Some (key) } else { None } }), input: Tee { - inner: , + inner: , }, }, }, @@ -859,10 +882,10 @@ expression: built.ir() input: DeferTick( AntiJoin( Tee { - inner: , + inner: , }, Tee { - inner: , + inner: , }, ), ), @@ -880,7 +903,7 @@ expression: built.ir() input: DeferTick( AntiJoin( Tee { - inner: : Chain( + inner: : Chain( CycleSource { ident: Ident { sym: cycle_10, @@ -893,18 +916,18 @@ expression: built.ir() ), }, Tee { - inner: , + inner: , }, ), }, Map { f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydro_test :: cluster :: paxos :: Ballot) , ()) , (usize , hydro_test :: cluster :: paxos :: Ballot) > ({ use hydro_std :: __staged :: request_response :: * ; | (key , _) | key }), input: Tee { - inner: : Map { + inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , hydro_test :: cluster :: paxos :: Ballot) , ((usize , hydro_test :: cluster :: paxos :: Ballot) , ()) > ({ use crate :: __staged :: cluster :: paxos :: * ; | k | (k , ()) }), input: Difference( Tee { - inner: , + inner: , }, CycleSource { ident: Ident { @@ -934,91 +957,88 @@ expression: built.ir() 1, ), ), - input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > >) , std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | (_ckpnt , log) | log }), - input: Fold { - init: stageleft :: runtime_support :: fn0_type_hint :: < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | | (None , HashMap :: new ()) }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > >) , hydro_test :: cluster :: paxos :: CheckpointOrP2a < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | (prev_checkpoint , log) , checkpoint_or_p2a | { match checkpoint_or_p2a { CheckpointOrP2a :: Checkpoint (new_checkpoint) => { if prev_checkpoint . map (| prev | new_checkpoint > prev) . unwrap_or (true) { for slot in (prev_checkpoint . unwrap_or (0)) .. new_checkpoint { log . remove (& slot) ; } * prev_checkpoint = Some (new_checkpoint) ; } } CheckpointOrP2a :: P2a (p2a) => { if prev_checkpoint . map (| prev | p2a . slot > prev) . unwrap_or (true) && log . get (& p2a . slot) . map (| prev_p2a : & LogValue < _ > | p2a . ballot > prev_p2a . ballot) . unwrap_or (true) { log . insert (p2a . slot , LogValue { ballot : p2a . ballot , value : p2a . value , } ,) ; } } } } }), - input: Persist( - Chain( - FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydro_test :: cluster :: paxos :: P2a < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > , hydro_test :: cluster :: paxos :: Ballot) , core :: option :: Option < hydro_test :: cluster :: paxos :: CheckpointOrP2a < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | (p2a , max_ballot) | if p2a . ballot >= max_ballot { Some (CheckpointOrP2a :: P2a (p2a)) } else { None } }), - input: CrossSingleton( - Tee { - inner: , - }, - Tee { - inner: , - }, - ), - }, - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < usize , hydro_test :: cluster :: paxos :: CheckpointOrP2a < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | min_seq | CheckpointOrP2a :: Checkpoint (min_seq) }), - input: Delta( - Reduce { - f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , usize , () > ({ use hydro_lang :: __staged :: stream :: * ; | curr , new | { if new < * curr { * curr = new ; } } }), + input: Fold { + init: stageleft :: runtime_support :: fn0_type_hint :: < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | | (None , HashMap :: new ()) }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > >) , hydro_test :: cluster :: paxos :: CheckpointOrP2a < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | (prev_checkpoint , log) , checkpoint_or_p2a | { match checkpoint_or_p2a { CheckpointOrP2a :: Checkpoint (new_checkpoint) => { if prev_checkpoint . map (| prev | new_checkpoint > prev) . unwrap_or (true) { for slot in (prev_checkpoint . unwrap_or (0)) .. new_checkpoint { log . remove (& slot) ; } * prev_checkpoint = Some (new_checkpoint) ; } } CheckpointOrP2a :: P2a (p2a) => { if prev_checkpoint . map (| prev | p2a . slot > prev) . unwrap_or (true) && log . get (& p2a . slot) . map (| prev_p2a : & LogValue < _ > | p2a . ballot > prev_p2a . ballot) . unwrap_or (true) { log . insert (p2a . slot , LogValue { ballot : p2a . ballot , value : p2a . value , } ,) ; } } } } }), + input: Persist( + Chain( + FilterMap { + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydro_test :: cluster :: paxos :: P2a < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > , hydro_test :: cluster :: paxos :: Ballot) , core :: option :: Option < hydro_test :: cluster :: paxos :: CheckpointOrP2a < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | (p2a , max_ballot) | if p2a . ballot >= max_ballot { Some (CheckpointOrP2a :: P2a (p2a)) } else { None } }), + input: CrossSingleton( + Tee { + inner: , + }, + Tee { + inner: , + }, + ), + }, + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < usize , hydro_test :: cluster :: paxos :: CheckpointOrP2a < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | min_seq | CheckpointOrP2a :: Checkpoint (min_seq) }), + input: Delta( + Reduce { + f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , usize , () > ({ use hydro_lang :: __staged :: stream :: * ; | curr , new | { if new < * curr { * curr = new ; } } }), + input: Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_kv :: Replica > , usize) , usize > ({ use crate :: __staged :: cluster :: paxos :: * ; | (_sender , seq) | seq }), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_kv :: Replica > , usize) , usize > ({ use crate :: __staged :: cluster :: paxos :: * ; | (_sender , seq) | seq }), - input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_kv :: Replica > , usize) , ()) , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_kv :: Replica > , usize) > ({ use hydro_lang :: __staged :: stream :: * ; | (d , _signal) | d }), - input: CrossSingleton( - Tee { - inner: : ReduceKeyed { - f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , usize , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | curr_seq , seq | { if seq > * curr_seq { * curr_seq = seq ; } } }), - input: Persist( - Network { - from_location: Cluster( - 3, - ), - from_key: None, - to_location: Cluster( - 1, - ), - to_key: None, - serialize_fn: Some( - | (id , data) : (hydro_lang :: ClusterId < _ > , usize) | { (id . raw_id , hydro_lang :: runtime_support :: bincode :: serialize :: < usize > (& data) . unwrap () . into ()) }, - ), - instantiate_fn: , - deserialize_fn: Some( - | res | { let (id , b) = res . unwrap () ; (hydro_lang :: ClusterId :: < hydro_test :: cluster :: paxos_kv :: Replica > :: from_raw (id) , hydro_lang :: runtime_support :: bincode :: deserialize :: < usize > (& b) . unwrap ()) }, - ), - input: FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < usize , std :: iter :: Map < std :: slice :: Iter < hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos :: Acceptor > > , _ > > ({ use hydro_lang :: __staged :: stream :: * ; let ids__free = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydro_lang :: ClusterId < hydro_test :: cluster :: paxos :: Acceptor > > > (__hydro_lang_cluster_ids_1) } ; | b | ids__free . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), - input: CycleSource { - ident: Ident { - sym: cycle_0, - }, - location_kind: Cluster( - 3, - ), + f: stageleft :: runtime_support :: fn1_type_hint :: < ((hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_kv :: Replica > , usize) , ()) , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_kv :: Replica > , usize) > ({ use hydro_lang :: __staged :: stream :: * ; | (d , _signal) | d }), + input: CrossSingleton( + Tee { + inner: : ReduceKeyed { + f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , usize , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | curr_seq , seq | { if seq > * curr_seq { * curr_seq = seq ; } } }), + input: Persist( + Network { + from_location: Cluster( + 3, + ), + from_key: None, + to_location: Cluster( + 1, + ), + to_key: None, + serialize_fn: Some( + | (id , data) : (hydro_lang :: ClusterId < _ > , usize) | { (id . raw_id , hydro_lang :: runtime_support :: bincode :: serialize :: < usize > (& data) . unwrap () . into ()) }, + ), + instantiate_fn: , + deserialize_fn: Some( + | res | { let (id , b) = res . unwrap () ; (hydro_lang :: ClusterId :: < hydro_test :: cluster :: paxos_kv :: Replica > :: from_raw (id) , hydro_lang :: runtime_support :: bincode :: deserialize :: < usize > (& b) . unwrap ()) }, + ), + input: FlatMap { + f: stageleft :: runtime_support :: fn1_type_hint :: < usize , std :: iter :: Map < std :: slice :: Iter < hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos :: Acceptor > > , _ > > ({ use hydro_lang :: __staged :: stream :: * ; let ids__free = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydro_lang :: ClusterId < hydro_test :: cluster :: paxos :: Acceptor > > > (__hydro_lang_cluster_ids_1) } ; | b | ids__free . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), + input: CycleSource { + ident: Ident { + sym: cycle_0, }, + location_kind: Cluster( + 3, + ), }, }, - ), - }, + }, + ), }, - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < bool , () > ({ use hydro_lang :: __staged :: stream :: * ; | _u | () }), - input: FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < usize , core :: option :: Option < bool > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f__free = 1usize ; move | num_received | if num_received == f__free + 1 { Some (true) } else { None } }), - input: Fold { - init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydro_lang :: __staged :: stream :: * ; | | 0usize }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_kv :: Replica > , usize) , () > ({ use hydro_lang :: __staged :: stream :: * ; | count , _ | * count += 1 }), - input: Tee { - inner: , - }, + }, + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < bool , () > ({ use hydro_lang :: __staged :: stream :: * ; | _u | () }), + input: FilterMap { + f: stageleft :: runtime_support :: fn1_type_hint :: < usize , core :: option :: Option < bool > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f__free = 1usize ; move | num_received | if num_received == f__free + 1 { Some (true) } else { None } }), + input: Fold { + init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydro_lang :: __staged :: stream :: * ; | | 0usize }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_kv :: Replica > , usize) , () > ({ use hydro_lang :: __staged :: stream :: * ; | count , _ | * count += 1 }), + input: Tee { + inner: , }, }, }, - ), - }, + }, + ), }, }, - ), - }, - ), + }, + ), + }, ), - }, + ), }, }, CycleSink { @@ -1033,7 +1053,7 @@ expression: built.ir() input: FilterMap { f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydro_test :: cluster :: paxos :: Ballot) , core :: result :: Result < () , hydro_test :: cluster :: paxos :: Ballot >) , core :: option :: Option < ((usize , hydro_test :: cluster :: paxos :: Ballot) , hydro_test :: cluster :: paxos :: Ballot) > > ({ use hydro_std :: __staged :: quorum :: * ; move | (key , res) | match res { Ok (_) => None , Err (e) => Some ((key , e)) , } }), input: Tee { - inner: , + inner: , }, }, }, @@ -1055,7 +1075,7 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydro_test :: cluster :: paxos_kv :: SequencedKv < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > , usize) , bool > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (sorted_payload , highest_seq) | sorted_payload . seq > * highest_seq }), input: CrossSingleton( Tee { - inner: : Sort( + inner: : Sort( Chain( Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos :: Proposer > , hydro_test :: cluster :: paxos_kv :: SequencedKv < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) >) , hydro_test :: cluster :: paxos_kv :: SequencedKv < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > ({ use hydro_lang :: __staged :: stream :: * ; | (_ , b) | b }), @@ -1085,10 +1105,10 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydro_test :: cluster :: paxos :: Ballot) , (core :: option :: Option < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > , ())) , ((usize , hydro_test :: cluster :: paxos :: Ballot) , (core :: option :: Option < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > , ())) > ({ use hydro_std :: __staged :: request_response :: * ; | (key , (meta , resp)) | (key , (meta , resp)) }), input: Join( Tee { - inner: , + inner: , }, Tee { - inner: , + inner: , }, ), }, @@ -1112,14 +1132,14 @@ expression: built.ir() ), }, Tee { - inner: : FilterMap { + inner: : FilterMap { f: stageleft :: runtime_support :: fn1_type_hint :: < core :: option :: Option < usize > , core :: option :: Option < usize > > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | v | v }), input: Fold { init: stageleft :: runtime_support :: fn0_type_hint :: < core :: option :: Option < usize > > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | | None }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < core :: option :: Option < usize > , (hydro_test :: cluster :: paxos_kv :: SequencedKv < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > , core :: option :: Option < usize >) , () > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | filled_slot , (sorted_payload , highest_seq) | { let expected_next_slot = std :: cmp :: max (filled_slot . map (| v | v + 1) . unwrap_or (0) , highest_seq . map (| v | v + 1) . unwrap_or (0) ,) ; if sorted_payload . seq == expected_next_slot { * filled_slot = Some (sorted_payload . seq) ; } } }), input: CrossSingleton( Tee { - inner: , + inner: , }, Chain( Map { @@ -1168,23 +1188,23 @@ expression: built.ir() ), input: DeferTick( Tee { - inner: : FilterMap { + inner: : FilterMap { f: stageleft :: runtime_support :: fn1_type_hint :: < (std :: collections :: hash_map :: HashMap < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > , core :: option :: Option < usize >) , core :: option :: Option < usize > > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (_kv_store , highest_seq) | highest_seq }), input: Fold { init: stageleft :: runtime_support :: fn0_type_hint :: < (std :: collections :: hash_map :: HashMap < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > , core :: option :: Option < usize >) > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | | (HashMap :: new () , None) }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (std :: collections :: hash_map :: HashMap < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > , core :: option :: Option < usize >) , hydro_test :: cluster :: paxos_kv :: SequencedKv < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > , () > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (kv_store , last_seq) , payload | { if let Some (kv) = payload . kv { kv_store . insert (kv . key , kv . value) ; } debug_assert ! (payload . seq == (last_seq . map (| s | s + 1) . unwrap_or (0)) , "Hole in log between seq {:?} and {}" , * last_seq , payload . seq) ; * last_seq = Some (payload . seq) ; } }), input: Persist( Tee { - inner: : Map { + inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydro_test :: cluster :: paxos_kv :: SequencedKv < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > , usize) , hydro_test :: cluster :: paxos_kv :: SequencedKv < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (sorted_payload , _) | { sorted_payload } }), input: Filter { f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydro_test :: cluster :: paxos_kv :: SequencedKv < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > , usize) , bool > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (sorted_payload , highest_seq) | sorted_payload . seq <= * highest_seq }), input: CrossSingleton( Tee { - inner: , + inner: , }, Tee { - inner: , + inner: , }, ), }, @@ -1208,7 +1228,7 @@ expression: built.ir() ), input: DeferTick( Tee { - inner: : FilterMap { + inner: : FilterMap { f: stageleft :: runtime_support :: fn1_type_hint :: < (core :: option :: Option < usize > , usize) , core :: option :: Option < usize > > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; let checkpoint_frequency__free = 1usize ; move | (max_checkpointed_seq , new_highest_seq) | if max_checkpointed_seq . map (| m | new_highest_seq - m >= checkpoint_frequency__free) . unwrap_or (true) { Some (new_highest_seq) } else { None } }), input: CrossSingleton( Chain( @@ -1243,7 +1263,7 @@ expression: built.ir() ), ), Tee { - inner: , + inner: , }, ), }, @@ -1258,7 +1278,7 @@ expression: built.ir() 3, ), input: Tee { - inner: , + inner: , }, }, CycleSink { @@ -1339,7 +1359,7 @@ expression: built.ir() input: DeferTick( AntiJoin( Tee { - inner: : Chain( + inner: : Chain( CycleSource { ident: Ident { sym: cycle_2, @@ -1352,7 +1372,7 @@ expression: built.ir() ), }, Tee { - inner: : Map { + inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_kv :: Replica > , ((u32 , u32) , core :: result :: Result < () , () >)) , ((u32 , u32) , core :: result :: Result < () , () >) > ({ use hydro_lang :: __staged :: stream :: * ; | (_ , b) | b }), input: Network { from_location: Cluster( @@ -1375,7 +1395,7 @@ expression: built.ir() input: FilterMap { f: stageleft :: runtime_support :: fn1_type_hint :: < hydro_test :: cluster :: paxos_kv :: SequencedKv < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > , core :: option :: Option < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | payload | payload . kv }), input: Tee { - inner: , + inner: , }, }, }, @@ -1385,14 +1405,14 @@ expression: built.ir() ), }, Tee { - inner: : FilterMap { + inner: : FilterMap { f: stageleft :: runtime_support :: fn1_type_hint :: < ((u32 , u32) , (usize , usize)) , core :: option :: Option < (u32 , u32) > > ({ use hydro_std :: __staged :: quorum :: * ; let min__free = 2usize ; move | (key , (success , _error)) | if success >= min__free { Some (key) } else { None } }), input: Tee { - inner: : FoldKeyed { + inner: : FoldKeyed { init: stageleft :: runtime_support :: fn0_type_hint :: < (usize , usize) > ({ use hydro_std :: __staged :: quorum :: * ; move | | (0 , 0) }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (usize , usize) , core :: result :: Result < () , () > , () > ({ use hydro_std :: __staged :: quorum :: * ; move | accum , value | { if value . is_ok () { accum . 0 += 1 ; } else { accum . 1 += 1 ; } } }), input: Tee { - inner: , + inner: , }, }, }, @@ -1412,7 +1432,7 @@ expression: built.ir() FlatMap { f: stageleft :: runtime_support :: fn1_type_hint :: < () , std :: iter :: Map < std :: ops :: Range < usize > , _ > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let CLUSTER_SELF_ID__free = hydro_lang :: ClusterId :: < hydro_test :: cluster :: paxos_bench :: Client > :: from_raw (__hydro_lang_cluster_self_id_2) ; let num_clients_per_node__free = 1usize ; move | _ | (0 .. num_clients_per_node__free) . map (move | i | ((CLUSTER_SELF_ID__free . raw_id * (num_clients_per_node__free as u32)) + i as u32 , 0)) }), input: Tee { - inner: : Reduce { + inner: : Reduce { f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , () > ({ use hydro_lang :: __staged :: stream :: * ; | curr , new | * curr = new }), input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos :: Proposer > , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | () }), @@ -1428,8 +1448,8 @@ expression: built.ir() Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , u32) , (u32 , u32) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | payload | (payload . 0 , payload . 1 + 1) }), input: Tee { - inner: : Tee { - inner: , + inner: : Tee { + inner: , }, }, }, @@ -1451,7 +1471,7 @@ expression: built.ir() input: Chain( Chain( Tee { - inner: : CycleSource { + inner: : CycleSource { ident: Ident { sym: cycle_3, }, @@ -1468,16 +1488,16 @@ expression: built.ir() input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < () , tokio :: time :: Instant > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | Instant :: now () }), input: Tee { - inner: , + inner: , }, }, }, ), Tee { - inner: : Map { + inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , u32) , (usize , tokio :: time :: Instant) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (key , _prev_count) | (key as usize , Instant :: now ()) }), input: Tee { - inner: , + inner: , }, }, }, @@ -1504,10 +1524,10 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , (tokio :: time :: Instant , tokio :: time :: Instant)) , core :: option :: Option < core :: time :: Duration > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (_virtual_id , (prev_time , curr_time)) | Some (curr_time . duration_since (prev_time)) }), input: Join( Tee { - inner: , + inner: , }, Tee { - inner: , + inner: , }, ), }, @@ -1515,7 +1535,7 @@ expression: built.ir() Map { f: stageleft :: runtime_support :: fn1_type_hint :: < tokio :: time :: Instant , core :: option :: Option < core :: time :: Duration > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | None }), input: Tee { - inner: : Source { + inner: : Source { source: Stream( { use hydro_lang :: __staged :: location :: * ; let interval__free = { use crate :: __staged :: cluster :: paxos_bench :: * ; Duration :: from_secs (1) } ; tokio_stream :: wrappers :: IntervalStream :: new (tokio :: time :: interval (interval__free)) }, ), @@ -1545,7 +1565,7 @@ expression: built.ir() init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydro_lang :: __staged :: stream :: * ; | | 0usize }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , (u32 , u32) , () > ({ use hydro_lang :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: Tee { - inner: , + inner: , }, }, Map { @@ -1556,7 +1576,7 @@ expression: built.ir() init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydro_lang :: __staged :: stream :: * ; | | 0usize }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , tokio :: time :: Instant , () > ({ use hydro_lang :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: Tee { - inner: , + inner: , }, }, }, @@ -1568,7 +1588,7 @@ expression: built.ir() Map { f: stageleft :: runtime_support :: fn1_type_hint :: < tokio :: time :: Instant , (usize , bool) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | (0 , true) }), input: Tee { - inner: , + inner: , }, }, ), @@ -1579,7 +1599,7 @@ expression: built.ir() Map { f: stageleft :: runtime_support :: fn1_type_hint :: < tokio :: time :: Instant , () > ({ use hydro_lang :: __staged :: singleton :: * ; | _u | () }), input: Tee { - inner: , + inner: , }, }, ),