Skip to content

Commit

Permalink
refactor(hydroflow_plus): use max and min in Paxos and make client ge…
Browse files Browse the repository at this point in the history
…neric over ballots (#1443)
  • Loading branch information
shadaj authored Sep 5, 2024
1 parent 486dfbe commit c752aff
Show file tree
Hide file tree
Showing 4 changed files with 1,812 additions and 1,498 deletions.
30 changes: 27 additions & 3 deletions hydroflow_plus/src/singleton.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,18 @@ impl<'a, T, W, C, N: Location> Singleton<'a, T, W, C, N> {
}
}

impl<'a, T, C, N: Location> From<Singleton<'a, T, Bounded, C, N>>
for Singleton<'a, T, Unbounded, C, N>
{
fn from(singleton: Singleton<'a, T, Bounded, C, N>) -> Self {
Singleton::new(
singleton.location_kind,
singleton.ir_leaves,
singleton.ir_node.into_inner(),
)
}
}

impl<'a, T, N: Location> CycleComplete<'a, Tick> for Singleton<'a, T, Bounded, Tick, N> {
fn complete(self, ident: syn::Ident) {
self.ir_leaves.borrow_mut().as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink {
Expand Down Expand Up @@ -298,8 +310,8 @@ impl<'a, T, N: Location> Singleton<'a, T, Bounded, Tick, N> {
)
}

pub fn latest(self) -> Optional<'a, T, Unbounded, NoTick, N> {
Optional::new(
pub fn latest(self) -> Singleton<'a, T, Unbounded, NoTick, N> {
Singleton::new(
self.location_kind,
self.ir_leaves,
HfPlusNode::Persist(Box::new(self.ir_node.into_inner())),
Expand Down Expand Up @@ -598,7 +610,7 @@ impl<'a, T, N: Location> Optional<'a, T, Bounded, Tick, N> {
)
}

pub fn or_else(
pub fn unwrap_or(
self,
other: Singleton<'a, T, Bounded, Tick, N>,
) -> Singleton<'a, T, Bounded, Tick, N> {
Expand Down Expand Up @@ -692,6 +704,18 @@ impl<'a, T, B, N: Location> Optional<'a, T, B, NoTick, N> {
.latest()
.tick_samples()
}

pub fn unwrap_or(
self,
other: impl Into<Singleton<'a, T, Unbounded, NoTick, N>>,
) -> Singleton<'a, T, Unbounded, NoTick, N> {
let other = other.into();
if self.location_kind != other.location_kind {
panic!("or_else must be called on streams on the same node");
}

self.latest_tick().unwrap_or(other.latest_tick()).latest()
}
}

impl<'a, T, N: Location> Optional<'a, T, Unbounded, NoTick, N> {
Expand Down
44 changes: 44 additions & 0 deletions hydroflow_plus/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,28 @@ impl<'a, T, N: Location> Stream<'a, T, Bounded, Tick, N> {
)
}

pub fn max(self) -> Optional<'a, T, Bounded, Tick, N>
where
T: Ord,
{
self.reduce(q!(|curr, new| {
if new > *curr {
*curr = new;
}
}))
}

pub fn min(self) -> Optional<'a, T, Bounded, Tick, N>
where
T: Ord,
{
self.reduce(q!(|curr, new| {
if new < *curr {
*curr = new;
}
}))
}

pub fn sort(self) -> Stream<'a, T, Bounded, Tick, N>
where
T: Ord,
Expand Down Expand Up @@ -497,6 +519,28 @@ impl<'a, T, N: Location> Stream<'a, T, Unbounded, NoTick, N> {
})),
)
}

pub fn max(self) -> Optional<'a, T, Unbounded, NoTick, N>
where
T: Ord,
{
self.reduce(q!(|curr, new| {
if new > *curr {
*curr = new;
}
}))
}

pub fn min(self) -> Optional<'a, T, Unbounded, NoTick, N>
where
T: Ord,
{
self.reduce(q!(|curr, new| {
if new < *curr {
*curr = new;
}
}))
}
}

