Skip to content

Commit

Permalink
feat: add kvs_mut example, fix #785 (#1087)
Browse files Browse the repository at this point in the history
  • Loading branch information
MingweiSamuel committed Mar 5, 2024
1 parent f8311db commit fd90d41
Show file tree
Hide file tree
Showing 16 changed files with 345 additions and 37 deletions.
28 changes: 0 additions & 28 deletions hydroflow/examples/kvs/README.md

This file was deleted.

28 changes: 28 additions & 0 deletions hydroflow/examples/kvs_mut/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
Simple single-node key-value store example based on a join of PUTs and GETs.

Current semantics are:
- PUTs are appended: we remember them all forever
- GETs are only remembered for the current tick, which may not be monotone depending on how they
are consumed.
- GETs for empty keys get no acknowledgement.

Clients accept commands on stdin. Command syntax is as follows:
- `PUT <key>, <value>`
- `GET <key>`
Commands are case-insensitive. All keys and values are treated as `String`s.

## Running the example

To run the example, open 2 terminals.

In one terminal run the server like so:
```
cargo run -p hydroflow --example kvs_mut -- --role server --addr localhost:12346
```

In another terminal run a client:
```
cargo run -p hydroflow --example kvs_mut -- --role client --addr localhost:9090 --server-addr localhost:12346
```

Adding the `--graph <graph_type>` flag to the end of the command lines above will print out a node-and-edge diagram of the program. Supported values for `<graph_type>` include [mermaid](https://mermaid-js.github.io/) and [dot](https://graphviz.org/doc/info/lang.html).
File renamed without changes.
29 changes: 29 additions & 0 deletions hydroflow/examples/kvs_mut/helpers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use regex::Regex;

use crate::protocol::KvsMessage;

pub fn parse_command(line: String) -> Option<KvsMessage> {
let re = Regex::new(r"([A-z]+)\s+(.+)").unwrap();
let caps = re.captures(line.as_str())?;

let binding = caps.get(1).unwrap().as_str().to_uppercase();
let cmdstr = binding.as_str();
let args = caps.get(2).unwrap().as_str();
match cmdstr {
"PUT" => {
let kv = args.split_once(',')?;
Some(KvsMessage::Put {
key: kv.0.trim().to_string(),
value: Some(kv.1.trim().to_string()),
})
}
"DELETE" => Some(KvsMessage::Put {
key: args.trim().to_string(),
value: None,
}),
"GET" => Some(KvsMessage::Get {
key: args.trim().to_string(),
}),
_ => None,
}
}
98 changes: 98 additions & 0 deletions hydroflow/examples/kvs_mut/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
use std::net::SocketAddr;

use clap::{Parser, ValueEnum};
use client::run_client;
use hydroflow::lang::graph::{WriteConfig, WriteGraphType};
use hydroflow::util::{bind_udp_bytes, ipv4_resolve};
use server::run_server;

mod client;
mod helpers;
mod protocol;
mod server;

#[derive(Clone, ValueEnum, Debug)]
enum Role {
Client,
Server,
}

#[derive(Parser, Debug)]
struct Opts {
#[clap(value_enum, long)]
role: Role,
#[clap(long, value_parser = ipv4_resolve)]
addr: Option<SocketAddr>,
#[clap(long, value_parser = ipv4_resolve)]
server_addr: Option<SocketAddr>,
#[clap(long)]
graph: Option<WriteGraphType>,
#[clap(flatten)]
write_config: Option<WriteConfig>,
}

#[hydroflow::main]
async fn main() {
let opts = Opts::parse();
let addr = opts.addr.unwrap();

match opts.role {
Role::Client => {
let (outbound, inbound, _) = bind_udp_bytes(addr).await;
println!("Client is bound to {:?}", addr);
println!("Attempting to connect to server at {:?}", opts.server_addr);
run_client(outbound, inbound, opts.server_addr.unwrap(), opts).await;
}
Role::Server => {
let (outbound, inbound, _) = bind_udp_bytes(addr).await;
println!("Listening on {:?}", opts.addr.unwrap());
run_server(outbound, inbound, opts).await;
}
}
}

#[test]
fn test() {
use std::io::Write;

use hydroflow::util::{run_cargo_example, wait_for_process_output};

let (_server, _, mut server_stdout) =
run_cargo_example("kvs_mut", "--role server --addr 127.0.0.1:2061");

let (_client1, mut client1_stdin, mut client1_stdout) = run_cargo_example(
"kvs_mut",
"--role client --addr 127.0.0.1:2062 --server-addr 127.0.0.1:2061",
);

let mut server_output = String::new();
wait_for_process_output(&mut server_output, &mut server_stdout, "Server live!");

let mut client1_output = String::new();
wait_for_process_output(&mut client1_output, &mut client1_stdout, "Client live!");

client1_stdin.write_all(b"PUT a,7\n").unwrap();

let (_client2, mut client2_stdin, mut client2_stdout) = run_cargo_example(
"kvs_mut",
"--role client --addr 127.0.0.1:2063 --server-addr 127.0.0.1:2061",
);

let mut client2_output = String::new();
wait_for_process_output(&mut client2_output, &mut client2_stdout, "Client live!");

client2_stdin.write_all(b"GET a\n").unwrap();
wait_for_process_output(
&mut client2_output,
&mut client2_stdout,
r#"Got a Response: KvsResponse \{ key: "a", value: "7" \}"#,
);

client1_stdin.write_all(b"PUT a,8\n").unwrap();
client1_stdin.write_all(b"GET a\n").unwrap();
wait_for_process_output(
&mut client1_output,
&mut client1_stdout,
r#"Got a Response: KvsResponse \{ key: "a", value: "8" \}"#,
);
}
37 changes: 37 additions & 0 deletions hydroflow/examples/kvs_mut/protocol.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use std::net::SocketAddr;

use hydroflow_macro::DemuxEnum;
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Serialize, Deserialize, DemuxEnum)]
pub enum KvsMessage {
Put { key: String, value: Option<String> },
Get { key: String },
}

