Skip to content

Commit

Permalink
docs: more improvements to persist_mut[_keyed] docs (#1089)
Browse files Browse the repository at this point in the history
  • Loading branch information
MingweiSamuel authored Mar 5, 2024
1 parent fd90d41 commit 550b17c
Show file tree
Hide file tree
Showing 10 changed files with 70 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,33 @@ Clients accept commands on stdin. Command syntax is as follows:
- `GET <key>`
Commands are case-insensitive. All keys and values are treated as `String`s.

## Pubsub?
## Overwriting values?

This KVS actually acts as a publish-subscribe service, because deleting old values, in the simplest case,
is not monotonic. So therefore a read on a particular key will receive future writes to that key.
This KVS actually stores all values written to a key because deleting old values, in the general case,
is not monotonic. So therefore a read on a particular key will receive all previous writes to that key.
For a more traditional, and non-monotonic KVS, see the `kvs_mut` example.

The implementation difference can be found in `server.rs`. This implementation uses a `join()`
with `'static` persistence on the write side. Every written value is persisted.
```rust
// Join PUTs and GETs by key, persisting the PUTs.
puts -> map(|(key, value, _addr)| (key, value)) -> [0]lookup;
gets -> [1]lookup;
lookup = join::<'static, 'tick>();
```

## Running the example

To run the example, open 2 terminals.

In one terminal run the server like so:
```
cargo run -p hydroflow --example kvs_pubsub -- --role server --addr localhost:12346
cargo run -p hydroflow --example kvs -- --role server --addr localhost:12346
```

In another terminal run a client:
```
cargo run -p hydroflow --example kvs_pubsub -- --role client --addr localhost:9090 --server-addr localhost:12346
cargo run -p hydroflow --example kvs -- --role client --addr localhost:9090 --server-addr localhost:12346
```

Adding the `--graph <graph_type>` flag to the end of the command lines above will print out a node-and-edge diagram of the program. Supported values for `<graph_type>` include [mermaid](https://mermaid-js.github.io/) and [dot](https://graphviz.org/doc/info/lang.html).
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ fn test() {
use hydroflow::util::{run_cargo_example, wait_for_process_output};

let (_server, _, mut server_stdout) =
run_cargo_example("kvs_pubsub", "--role server --addr 127.0.0.1:2051");
run_cargo_example("kvs", "--role server --addr 127.0.0.1:2051");

let (_client1, mut client1_stdin, mut client1_stdout) = run_cargo_example(
"kvs_pubsub",
"kvs",
"--role client --addr 127.0.0.1:2052 --server-addr 127.0.0.1:2051",
);

Expand All @@ -74,7 +74,7 @@ fn test() {
client1_stdin.write_all(b"PUT a,7\n").unwrap();

let (_client2, mut client2_stdin, mut client2_stdout) = run_cargo_example(
"kvs_pubsub",
"kvs",
"--role client --addr 127.0.0.1:2053 --server-addr 127.0.0.1:2051",
);

Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub(crate) async fn run_server(outbound: UdpSink, inbound: UdpStream, opts: Opts
puts = network_recv[Put];
gets = network_recv[Get];

/* DIFFERENCE HERE: SEE README.md */
// Join PUTs and GETs by key, persisting the PUTs.
puts -> map(|(key, value, _addr)| (key, value)) -> [0]lookup;
gets -> [1]lookup;
Expand Down
29 changes: 29 additions & 0 deletions hydroflow/examples/kvs_mut/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,35 @@ Clients accept commands on stdin. Command syntax is as follows:
- `GET <key>`
Commands are case-insensitive. All keys and values are treated as `String`s.

## Overwriting values?

This KVS overwrites the old value when a new value is written to a key. In the general case this is not monotonic
because we are deleting old information. For a more monotonic KVS, see the `kvs` example.

The implementation difference can be found in `server.rs`. This implementation uses a `persist_mut_keyed()`
to enable deletion on the `PUT` side of the `join()`.
```rust
// Store puts mutably (supporting deletion)
puts
-> flat_map(|(key, value, _addr): (String, Option<String>, _)| {
match value {
Some(val) => vec![
// Clear key then put new value
PersistenceKeyed::Delete(key.clone()),
PersistenceKeyed::Persist(key, val),
],
None => vec![
PersistenceKeyed::Delete(key),
],
}
})
-> persist_mut_keyed()
-> [0]lookup;
gets -> [1]lookup;
// Join PUTs and GETs by key, persisting the PUTs.
lookup = join::<'tick, 'tick>();
```

## Running the example

To run the example, open 2 terminals.
Expand Down
1 change: 1 addition & 0 deletions hydroflow/examples/kvs_mut/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub(crate) async fn run_server(outbound: UdpSink, inbound: UdpStream, opts: Opts
puts = network_recv[Put];
gets = network_recv[Get];

/* DIFFERENCE HERE: SEE README.md */
// Store puts mutably (supporting deletion)
puts
-> flat_map(|(key, value, _addr): (String, Option<String>, _)| {
Expand Down
14 changes: 11 additions & 3 deletions hydroflow_lang/src/graph/ops/persist_mut.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,19 @@ use super::{
};

/// `persist_mut()` is similar to `persist()` except that it also enables deletions.
/// `persist_mut()` expects an input of type `Persistence<T>`, and it is this enumeration that enables the user to communicate deletion.
/// Deletions/persists happen in the order they are received in the stream. For example, [Persist(1), Delete(1), Persist(1)] will result ina a single '1' value being stored.
/// `persist_mut()` expects an input of type [`Persistence<T>`](https://docs.rs/hydroflow/latest/hydroflow/util/enum.Persistence.html),
/// and it is this enumeration that enables the user to communicate deletion.
/// Deletions/persists happen in the order they are received in the stream.
/// For example, `[Persist(1), Delete(1), Persist(1)]` will result in a a single `1` value being stored.
///
/// ```hydroflow
/// source_iter([hydroflow::util::Persistence::Persist(1), hydroflow::util::Persistence::Persist(2), hydroflow::util::Persistence::Delete(1)])
/// use hydroflow::util::Persistence;
///
/// source_iter([
/// Persistence::Persist(1),
/// Persistence::Persist(2),
/// Persistence::Delete(1),
/// ])
/// -> persist_mut()
/// -> assert_eq([2]);
/// ```
Expand Down
14 changes: 11 additions & 3 deletions hydroflow_lang/src/graph/ops/persist_mut_keyed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,19 @@ use super::{
};

/// `persist_mut_keyed()` is similar to `persist_mut()` except that it also enables key-based deletions
/// `persist_mut()` expects an input of type `PersistenceKeyed<T>`, and it is this enumeration that enables the user to communicate deletion.
/// Deletions/persists happen in the order they are received in the stream. For example, [Persist(1), Delete(1), Persist(1)] will result ina a single '1' value being stored.
/// `persist_mut()` expects an input of type [`PersistenceKeyed<T>`](https://docs.rs/hydroflow/latest/hydroflow/util/enum.PersistenceKeyed.html),
/// and it is this enumeration that enables the user to communicate deletion.
/// Deletions/persists happen in the order they are received in the stream.
/// For example, `[Persist(1), Delete(1), Persist(1)]` will result in a a single `1` value being stored.
///
/// ```hydroflow
/// source_iter([hydroflow::util::PersistenceKeyed::Persist(0, 1), hydroflow::util::PersistenceKeyed::Persist(1, 1), hydroflow::util::PersistenceKeyed::Delete(1)])
/// use hydroflow::util::PersistenceKeyed;
///
/// source_iter([
/// PersistenceKeyed::Persist(0, 1),
/// PersistenceKeyed::Persist(1, 1),
/// PersistenceKeyed::Delete(1),
/// ])
/// -> persist_mut_keyed()
/// -> assert_eq([(0, 1)]);
/// ```
Expand Down

0 comments on commit 550b17c

Please sign in to comment.