diff --git a/hydroflow/examples/kvs_pubsub/README.md b/hydroflow/examples/kvs/README.md similarity index 59% rename from hydroflow/examples/kvs_pubsub/README.md rename to hydroflow/examples/kvs/README.md index e52bffb9c385..4f1ebc6d43be 100644 --- a/hydroflow/examples/kvs_pubsub/README.md +++ b/hydroflow/examples/kvs/README.md @@ -11,24 +11,33 @@ Clients accept commands on stdin. Command syntax is as follows: - `GET ` Commands are case-insensitive. All keys and values are treated as `String`s. -## Pubsub? +## Overwriting values? -This KVS actually acts as a publish-subscribe service, because deleting old values, in the simplest case, -is not monotonic. So therefore a read on a particular key will receive future writes to that key. +This KVS actually stores all values written to a key because deleting old values, in the general case, +is not monotonic. So therefore a read on a particular key will receive all previous writes to that key. For a more traditional, and non-monotonic KVS, see the `kvs_mut` example. +The implementation difference can be found in `server.rs`. This implementation uses a `join()` +with `'static` persistence on the write side. Every written value is persisted. +```rust +// Join PUTs and GETs by key, persisting the PUTs. +puts -> map(|(key, value, _addr)| (key, value)) -> [0]lookup; +gets -> [1]lookup; +lookup = join::<'static, 'tick>(); +``` + ## Running the example To run the example, open 2 terminals. In one terminal run the server like so: ``` -cargo run -p hydroflow --example kvs_pubsub -- --role server --addr localhost:12346 +cargo run -p hydroflow --example kvs -- --role server --addr localhost:12346 ``` In another terminal run a client: ``` -cargo run -p hydroflow --example kvs_pubsub -- --role client --addr localhost:9090 --server-addr localhost:12346 +cargo run -p hydroflow --example kvs -- --role client --addr localhost:9090 --server-addr localhost:12346 ``` Adding the `--graph ` flag to the end of the command lines above will print out a node-and-edge diagram of the program. Supported values for `` include [mermaid](https://mermaid-js.github.io/) and [dot](https://graphviz.org/doc/info/lang.html). diff --git a/hydroflow/examples/kvs_pubsub/client.rs b/hydroflow/examples/kvs/client.rs similarity index 100% rename from hydroflow/examples/kvs_pubsub/client.rs rename to hydroflow/examples/kvs/client.rs diff --git a/hydroflow/examples/kvs_pubsub/helpers.rs b/hydroflow/examples/kvs/helpers.rs similarity index 100% rename from hydroflow/examples/kvs_pubsub/helpers.rs rename to hydroflow/examples/kvs/helpers.rs diff --git a/hydroflow/examples/kvs_pubsub/main.rs b/hydroflow/examples/kvs/main.rs similarity index 95% rename from hydroflow/examples/kvs_pubsub/main.rs rename to hydroflow/examples/kvs/main.rs index 02210d76ac95..a4b38190f70f 100644 --- a/hydroflow/examples/kvs_pubsub/main.rs +++ b/hydroflow/examples/kvs/main.rs @@ -58,10 +58,10 @@ fn test() { use hydroflow::util::{run_cargo_example, wait_for_process_output}; let (_server, _, mut server_stdout) = - run_cargo_example("kvs_pubsub", "--role server --addr 127.0.0.1:2051"); + run_cargo_example("kvs", "--role server --addr 127.0.0.1:2051"); let (_client1, mut client1_stdin, mut client1_stdout) = run_cargo_example( - "kvs_pubsub", + "kvs", "--role client --addr 127.0.0.1:2052 --server-addr 127.0.0.1:2051", ); @@ -74,7 +74,7 @@ fn test() { client1_stdin.write_all(b"PUT a,7\n").unwrap(); let (_client2, mut client2_stdin, mut client2_stdout) = run_cargo_example( - "kvs_pubsub", + "kvs", "--role client --addr 127.0.0.1:2053 --server-addr 127.0.0.1:2051", ); diff --git a/hydroflow/examples/kvs_pubsub/protocol.rs b/hydroflow/examples/kvs/protocol.rs similarity index 100% rename from hydroflow/examples/kvs_pubsub/protocol.rs rename to hydroflow/examples/kvs/protocol.rs diff --git a/hydroflow/examples/kvs_pubsub/server.rs b/hydroflow/examples/kvs/server.rs similarity index 97% rename from hydroflow/examples/kvs_pubsub/server.rs rename to hydroflow/examples/kvs/server.rs index 812d4347cc58..6121aa138b39 100644 --- a/hydroflow/examples/kvs_pubsub/server.rs +++ b/hydroflow/examples/kvs/server.rs @@ -20,6 +20,7 @@ pub(crate) async fn run_server(outbound: UdpSink, inbound: UdpStream, opts: Opts puts = network_recv[Put]; gets = network_recv[Get]; + /* DIFFERENCE HERE: SEE README.md */ // Join PUTs and GETs by key, persisting the PUTs. puts -> map(|(key, value, _addr)| (key, value)) -> [0]lookup; gets -> [1]lookup; diff --git a/hydroflow/examples/kvs_mut/README.md b/hydroflow/examples/kvs_mut/README.md index 521cc3ee4674..ed699970a449 100644 --- a/hydroflow/examples/kvs_mut/README.md +++ b/hydroflow/examples/kvs_mut/README.md @@ -11,6 +11,35 @@ Clients accept commands on stdin. Command syntax is as follows: - `GET ` Commands are case-insensitive. All keys and values are treated as `String`s. +## Overwriting values? + +This KVS overwrites the old value when a new value is written to a key. In the general case this is not monotonic +because we are deleting old information. For a more monotonic KVS, see the `kvs` example. + +The implementation difference can be found in `server.rs`. This implementation uses a `persist_mut_keyed()` +to enable deletion on the `PUT` side of the `join()`. +```rust +// Store puts mutably (supporting deletion) +puts + -> flat_map(|(key, value, _addr): (String, Option, _)| { + match value { + Some(val) => vec![ + // Clear key then put new value + PersistenceKeyed::Delete(key.clone()), + PersistenceKeyed::Persist(key, val), + ], + None => vec![ + PersistenceKeyed::Delete(key), + ], + } + }) + -> persist_mut_keyed() + -> [0]lookup; +gets -> [1]lookup; +// Join PUTs and GETs by key, persisting the PUTs. +lookup = join::<'tick, 'tick>(); +``` + ## Running the example To run the example, open 2 terminals. diff --git a/hydroflow/examples/kvs_mut/server.rs b/hydroflow/examples/kvs_mut/server.rs index 594a31a26b0c..254315ab2e4a 100644 --- a/hydroflow/examples/kvs_mut/server.rs +++ b/hydroflow/examples/kvs_mut/server.rs @@ -20,6 +20,7 @@ pub(crate) async fn run_server(outbound: UdpSink, inbound: UdpStream, opts: Opts puts = network_recv[Put]; gets = network_recv[Get]; + /* DIFFERENCE HERE: SEE README.md */ // Store puts mutably (supporting deletion) puts -> flat_map(|(key, value, _addr): (String, Option, _)| { diff --git a/hydroflow_lang/src/graph/ops/persist_mut.rs b/hydroflow_lang/src/graph/ops/persist_mut.rs index 8eb99dfffc4a..42d90e72058b 100644 --- a/hydroflow_lang/src/graph/ops/persist_mut.rs +++ b/hydroflow_lang/src/graph/ops/persist_mut.rs @@ -8,11 +8,19 @@ use super::{ }; /// `persist_mut()` is similar to `persist()` except that it also enables deletions. -/// `persist_mut()` expects an input of type `Persistence`, and it is this enumeration that enables the user to communicate deletion. -/// Deletions/persists happen in the order they are received in the stream. For example, [Persist(1), Delete(1), Persist(1)] will result ina a single '1' value being stored. +/// `persist_mut()` expects an input of type [`Persistence`](https://docs.rs/hydroflow/latest/hydroflow/util/enum.Persistence.html), +/// and it is this enumeration that enables the user to communicate deletion. +/// Deletions/persists happen in the order they are received in the stream. +/// For example, `[Persist(1), Delete(1), Persist(1)]` will result in a a single `1` value being stored. /// /// ```hydroflow -/// source_iter([hydroflow::util::Persistence::Persist(1), hydroflow::util::Persistence::Persist(2), hydroflow::util::Persistence::Delete(1)]) +/// use hydroflow::util::Persistence; +/// +/// source_iter([ +/// Persistence::Persist(1), +/// Persistence::Persist(2), +/// Persistence::Delete(1), +/// ]) /// -> persist_mut() /// -> assert_eq([2]); /// ``` diff --git a/hydroflow_lang/src/graph/ops/persist_mut_keyed.rs b/hydroflow_lang/src/graph/ops/persist_mut_keyed.rs index ebe6a9dac419..218ea9b42f48 100644 --- a/hydroflow_lang/src/graph/ops/persist_mut_keyed.rs +++ b/hydroflow_lang/src/graph/ops/persist_mut_keyed.rs @@ -8,11 +8,19 @@ use super::{ }; /// `persist_mut_keyed()` is similar to `persist_mut()` except that it also enables key-based deletions -/// `persist_mut()` expects an input of type `PersistenceKeyed`, and it is this enumeration that enables the user to communicate deletion. -/// Deletions/persists happen in the order they are received in the stream. For example, [Persist(1), Delete(1), Persist(1)] will result ina a single '1' value being stored. +/// `persist_mut()` expects an input of type [`PersistenceKeyed`](https://docs.rs/hydroflow/latest/hydroflow/util/enum.PersistenceKeyed.html), +/// and it is this enumeration that enables the user to communicate deletion. +/// Deletions/persists happen in the order they are received in the stream. +/// For example, `[Persist(1), Delete(1), Persist(1)]` will result in a a single `1` value being stored. /// /// ```hydroflow -/// source_iter([hydroflow::util::PersistenceKeyed::Persist(0, 1), hydroflow::util::PersistenceKeyed::Persist(1, 1), hydroflow::util::PersistenceKeyed::Delete(1)]) +/// use hydroflow::util::PersistenceKeyed; +/// +/// source_iter([ +/// PersistenceKeyed::Persist(0, 1), +/// PersistenceKeyed::Persist(1, 1), +/// PersistenceKeyed::Delete(1), +/// ]) /// -> persist_mut_keyed() /// -> assert_eq([(0, 1)]); /// ```