diff --git a/hydroflow_plus/src/stream.rs b/hydroflow_plus/src/stream.rs
index 159c1814ae1b..f5904a486789 100644
--- a/hydroflow_plus/src/stream.rs
+++ b/hydroflow_plus/src/stream.rs
@@ -14,13 +14,13 @@ use serde::Serialize;
 use stageleft::{q, IntoQuotedMut, Quoted};
 use syn::parse_quote;
 
-use crate::builder::FlowState;
+use crate::builder::{self, FlowState};
 use crate::cycle::{CycleCollection, CycleComplete};
 use crate::ir::{DebugInstantiate, HfPlusLeaf, HfPlusNode, HfPlusSource};
 use crate::location::{
     CanSend, ExternalBincodeStream, ExternalBytesPort, ExternalProcess, Location, LocationId,
 };
-use crate::{Cluster, Optional, Singleton};
+use crate::{Cluster, Optional, Process, Singleton};
 
 /// Marks the stream as being unbounded, which means that it is not
 /// guaranteed to be complete in finite time.
@@ -690,6 +690,37 @@ pub(super) fn deserialize_bincode<T: DeserializeOwned>(tagged: bool) -> Pipeline
 }
 
 impl<'a, T, W, N: Location<'a>> Stream<T, W, N> {
+    /// Moves the rest of the pipeline onto `other` by sending this stream
+    /// there. (Doc comment added for clarity; it describes the body below.)
+    pub fn decouple_process<P2>(
+        self,
+        other: &Process<'a, P2>,
+    ) -> Stream<T, Unbounded, Process<'a, P2>>
+    where
+        N: CanSend<'a, Process<'a, P2>, In = T, Out = T>,
+        T: Clone + Serialize + DeserializeOwned,
+    {
+        self.send_bincode::<Process<'a, P2>, T>(other)
+    }
+
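+    /// Sends each element to the member of `other` whose id matches the
+    /// sending member's id, by tagging elements with `ClusterSelfId` before
+    /// an interleaved send. (Doc comment added for clarity; it describes the
+    /// body below, not a documented API contract.)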
+    pub fn decouple_cluster<C2>(
+        self,
+        other: &Cluster<'a, C2>,
+    ) -> Stream<T, Unbounded, Cluster<'a, C2>>
+    where
+        N: CanSend<'a, Cluster<'a, C2>, In = (u32, T), Out = (u32, T)>,
+        T: Clone + Serialize + DeserializeOwned,
+    {
+        let self_node_id = match self.location_kind {
+            LocationId::Cluster(cluster_id) => builder::ClusterSelfId {
+                id: cluster_id,
+                _phantom: PhantomData,
+            },
+            _ => panic!("decouple_cluster must be called on a cluster"),
+        };
+
+        self.map(q!(move |b| (self_node_id, b.clone())))
+            .send_bincode_interleaved(other)
+    }
+
     pub fn send_bincode<N2: Location<'a>, CoreType>(
         self,
         other: &N2,
diff --git a/hydroflow_plus_test/examples/two_pc.rs b/hydroflow_plus_test/examples/two_pc.rs
new file mode 100644
index 000000000000..df946a70613c
--- /dev/null
+++ b/hydroflow_plus_test/examples/two_pc.rs
@@ -0,0 +1,34 @@
+use hydro_deploy::Deployment;
+use hydroflow_plus::deploy::TrybuildHost;
+
+#[tokio::main]
+async fn main() {
+    let mut deployment = Deployment::new();
+    let _localhost = deployment.Localhost();
+
+    let builder = hydroflow_plus::FlowBuilder::new();
+    let num_participants: u32 = 3;
+
+    let (coordinator, participants, client) =
+        hydroflow_plus_test::cluster::two_pc::two_pc(&builder, num_participants);
+
+    let _rustflags = "-C opt-level=3 -C codegen-units=1 -C strip=none -C debuginfo=2 -C lto=off";
+
+    let _nodes = builder
+        .with_default_optimize()
+        .with_process(&coordinator, TrybuildHost::new(deployment.Localhost()))
+        .with_cluster(
+            &participants,
+            (0..num_participants)
+                .map(|_| TrybuildHost::new(deployment.Localhost()))
+                .collect::<Vec<_>>(),
+        )
+        .with_process(&client, TrybuildHost::new(deployment.Localhost()))
+        .deploy(&mut deployment);
+
+    deployment.deploy().await.unwrap();
+
+    deployment.start().await.unwrap();
+
+    tokio::signal::ctrl_c().await.unwrap();
+}
diff --git a/hydroflow_plus_test/src/cluster/mod.rs b/hydroflow_plus_test/src/cluster/mod.rs
index 1e3bc5374a70..bcc449c3195d 100644
--- a/hydroflow_plus_test/src/cluster/mod.rs
+++ b/hydroflow_plus_test/src/cluster/mod.rs
@@ -4,3 +4,4 @@ pub mod map_reduce;
 pub mod paxos;
 pub mod paxos_bench;
 pub mod simple_cluster;
+pub mod two_pc;
diff --git a/hydroflow_plus_test/src/cluster/pbft.rs b/hydroflow_plus_test/src/cluster/pbft.rs
new file mode 100644
index 000000000000..ce128330fa8b
--- /dev/null
+++ b/hydroflow_plus_test/src/cluster/pbft.rs
@@ -0,0 +1,390 @@
+use std::collections::{HashMap, HashSet};
+use std::time::{Duration, SystemTime};
+
+use hydroflow_plus::*;
+use serde::{Deserialize, Serialize};
+use stageleft::*;
+use tokio::time::Instant;
+
+pub struct Client {}
+pub struct Replica {}
+
+#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug, Hash)]
+pub struct Request {
+    pub client_id: u32,
+    pub operation: String,
+    pub timestamp: u64,
+}
+
+#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug, Hash)]
+pub struct Reply {
+    pub view: u32,
+    pub timestamp: u64,
+    pub client_id: u32,
+    pub replica_id: u32,
+    pub result: String,
+}
+
+#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug, Hash)]
+pub struct PrePrepare {
+    pub view: u32,
+    pub sequence_number: u64,
+    pub request: Request,
+    pub digest: String,
+}
+
+#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug, Hash)]
+pub struct Prepare {
+    pub view: u32,
+    pub sequence_number: u64,
+    pub digest: String,
+    pub replica_id: u32,
+}
+
+#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug, Hash)]
+pub struct Commit {
+    pub view: u32,
+    pub sequence_number: u64,
+    pub digest: String,
+    pub replica_id: u32,
+}
+
+#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug, Hash)]
+pub struct ViewChange {
+    pub view: u32,
+    pub replica_id: u32,
+    pub last_sequence_number: u64,
+    pub P: Vec<Prepared>,
+}
+
+#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug, Hash)]
+pub struct NewView {
+    pub view: u32,
+    pub V: Vec<ViewChange>,
+    pub O: Vec<PrePrepare>,
+}
+
+#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug, Hash)]
+pub struct Prepared {
+    pub view: u32,
+    pub sequence_number: u64,
+    pub digest: String,
+}
+
+pub fn pbft<'a>(
+    flow: &FlowBuilder<'a>,
+    num_clients_per_node: usize,
+    f: usize,
+    throughput_window_size: usize,
+    view_timeout: u64,
+) -> (
+    Cluster<'a, Client>,
+    Cluster<'a, Replica>,
+) {
+    let clients = flow.cluster::<Client>();
+    let replicas = flow.cluster::<Replica>();
+
+    let client_id = flow.cluster_self_id(&clients);
+    let replica_id = flow.cluster_self_id(&replicas);
+
+    let num_replicas = 3 * f + 1;
+
+    // Define the current view number, starting from 0
+    let (current_view_cycle, current_view_stream) = flow.tick_cycle_with_initial(
+        &replicas,
+        flow.singleton(&replicas, q!(0u32)).latest_tick(),
+    );
+    let current_view = current_view_stream.latest();
+
+    // Each replica determines if it's the primary (leader) for the current view
+    let is_primary = current_view
+        .clone()
+        .map(q!(move |view| (*view % (num_replicas as u32)) == replica_id))
+        .latest_tick();
+
+    // Client sends requests
+    let client_requests = flow.source_iter(&clients, q!([
+        Request {
+            client_id,
+            operation: "operation1".to_string(),
+            timestamp: 1,
+        },
+        Request {
+            client_id,
+            operation: "operation2".to_string(),
+            timestamp: 2,
+        },
+        Request {
+            client_id,
+            operation: "operation3".to_string(),
+            timestamp: 3,
+        },
+    ]));
+
+    // Clients broadcast requests to all replicas
+    let requests_to_replicas = client_requests.clone().broadcast_bincode(&replicas);
+
+    // Each replica filters requests intended for the current primary
+    let requests_to_primary = requests_to_replicas
+        .cross_singleton(current_view.clone())
+        .filter(q!(move |(_request, view)| {
+            (*view % (num_replicas as u32)) == replica_id
+        }))
+        .map(q!(|(request, _)| request.clone()))
+        .continue_if(is_primary.clone());
+
+    // The primary processes the requests and creates PrePrepare messages
+    let (sequence_number_cycle, sequence_number_stream) = flow.tick_cycle_with_initial(
+        &replicas,
+        flow.singleton(&replicas, q!(0u64)).latest_tick(),
+    );
+    let sequence_number = sequence_number_stream.latest();
+
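+    // For example (illustrative numbers): if the persisted sequence number
+    // is currently 5 and the primary receives a batch of 3 requests, the
+    // PrePrepares below are numbered 6, 7, and 8, and next_sequence_number
+    // becomes 5 + 3 = 8 on the following tick.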
+    let pre_prepares = requests_to_primary
+        .enumerate()
+        .cross_singleton(current_view.clone())
+        .cross_singleton(sequence_number.clone())
+        .map(q!(move |(((index, request), view), seq_num)| PrePrepare {
+            view: *view,
+            sequence_number: *seq_num + index as u64 + 1,
+            request: request.clone(),
+            digest: format!("{:?}", request),
+        }))
+        .persist();
+
+    // Update the sequence number for the next batch
+    let next_sequence_number = sequence_number
+        .cross_singleton(pre_prepares.clone().count())
+        .map(q!(|(seq_num, count)| seq_num + count as u64))
+        .defer_tick();
+    sequence_number_cycle.complete(next_sequence_number.into());
+
+    // The primary broadcasts PrePrepare messages to all replicas
+    let pre_prepares_broadcast = pre_prepares.clone().broadcast_bincode(&replicas);
+
+    // Replicas receive PrePrepare messages
+    let received_pre_prepares = pre_prepares_broadcast.tick_batch();
+
+    // Replicas verify PrePrepare messages and create Prepare messages
+    let prepares = received_pre_prepares
+        .cross_singleton(replica_id.clone())
+        .map(q!(|(pre_prepare, replica_id)| Prepare {
+            view: pre_prepare.view,
+            sequence_number: pre_prepare.sequence_number,
+            digest: pre_prepare.digest.clone(),
+            replica_id,
+        }))
+        .broadcast_bincode(&replicas);
+
+    // Replicas receive Prepare messages
+    let received_prepares = prepares.tick_batch();
+
+    // Collect Prepare messages
+    let prepare_counts = received_prepares
+        .map(q!(|prepare| ((prepare.view, prepare.sequence_number, prepare.digest.clone()), prepare.replica_id)))
+        .fold_keyed(
+            q!(|| HashSet::new()),
+            q!(|set, replica_id| { set.insert(replica_id); }),
+        );
+
+    // Check if enough Prepare messages have been collected to move to the Commit phase
+    let prepared_certificates = prepare_counts
+        .filter(q!(move |(_, set)| set.len() >= 2 * f))
+        .map(q!(|(key, _)| Prepared {
+            view: key.0,
+            sequence_number: key.1,
+            digest: key.2.clone(),
+        }))
+        .persist();
+
+    // Replicas create Commit messages and broadcast them
+    let commits = prepared_certificates
+        .clone()
+        .cross_singleton(replica_id.clone())
+        .map(q!(|(cert, replica_id)| Commit {
+            view: cert.view,
+            sequence_number: cert.sequence_number,
+            digest: cert.digest.clone(),
+            replica_id,
+        }))
+        .broadcast_bincode(&replicas);
+
+    // Replicas receive Commit messages
+    let received_commits = commits.tick_batch();
+
+    // Collect Commit messages
+    let commit_counts = received_commits
+        .map(q!(|commit| ((commit.view, commit.sequence_number, commit.digest.clone()), commit.replica_id)))
+        .fold_keyed(
+            q!(|| HashSet::new()),
+            q!(|set, replica_id| { set.insert(replica_id); }),
+        );
+
+    // Replicas execute requests after receiving enough Commit messages
+    let executed_requests = commit_counts
+        .filter(q!(move |(_, set)| set.len() >= 2 * f + 1))
+        .map(q!(move |(key, _)| {
+            println!("Replica {} executed request at view {}, seq {}", replica_id, key.0, key.1);
+            (key.1, key.2.clone())
+        }))
+        .persist();
+
+    // Maintain the highest sequence number executed
+    let highest_sequence_number_stream = executed_requests
+        .clone()
+        .map(q!(|(seq_num, _)| seq_num))
+        .max()
+        .unwrap_or(flow.singleton(&replicas, q!(0u64)).latest_tick());
+
+    let highest_sequence_number = highest_sequence_number_stream.latest_tick();
+
+    // View change mechanism
+
+    // Define a timeout to detect primary failure
+    let timeout_interval = flow
+        .source_interval(&replicas, q!(Duration::from_secs(view_timeout)))
+        .latest_tick();
+
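+    // Sketch of the intended step (assumed from the PBFT paper): a replica
+    // that suspects the primary proposes view v + 1, carrying its last
+    // executed sequence number and its Prepared certificates (P). Once
+    // 2f + 1 ViewChange messages for the same view are collected below, a
+    // NewView is formed and every replica adopts the new view.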
+    // Replicas send ViewChange messages upon timeout
+    let view_changes = timeout_interval
+        .cross_singleton(current_view.clone())
+        .cross_singleton(replica_id.clone())
+        .cross_singleton(highest_sequence_number.clone())
+        .cross_singleton(prepared_certificates.clone().collect())
+        .map(q!(move |(((((), view), replica_id), last_seq_num), prepared)| ViewChange {
+            view: *view + 1,
+            replica_id,
+            last_sequence_number: *last_seq_num,
+            P: prepared.clone(),
+        }))
+        .broadcast_bincode(&replicas);
+
+    // Replicas receive ViewChange messages
+    let received_view_changes = view_changes.tick_batch();
+
+    let view_change_counts = received_view_changes
+        .map(q!(|vc| (vc.view, vc)))
+        .fold_keyed(
+            q!(|| Vec::new()),
+            q!(|vec, vc| { vec.push(vc); }),
+        );
+
+    // Check if enough ViewChange messages have been collected to form a NewView
+    let new_views = view_change_counts
+        .filter(q!(move |(_, vec)| vec.len() >= 2 * f + 1))
+        .map(q!(|(view, vcs)| NewView {
+            view,
+            V: vcs.clone(),
+            O: vec![], // This should be constructed based on V and P
+        }))
+        .broadcast_bincode(&replicas);
+
+    // Replicas receive NewView messages and update the current view
+    let received_new_views = new_views.tick_batch();
+
+    let updated_view = received_new_views
+        .map(q!(|nv| nv.view))
+        .latest_tick();
+
+    // Update the current view
+    current_view_cycle.complete(updated_view.clone().into());
+
+    // Each replica determines if it is the new primary after view change
+    let new_is_primary = updated_view
+        .map(q!(move |view| (*view % (num_replicas as u32)) == replica_id))
+        .latest_tick();
+
+    // The new primary processes any pending requests from ViewChange messages and generates new PrePrepare messages.
+    // For simplicity, we assume there are no pending requests in this example;
+    // in practice, O would be reconstructed from the V and P collections in the NewView message.
+
+    // Benchmark code similar to the Paxos implementation
+
+    // Track throughput
+
+    // Define a timer for statistics output
+    let stats_interval = flow.source_interval(&clients, q!(Duration::from_secs(1)));
+
+    // Collect the number of executed requests
+    let executed_requests_count = executed_requests
+        .clone()
+        .count()
+        .continue_unless(stats_interval.clone().latest_tick())
+        .map(q!(|count| (count, false)));
+
+    let reset_throughput = stats_interval
+        .clone()
+        .latest_tick()
+        .map(q!(|_| (0usize, true)))
+        .defer_tick();
+
+    let throughput = executed_requests_count
+        .union(reset_throughput)
+        .all_ticks()
+        .fold(
+            q!(|| 0usize),
+            q!(|total, (count, reset)| {
+                if reset {
+                    *total = 0;
+                } else {
+                    *total += count;
+                }
+            }),
+        );
+
+    // Output throughput
+    stats_interval
+        .clone()
+        .cross_singleton(throughput.clone())
+        .tick_samples()
+        .for_each(q!(move |(_, total)| {
+            println!("Throughput: {} requests/s", total);
+        }));
+
+    // Latency tracking
+
+    // Create cycles to track request send and execution times
+    let (request_times_cycle, request_times_stream) = flow.cycle(&clients);
+    let request_times = request_times_stream.latest_tick();
+
+    // When clients send requests, record the timestamp
+    let client_request_times = client_requests
+        .map(q!(move |_| (client_id, SystemTime::now())))
+        .persist();
+
+    request_times_cycle.complete(client_request_times.into());
+
+    // When replicas execute requests, record the timestamp
+    let execution_times = executed_requests
+        .map(q!(move |(seq_num, digest)| (seq_num, digest, SystemTime::now())))
+        .broadcast_bincode(&clients);
+
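+    // Note (an assumption about this benchmark, not the protocol): the send
+    // and execution timestamps come from SystemTime::now() on different
+    // machines, so the computed latency is only meaningful when client and
+    // replica clocks are synchronized; duration_since falls back to
+    // Duration::ZERO when the clocks disagree.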
+    // Clients receive execution times and calculate latency
+    let latencies = execution_times
+        .cross_singleton(request_times.clone())
+        .map(q!(|((seq_num, digest, exec_time), (client_id, send_time))| {
+            let latency = exec_time.duration_since(send_time).unwrap_or(Duration::ZERO);
+            latency.as_millis()
+        }))
+        .collect()
+        .latest_tick();
+
+    // Calculate average latency over the window
+    let average_latency = latencies
+        .map(q!(move |latencies_vec| {
+            if latencies_vec.is_empty() {
+                0
+            } else {
+                let sum: u128 = latencies_vec.iter().sum();
+                (sum / latencies_vec.len() as u128) as usize
+            }
+        }));
+
+    // Output latency
+    stats_interval
+        .cross_singleton(average_latency)
+        .tick_samples()
+        .for_each(q!(move |(_, avg_latency)| {
+            println!("Average Latency: {} ms", avg_latency);
+        }));
+
+    (clients, replicas)
+}
diff --git a/hydroflow_plus_test/src/cluster/simple_cluster.rs b/hydroflow_plus_test/src/cluster/simple_cluster.rs
index 428c020f49b1..68a2c63cd8cb 100644
--- a/hydroflow_plus_test/src/cluster/simple_cluster.rs
+++ b/hydroflow_plus_test/src/cluster/simple_cluster.rs
@@ -1,6 +1,33 @@
 use hydroflow_plus::*;
 use stageleft::*;
 
+pub fn decouple_cluster<'a>(flow: &FlowBuilder<'a>) -> (Cluster<'a, ()>, Cluster<'a, ()>) {
+    let cluster1 = flow.cluster();
+    let cluster2 = flow.cluster();
+    let cluster_self_id = cluster2.self_id();
+    let cluster1_self_id = cluster1.self_id();
+    cluster1
+        .source_iter(q!(vec!(cluster1_self_id)))
+        .inspect(q!(|message| println!("Cluster1 node sending message: {}", message)))
+        .decouple_cluster(&cluster2)
+        .for_each(q!(move |message| println!(
+            "My self id is {}, my message is {}",
+            cluster_self_id, message
+        )));
+    (cluster1, cluster2)
+}
+
+pub fn decouple_process<'a>(flow: &FlowBuilder<'a>) -> (Process<'a, ()>, Process<'a, ()>) {
+    let process1 = flow.process();
+    let process2 = flow.process();
+    process1
+        .source_iter(q!(0..3))
+        .decouple_process(&process2)
+        .for_each(q!(|message| println!("I received message {}", message)));
+    (process1, process2)
+}
+
 pub fn simple_cluster<'a>(flow: &FlowBuilder<'a>) -> (Process<'a, ()>, Cluster<'a, ()>) {
     let process = flow.process();
     let cluster = flow.cluster();
@@ -86,4 +113,70 @@ mod tests {
             );
         }
     }
+
+    #[tokio::test]
+    async fn decouple_process() {
+        let mut deployment = Deployment::new();
+
+        let builder = hydroflow_plus::FlowBuilder::new();
+        let (process1, process2) = super::decouple_process(&builder);
+        let built = builder.with_default_optimize();
+
+        let nodes = built
+            .with_process(&process1, TrybuildHost::new(deployment.Localhost()))
+            .with_process(&process2, TrybuildHost::new(deployment.Localhost()))
+            .deploy(&mut deployment);
+
+        deployment.deploy().await.unwrap();
+        let mut process2_stdout = nodes.get_process(&process2).stdout().await;
+        deployment.start().await.unwrap();
+        for i in 0..3 {
+            let expected_message = format!("I received message {}", i);
+            assert_eq!(process2_stdout.recv().await.unwrap(), expected_message);
+        }
+    }
+
+    #[tokio::test]
+    async fn decouple_cluster() {
+        let mut deployment = Deployment::new();
+
+        let builder = hydroflow_plus::FlowBuilder::new();
+        let (cluster1, cluster2) = super::decouple_cluster(&builder);
+        let built = builder.with_default_optimize();
+
+        let nodes = built
+            .with_cluster(
+                &cluster1,
+                (0..3)
+                    .map(|_| TrybuildHost::new(deployment.Localhost()))
+                    .collect::<Vec<_>>(),
+            )
+            .with_cluster(
+                &cluster2,
+                (0..3)
+                    .map(|_| TrybuildHost::new(deployment.Localhost()))
+                    .collect::<Vec<_>>(),
+            )
+            .deploy(&mut deployment);
+
+        deployment.deploy().await.unwrap();
+
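+        // Grab each member's stdout before starting the deployment so no
+        // early output is missed (this mirrors the decouple_process test above).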
+        let cluster2_stdouts = futures::future::join_all(
+            nodes
+                .get_cluster(&cluster2)
+                .members()
+                .iter()
+                .map(|node| node.stdout()),
+        )
+        .await;
+
+        deployment.start().await.unwrap();
+
+        for (i, mut stdout) in cluster2_stdouts.into_iter().enumerate() {
+            for _j in 0..1 {
+                let expected_message = format!("My self id is {}, my message is {}", i, i);
+                assert_eq!(stdout.recv().await.unwrap(), expected_message);
+            }
+        }
+    }
 }
diff --git a/hydroflow_plus_test/src/cluster/two_pc.rs b/hydroflow_plus_test/src/cluster/two_pc.rs
new file mode 100644
index 000000000000..f93516970cf9
--- /dev/null
+++ b/hydroflow_plus_test/src/cluster/two_pc.rs
@@ -0,0 +1,93 @@
+use hydroflow_plus::*;
+use stageleft::*;
+
+// Naming convention: a variable prefixed with `p_` holds data at the
+// participant side; a variable prefixed with `c_` holds data at the
+// coordinator side.
+
+pub struct Participants {}
+
+pub struct Coordinator {}
+
+pub struct Client {}
+
+pub fn two_pc<'a>(
+    flow: &FlowBuilder<'a>,
+    num_participants: u32,
+) -> (
+    Process<'a, Coordinator>,
+    Cluster<'a, Participants>,
+    Process<'a, Client>,
+) {
+    // Assume a single client.
+    let client = flow.process::<Client>();
+
+    // Assume a single coordinator.
+    let coordinator = flow.process::<Coordinator>();
+
+    // A cluster of participants; its size is set by the deployment
+    // (num_participants, which is 3 in examples/two_pc.rs).
+    let participants = flow.cluster::<Participants>();
+
+    // Assume 3 transactions are generated, numbered 0 through 2.
+    let client_transaction = client.source_iter(q!(0..3));
+
+    let c_receive_client_transactions = client_transaction.send_bincode(&coordinator);
+    c_receive_client_transactions
+        .clone()
+        .inspect(q!(|t| println!(
+            "received transaction {}, ready to broadcast",
+            t
+        )));
+
+    // Broadcast prepare messages to the participants.
+    let p_receive_prepare = c_receive_client_transactions.broadcast_bincode(&participants);
+
+    // Assume every participant replies commit.
+    let p_ready_to_commit = p_receive_prepare.map(q!(|t| (t, String::from("commit"))));
+    let c_received_reply = p_ready_to_commit.send_bincode(&coordinator);
+
+    // Collect votes from the participants.
+    // Aborted transactions:
+    let c_participant_voted_abort = c_received_reply
+        .clone()
+        .filter(q!(|(_id, (_t, reply))| reply == "abort"))
+        .map(q!(|(id, (t, _reply))| (t, id)));
+    let p_receive_abort = c_participant_voted_abort.broadcast_bincode(&participants);
+    p_receive_abort.clone().inspect(q!(|(t, id)| println!(
+        "{} voted abort for transaction {}",
+        id, t
+    )));
+    let c_receive_ack = p_receive_abort.send_bincode(&coordinator);
+    c_receive_ack.for_each(q!(|(id, (t, _))| println!(
+        "Coordinator received participant {} abort for transaction {}",
+        id, t
+    )));
+
+    // Committed transactions:
+    let c_participant_voted_commit = c_received_reply
+        .filter(q!(|(_id, (_t, reply))| reply == "commit"))
+        .map(q!(|(id, (t, _reply))| (t, id)))
+        // fold_keyed: takes one input stream of (K, V1) pairs and produces one
+        // output stream of (K, V2) pairs, with one tuple per distinct K
+        // holding the accumulated value of type V2.
+        .tick_batch()
+        .fold_keyed(q!(|| 0), q!(|old: &mut u32, _: u32| *old += 1))
+        .filter_map(q!(move |(t, count)| {
+            // A transaction commits only once every participant has voted
+            // commit; num_participants is supplied by the caller (set in
+            // examples/two_pc.rs).
+            if count == num_participants {
+                Some(t)
+            } else {
+                None
+            }
+        }));
+
+    // Broadcast commit decisions to the participants.
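+    // For example: with num_participants = 3, three "commit" votes for
+    // transaction 1 arrive as (1, id) pairs, fold_keyed yields (1, 3), and
+    // the filter_map above passes transaction 1 through; a tick batch with
+    // any missing vote yields a count below 3 and the transaction is withheld.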
+    let p_receive_commit = c_participant_voted_commit
+        .all_ticks()
+        .broadcast_bincode(&participants);
+
+    let c_receive_ack = p_receive_commit.send_bincode(&coordinator);
+    c_receive_ack.for_each(q!(|(id, t)| println!(
+        "Coordinator received participant {} commit for transaction {}",
+        id, t
+    )));
+    (coordinator, participants, client)
+}