#[derive(Clone, Debug, DemuxEnum)]
pub enum KvsMessageWithAddr {
Put {
key: String,
value: Option<String>,
addr: SocketAddr,
},
Get {
key: String,
addr: SocketAddr,
},
}
impl KvsMessageWithAddr {
pub fn from_message(message: KvsMessage, addr: SocketAddr) -> Self {
match message {
KvsMessage::Put { key, value } => Self::Put { key, value, addr },
KvsMessage::Get { key } => Self::Get { key, addr },
}
}
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct KvsResponse {
pub key: String,
pub value: String,
}
58 changes: 58 additions & 0 deletions hydroflow/examples/kvs_mut/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use hydroflow::hydroflow_syntax;
use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::util::{PersistenceKeyed, UdpSink, UdpStream};

use crate::protocol::{KvsMessageWithAddr, KvsResponse};
use crate::Opts;

pub(crate) async fn run_server(outbound: UdpSink, inbound: UdpStream, opts: Opts) {
println!("Server live!");

let mut hf: Hydroflow = hydroflow_syntax! {
// Setup network channels.
network_send = dest_sink_serde(outbound);
network_recv = source_stream_serde(inbound)
-> _upcast(Some(Delta))
-> map(Result::unwrap)
-> inspect(|(msg, addr)| println!("Message received {:?} from {:?}", msg, addr))
-> map(|(msg, addr)| KvsMessageWithAddr::from_message(msg, addr))
-> demux_enum::<KvsMessageWithAddr>();
puts = network_recv[Put];
gets = network_recv[Get];

// Store puts mutably (supporting deletion)
puts
-> flat_map(|(key, value, _addr): (String, Option<String>, _)| {
match value {
Some(val) => vec![
// Clear key then put new value
PersistenceKeyed::Delete(key.clone()),
PersistenceKeyed::Persist(key, val),
],
None => vec![
PersistenceKeyed::Delete(key),
],
}
})
-> persist_mut_keyed()
-> [0]lookup;
gets -> [1]lookup;
// Join PUTs and GETs by key, persisting the PUTs.
lookup = join::<'tick, 'tick>();

// Send GET responses back to the client address.
lookup
-> inspect(|tup| println!("Found a match: {:?}", tup))
-> map(|(key, (value, client_addr))| (KvsResponse { key, value }, client_addr))
-> network_send;
};

if let Some(graph) = opts.graph {
let serde_graph = hf
.meta_graph()
.expect("No graph found, maybe failed to parse.");
serde_graph.open_graph(graph, opts.write_config).unwrap();
}

hf.run_async().await.unwrap();
}
34 changes: 34 additions & 0 deletions hydroflow/examples/kvs_pubsub/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
Simple single-node key-value store example based on a join of PUTs and GETs.

Current semantics are:
- PUTs are appended: we remember them all forever
- GETs are only remembered for the current tick, which may not be monotone depending on how they
are consumed.
- GETs for empty keys get no acknowledgement.

Clients accept commands on stdin. Command syntax is as follows:
- `PUT <key>, <value>`
- `GET <key>`
Commands are case-insensitive. All keys and values are treated as `String`s.

## Pubsub?

This KVS actually acts as a publish-subscribe service, because deleting old values, in the simplest case,
is not monotonic. So therefore a read on a particular key will receive future writes to that key.
For a more traditional, and non-monotonic KVS, see the `kvs_mut` example.

## Running the example

To run the example, open 2 terminals.

In one terminal run the server like so:
```
cargo run -p hydroflow --example kvs_pubsub -- --role server --addr localhost:12346
```

In another terminal run a client:
```
cargo run -p hydroflow --example kvs_pubsub -- --role client --addr localhost:9090 --server-addr localhost:12346
```

Adding the `--graph <graph_type>` flag to the end of the command lines above will print out a node-and-edge diagram of the program. Supported values for `<graph_type>` include [mermaid](https://mermaid-js.github.io/) and [dot](https://graphviz.org/doc/info/lang.html).
41 changes: 41 additions & 0 deletions hydroflow/examples/kvs_pubsub/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use std::net::SocketAddr;

use hydroflow::hydroflow_syntax;
use hydroflow::util::{UdpSink, UdpStream};

use crate::helpers::parse_command;
use crate::protocol::KvsResponse;
use crate::Opts;

pub(crate) async fn run_client(
outbound: UdpSink,
inbound: UdpStream,
server_addr: SocketAddr,
opts: Opts,
) {
println!("Client live!");

let mut hf = hydroflow_syntax! {
// set up channels
outbound_chan = dest_sink_serde(outbound);
inbound_chan = source_stream_serde(inbound) -> map(Result::unwrap);

// read in commands from stdin and forward to server
source_stdin()
-> filter_map(|line| parse_command(line.unwrap()))
-> map(|msg| { (msg, server_addr) })
-> outbound_chan;

// print inbound msgs
inbound_chan -> for_each(|(response, _addr): (KvsResponse, _)| println!("Got a Response: {:?}", response));
};

if let Some(graph) = opts.graph {
let serde_graph = hf
.meta_graph()
.expect("No graph found, maybe failed to parse.");
serde_graph.open_graph(graph, opts.write_config).unwrap();
}

hf.run_async().await.unwrap();
}
File renamed without changes.
Loading

0 comments on commit fd90d41

Please sign in to comment.