impl<'a, T, C, N: Location> Stream<'a, T, Bounded, C, N> {
Expand Down
94 changes: 38 additions & 56 deletions hydroflow_plus_test/src/cluster/paxos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ pub fn paxos(
// c_to_proposers.clone().for_each(q!(|payload: ClientPayload| println!("Client sent proposer payload: {:?}", payload)));

let p_received_max_ballot = p_max_ballot(
flow,
&proposers,
a_to_proposers_p1b.clone(),
a_to_proposers_p2b.clone(),
p_to_proposers_i_am_leader.clone(),
Expand Down Expand Up @@ -195,10 +197,12 @@ pub fn paxos(
.for_each(q!(|p1a| println!("Acceptor received P1a: {:?}", p1a)));
// p_to_acceptors_p2a.clone().for_each(q!(|p2a: P2a| println!("Acceptor received P2a: {:?}", p2a)));
let (a_to_proposers_p1b_new, a_to_proposers_p2b_new) = acceptor(
flow,
p_to_acceptors_p1a,
p_to_acceptors_p2a,
r_to_acceptors_checkpoint,
&proposers,
&acceptors,
f,
);
a_to_proposers_p1b_complete_cycle.complete(a_to_proposers_p1b_new);
Expand All @@ -216,10 +220,12 @@ pub fn paxos(

#[allow(clippy::type_complexity)]
fn acceptor<'a>(
flow: &FlowBuilder<'a>,
p_to_acceptors_p1a: Stream<'a, P1a, Unbounded, NoTick, Cluster<Acceptor>>,
p_to_acceptors_p2a: Stream<'a, P2a, Unbounded, NoTick, Cluster<Acceptor>>,
r_to_acceptors_checkpoint: Stream<'a, (u32, i32), Unbounded, NoTick, Cluster<Acceptor>>,
proposers: &Cluster<Proposer>,
acceptors: &Cluster<Acceptor>,
f: usize,
) -> (
Stream<'a, (u32, P1b), Unbounded, NoTick, Cluster<Proposer>>,
Expand All @@ -244,14 +250,9 @@ fn acceptor<'a>(
// Find the smallest checkpoint seq that everyone agrees to, track whenever it changes
let a_new_checkpoint = a_checkpoint_largest_seqs
.continue_if(a_checkpoints_quorum_reached)
.fold(
q!(|| -1),
q!(|min_seq, (_sender, seq)| {
if *min_seq == -1 || seq < *min_seq {
*min_seq = seq;
}
}),
)
.map(q!(|(_sender, seq)| seq))
.min()
.unwrap_or(flow.singleton(acceptors, q!(-1)).latest_tick())
.delta()
.map(q!(|min_seq| (
min_seq,
Expand All @@ -267,14 +268,11 @@ fn acceptor<'a>(
)));
// .inspect(q!(|(min_seq, p2a): &(i32, P2a)| println!("Acceptor new checkpoint: {:?}", min_seq)));

let a_max_ballot = p_to_acceptors_p1a.clone().fold(
q!(|| Ballot { num: 0, id: 0 }),
q!(|max_ballot, p1a| {
if p1a.ballot > *max_ballot {
*max_ballot = p1a.ballot;
}
}),
);
let a_max_ballot = p_to_acceptors_p1a
.clone()
.map(q!(|p1a| p1a.ballot))
.max()
.unwrap_or(flow.singleton(acceptors, q!(Ballot { num: 0, id: 0 })));
let a_p2as_to_place_in_log = p_to_acceptors_p2a
.clone()
.tick_batch()
Expand Down Expand Up @@ -439,7 +437,7 @@ fn p_p2b<'a>(
fn p_p2a<'a>(
flow: &FlowBuilder<'a>,
proposers: &Cluster<Proposer>,
p_max_slot: Singleton<'a, i32, Bounded, Tick, Cluster<Proposer>>,
p_max_slot: Optional<'a, i32, Bounded, Tick, Cluster<Proposer>>,
c_to_proposers: Stream<'a, ClientPayload, Unbounded, NoTick, Cluster<Proposer>>,
p_ballot_num: Singleton<'a, u32, Bounded, Tick, Cluster<Proposer>>,
p_log_to_try_commit: Stream<'a, P2a, Bounded, Tick, Cluster<Proposer>>,
Expand All @@ -454,6 +452,7 @@ fn p_p2a<'a>(
let (p_next_slot_complete_cycle, p_next_slot) =
flow.tick_cycle::<Optional<i32, _, _, _>>(proposers);
let p_next_slot_after_reconciling_p1bs = p_max_slot
.unwrap_or(flow.singleton(proposers, q!(-1)).latest_tick())
// .inspect(q!(|max_slot| println!("{} p_max_slot: {:?}", context.current_tick(), max_slot)))
.continue_unless(p_next_slot.clone())
.map(q!(|max_slot| max_slot + 1));
Expand Down Expand Up @@ -518,7 +517,7 @@ fn p_p1b<'a>(
) -> (
Optional<'a, bool, Bounded, Tick, Cluster<Proposer>>,
Stream<'a, P2a, Bounded, Tick, Cluster<Proposer>>,
Singleton<'a, i32, Bounded, Tick, Cluster<Proposer>>,
Optional<'a, i32, Bounded, Tick, Cluster<Proposer>>,
Stream<'a, P2a, Bounded, Tick, Cluster<Proposer>>,
) {
let p_id = flow.cluster_self_id(proposers);
Expand Down Expand Up @@ -580,14 +579,10 @@ fn p_p1b<'a>(
None
}
));
let p_max_slot = p_p1b_highest_entries_and_count.clone().fold(
q!(|| -1),
q!(|max_slot, (slot, (_count, _entry))| {
if slot > *max_slot {
*max_slot = slot;
}
}),
);
let p_max_slot = p_p1b_highest_entries_and_count
.clone()
.map(q!(|(slot, _)| slot))
.max();
let p_proposed_slots = p_p1b_highest_entries_and_count
.clone()
.map(q!(|(slot, _)| slot));
Expand Down Expand Up @@ -687,14 +682,10 @@ fn replica<'a>(
// Send checkpoints to the acceptors when we've processed enough payloads
let (r_checkpointed_seqs_complete_cycle, r_checkpointed_seqs) =
flow.tick_cycle::<Optional<'a, i32, _, _, _>>(replicas);
let r_max_checkpointed_seq = r_checkpointed_seqs.persist().fold(
q!(|| -1),
q!(|max_seq, seq| {
if seq > *max_seq {
*max_seq = seq;
}
}),
);
let r_max_checkpointed_seq = r_checkpointed_seqs
.persist()
.max()
.unwrap_or(flow.singleton(replicas, q!(-1)).latest_tick());
let r_checkpoint_seq_new = r_max_checkpointed_seq
.cross_singleton(r_new_highest_seq)
.filter_map(q!(
Expand All @@ -717,9 +708,9 @@ fn replica<'a>(
}

// Clients. All relations for clients will be prefixed with c. All ClientPayloads will contain the virtual client number as key and the client's machine ID (to string) as value. Expects p_to_clients_leader_elected containing Ballots whenever the leader is elected, and r_to_clients_payload_applied containing ReplicaPayloads whenever a payload is committed. Outputs (leader address, ClientPayload) when a new leader is elected or when the previous payload is committed.
fn client<'a>(
fn client<'a, B: Address + Ord + std::fmt::Debug + Clone>(
clients: &Cluster<Client>,
p_to_clients_leader_elected: Stream<'a, Ballot, Unbounded, NoTick, Cluster<Client>>,
p_to_clients_leader_elected: Stream<'a, B, Unbounded, NoTick, Cluster<Client>>,
r_to_clients_payload_applied: Stream<
'a,
(u32, ReplicaPayload),
Expand All @@ -741,12 +732,7 @@ fn client<'a>(
)));
// r_to_clients_payload_applied.clone().inspect(q!(|payload: &(u32, ReplicaPayload)| println!("Client received payload: {:?}", payload)));
// Only keep the latest leader
let c_max_leader_ballot =
p_to_clients_leader_elected.reduce(q!(|curr_max_ballot, new_ballot| {
if new_ballot > *curr_max_ballot {
*curr_max_ballot = new_ballot;
}
}));
let c_max_leader_ballot = p_to_clients_leader_elected.max();
let c_new_leader_ballot = c_max_leader_ballot.clone().latest_tick().delta();
// Whenever the leader changes, make all clients send a message
let c_new_payloads_when_leader_elected =
Expand Down Expand Up @@ -901,7 +887,7 @@ fn client<'a>(
}
}),
);
#[allow(clippy::type_complexity)]

