Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
“Dee committed Oct 3, 2024
1 parent 61f9530 commit 78d5816
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 46 deletions.
17 changes: 7 additions & 10 deletions hydroflow_plus_test/examples/two_pc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,20 @@ async fn main() {
let num_participants: u32 = 3;

let (coordinator, participants, client) =
hydroflow_plus_test::cluster::two_pc::two_pc(
&builder,
num_participants,
);
hydroflow_plus_test::cluster::two_pc::two_pc(&builder, num_participants);

let rustflags = "-C opt-level=3 -C codegen-units=1 -C strip=none -C debuginfo=2 -C lto=off";

let _nodes = builder
.with_default_optimize()
.with_process(&coordinator, TrybuildHost::new(deployment.Localhost()))
.with_cluster(
&participants,
(0..num_participants)
.map(|_| TrybuildHost::new(deployment.Localhost()))
.collect::<Vec<_>>(),
)
.with_process(&client, TrybuildHost::new(deployment.Localhost()))
&participants,
(0..num_participants)
.map(|_| TrybuildHost::new(deployment.Localhost()))
.collect::<Vec<_>>(),
)
.with_process(&client, TrybuildHost::new(deployment.Localhost()))
.deploy(&mut deployment);

deployment.deploy().await.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion hydroflow_plus_test/src/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ pub mod map_reduce;
pub mod paxos;
pub mod paxos_bench;
pub mod simple_cluster;
pub mod two_pc;
pub mod two_pc;
76 changes: 41 additions & 35 deletions hydroflow_plus_test/src/cluster/two_pc.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
use hydroflow_plus::*;
use stageleft::*;

/*
if the variable start with p, that means current work is at the participant side. if start with c, at coordinator side.
*/

// if the variable start with p, that means current work is at the participant side. if start with c, at coordinator side.
//

pub struct Participants {}

Expand All @@ -17,47 +13,54 @@ pub struct Client {}
pub fn two_pc(
flow: &FlowBuilder,
num_participants: u32,
) -> (
Process<Coordinator>,
Cluster<Participants>,
Process<Client>) {
) -> (Process<Coordinator>, Cluster<Participants>, Process<Client>) {
// Assume single client.
let client = flow.process::<Client>();

// Assume single coordinator.
let coordinator = flow.process::<Coordinator>();
// Assume 3 participants.

// Assume 3 participants.
let participants = flow.cluster::<Participants>();

// assume 3 transactions are generated from 0 to 3
let client_transaction = flow.source_iter(&client, q!(0..3));

let c_receive_client_transactions = client_transaction.send_bincode(&coordinator);
c_receive_client_transactions.clone().inspect(q!(|t| println!("receive transaction {}, ready to broadcast", t)));

/* broadcast prepare message to participants. */
c_receive_client_transactions
.clone()
.inspect(q!(|t| println!(
"receive transaction {}, ready to broadcast",
t
)));

// broadcast prepare message to participants.
let p_receive_prepare = c_receive_client_transactions.broadcast_bincode(&participants);

// assume all participants reply commit
let p_ready_to_commit = p_receive_prepare.map(q!(|t| (t, String::from("commit"))));
let c_received_reply = p_ready_to_commit.send_bincode(&coordinator);
// c_received_reply.clone().inspect(q!(|(id, (t, reply))| println!("participant {id} said {reply} for transaction {t}")));

/* collect votes from participant. */
// collect votes from participant.
// aborted transactions.
let c_participant_voted_abort =
c_received_reply.clone()
.filter(q!(|(_id, (_t, reply))| reply == "abort"))
.map(q!(|(id, (t, _reply))| (t, id)));
let c_participant_voted_abort = c_received_reply
.clone()
.filter(q!(|(_id, (_t, reply))| reply == "abort"))
.map(q!(|(id, (t, _reply))| (t, id)));
let p_receive_abort = c_participant_voted_abort.broadcast_bincode(&participants);
p_receive_abort.clone().inspect(q!(|(t, id)| println!("{} vote abort for transaction {}", id, t)));
p_receive_abort.clone().inspect(q!(|(t, id)| println!(
"{} vote abort for transaction {}",
id, t
)));
let c_receive_ack = p_receive_abort.send_bincode(&coordinator);
c_receive_ack.for_each(q!(|(id, (t, _))| println!("Coordinator receive participant {} abort for transaction {}", id, t)));
c_receive_ack.for_each(q!(|(id, (t, _))| println!(
"Coordinator receive participant {} abort for transaction {}",
id, t
)));

// committed transactions
let c_participant_voted_commit =
c_received_reply
let c_participant_voted_commit = c_received_reply
.filter(q!(|(_id, (_t, reply))| reply == "commit"))
.map(q!(|(id, (t, _reply))| (t, id)))
// fold_keyed: 1 input stream of type (K, V1), 1 output stream of type (K, V2).
Expand All @@ -69,19 +72,22 @@ pub fn two_pc(
} else {
None
}
}))
;
}));

// broadcast commit transactions to participants.
let p_receive_commit = c_participant_voted_commit.all_ticks().broadcast_bincode(&participants);
let p_receive_commit = c_participant_voted_commit
.all_ticks()
.broadcast_bincode(&participants);
// p_receive_commit.clone().for_each(q!(|t| println!("commit for transaction {}", t)));

let c_receive_ack = p_receive_commit.send_bincode(&coordinator);
c_receive_ack.for_each(q!(|(id, t)| println!("receive participant {} commit for transaction {}", id, t)));
c_receive_ack.for_each(q!(|(id, t)| println!(
"receive participant {} commit for transaction {}",
id, t
)));
(coordinator, participants, client)
}


#[cfg(test)]
mod tests {
use hydro_deploy::Deployment;
Expand All @@ -92,19 +98,19 @@ mod tests {
let mut deployment = Deployment::new();

let builder: hydroflow_plus::FlowBuilder<'_> = hydroflow_plus::FlowBuilder::new();
let num_participants : u32 = 3;
let (coordinator, participants, client) = super::two_pc(&builder, num_participants);
let num_participants: u32 = 3;
let (coordinator, participants, client) = super::two_pc(&builder, num_participants);
let built = builder.with_default_optimize();
let nodes = built
.with_process(&coordinator, TrybuildHost::new(deployment.Localhost()))
.with_cluster(
.with_process(&coordinator, TrybuildHost::new(deployment.Localhost()))
.with_cluster(
&participants,
(0..num_participants)
.map(|_| TrybuildHost::new(deployment.Localhost()))
.collect::<Vec<_>>(),
)
.with_process(&client, TrybuildHost::new(deployment.Localhost()))
.deploy(&mut deployment);
.deploy(&mut deployment);
// println!("{:?}", built.ir());
deployment.deploy().await.unwrap();

Expand Down

0 comments on commit 78d5816

Please sign in to comment.