c_stats_output_timer
.cross_singleton(c_latencies)
.cross_singleton(c_throughput)
Expand All @@ -928,6 +914,8 @@ fn client<'a>(

// Proposer logic to calculate the largest ballot received so far.
fn p_max_ballot<'a>(
flow: &FlowBuilder<'a>,
proposers: &Cluster<Proposer>,
a_to_proposers_p1b: Stream<'a, (u32, P1b), Unbounded, NoTick, Cluster<Proposer>>,
a_to_proposers_p2b: Stream<'a, (u32, P2b), Unbounded, NoTick, Cluster<Proposer>>,
p_to_proposers_i_am_leader: Stream<'a, Ballot, Unbounded, NoTick, Cluster<Proposer>>,
Expand All @@ -938,18 +926,11 @@ fn p_max_ballot<'a>(
let p_received_p2b_ballots = a_to_proposers_p2b
.clone()
.map(q!(|(_, p2b)| p2b.max_ballot));
let p_received_max_ballot = p_received_p1b_ballots
p_received_p1b_ballots
.union(p_received_p2b_ballots)
.union(p_to_proposers_i_am_leader)
.fold(
q!(|| Ballot { num: 0, id: 0 }),
q!(|curr_max_ballot, new_ballot| {
if new_ballot > *curr_max_ballot {
*curr_max_ballot = new_ballot;
}
}),
);
p_received_max_ballot
.max()
.unwrap_or(flow.singleton(proposers, q!(Ballot { num: 0, id: 0 })))
}

// Proposer logic to calculate the next ballot number. Expects p_received_max_ballot, the largest ballot received so far. Outputs streams: ballot_num, and has_largest_ballot, which only contains a value if we have the largest ballot.
Expand Down Expand Up @@ -1017,9 +998,10 @@ fn p_p1a<'a>(
let p_id = flow.cluster_self_id(proposers);
let p_to_proposers_i_am_leader_new = p_ballot_num
.clone()
.latest()
.sample_every(q!(Duration::from_secs(i_am_leader_send_timeout)))
.tick_batch()
.continue_if(
flow.source_interval(proposers, q!(Duration::from_secs(i_am_leader_send_timeout)))
.latest_tick(),
)
.continue_if(p_is_leader.clone())
.map(q!(move |ballot_num| Ballot {
num: ballot_num,
Expand Down
Loading

0 comments on commit c752aff

Please sign in to comment.