From 8e1aed9090047f12f515ef5a9b164603bc962aee Mon Sep 17 00:00:00 2001 From: Shadaj Laddad Date: Wed, 20 Nov 2024 21:29:07 -0800 Subject: [PATCH] docs(hydroflow_plus): polish quickstart and drop stale pages (#1576) I've decided to leave some more challenging TODOs as comments for now, just so we can get the live site back into decent shape. Also changes the "Get Started" button on the landing to point to the HF+ quickstart. --- docs/docs/hydroflow_plus/aggregations.mdx | 40 --------- docs/docs/hydroflow_plus/clusters.mdx | 89 ------------------- docs/docs/hydroflow_plus/cycles.mdx | 32 ------- .../hydroflow_plus/dataflow-programming.mdx | 13 +++ docs/docs/hydroflow_plus/index.mdx | 4 +- docs/docs/hydroflow_plus/process_streams.mdx | 78 ---------------- .../hydroflow_plus/quickstart/_category_.json | 2 +- .../hydroflow_plus/quickstart/clusters.mdx | 27 +++--- .../hydroflow_plus/quickstart/distributed.mdx | 19 +++- .../quickstart/first-dataflow.mdx | 26 ++++-- docs/docs/hydroflow_plus/quickstart/index.mdx | 23 ++--- docs/docs/hydroflow_plus/stageleft.mdx | 4 +- docs/src/pages/index.js | 2 +- docs/src/util.ts | 10 +++ 14 files changed, 86 insertions(+), 283 deletions(-) delete mode 100644 docs/docs/hydroflow_plus/aggregations.mdx delete mode 100644 docs/docs/hydroflow_plus/clusters.mdx delete mode 100644 docs/docs/hydroflow_plus/cycles.mdx create mode 100644 docs/docs/hydroflow_plus/dataflow-programming.mdx delete mode 100644 docs/docs/hydroflow_plus/process_streams.mdx diff --git a/docs/docs/hydroflow_plus/aggregations.mdx b/docs/docs/hydroflow_plus/aggregations.mdx deleted file mode 100644 index 2951c1d097f5..000000000000 --- a/docs/docs/hydroflow_plus/aggregations.mdx +++ /dev/null @@ -1,40 +0,0 @@ ---- -sidebar_position: 4 ---- - -# Aggregations and Ticks -Hydroflow+ streams support aggregation operators such as `fold()`, which can be used to compute results that combine information from multiple elements along a stream. However, because Hydroflow+ streams are infinite, these operators have slightly different semantics than typical implementations. - -In particular, Hydroflow (and Hydroflow+) adhere to a _tick_ model of computation, where the stream is chunked into finite _ticks_ of data. At the beginning of each tick on each process (ticks are **not** synchronized), a batch of inputs are collected, the local dataflow graph is executed, and a batch of outputs are produced. This enables Hydroflow to support infinite streams while still being able to apply optimizations such as vectorization. - -## Specifying Windows -For most operators, such as `map` and `filter`, which operate on each element independently, the tick model is transparent. However, for operators that combine multiple elements, such as `fold`, the tick model is more visible. In particular, developers must explicitly specify whether they want aggregations such as `fold` or multi-stream operators such as `join` to operate over the latest batch of data or all data since the beginning of the stream. - -:::note - -Hydroflow+ tracks windows using the `W` type parameter on `Stream`. For streams with an unspecified window, this type will be `Async`, but with a window it will be `Windowed`. This guards against accidental aggregations since the type system will prevent you from aggregating over a stream with an `Async` window. - -::: - -To specify this **window**, Hydroflow+ offers two operators, `tick_batch()` and `all_ticks()`. The former specifies that the operator should operate over the latest batch of data, while the latter specifies that the operator should operate over all data since the beginning of the stream. - -For example, consider a pipelined aggregation across two processes. We can sum up elements on the first process in a batched manner using `tick_batch()`, then sum up the results on the second process in an unbounded manner using `all_ticks()`: - -```rust -let root_stream = process.source_stream(q!(1..=10)); -root_stream - .tick_batch() - .fold(q!(|| 0), q!(|acc, x| *acc += x)) - .send_bincode(&process2); - .all_ticks() - .fold(q!(|| 0), q!(|acc, x| acc + x)); -``` - -Note that Hydroflow+ streams are still unbounded! The `all_ticks()` operator does not wait until all elements are received, instead on each tick it will operate over all data since the beginning of the stream. - -So if we were to pass in inputs 1, 2, 3, we might get the following results on process 2: -``` -First Tick: 0 -Second Tick: (1 + 2) = 3 -Third Tick: (1 + 2) + (3) = 6 -``` diff --git a/docs/docs/hydroflow_plus/clusters.mdx b/docs/docs/hydroflow_plus/clusters.mdx deleted file mode 100644 index 5d742324c673..000000000000 --- a/docs/docs/hydroflow_plus/clusters.mdx +++ /dev/null @@ -1,89 +0,0 @@ ---- -sidebar_position: 3 ---- - -# Clusters -A key restriction of processes in Hydroflow+ is that there can only be one instance of the computation assigned to each process across the entire distributed system. This is fine for simple applications with only pipelined computation, but for scaling out we need the ability to have _multiple instances of the same computation_ running in parallel. - -Clusters solve this by providing an nearly-identical API to processes, but representing a **set of instances** running the same computation instead of a single one. What changes when using a cluster is sending data, since we need to specify _which_ instance(s) of the computation to send the data to. - -## Computing on Clusters -Instantiating clusters is done using the `cluster` method on `FlowBuilder`, taking a `ClusterSpec`: -```rust -pub fn my_flow<'a, D: Deploy<'a>>( - flow: &FlowBuilder<'a, D>, - cluster_spec: &impl ClusterSpec<'a, D> -) { - let cluster = flow.cluster(cluster_spec); -} -``` - -This API follows the same pattern as processes, where a cluster spec represents a _template_ for a cluster, which can be instantiated multiple times to create multiple clusters. - -Instantiating streams on clusters uses the same APIs as streams: `source_iter` and `source_stream` are both available. But when using these APIs, the root streams will be instantiated on _all_ instances in the cluster. - -```rust -let stream = cluster.source_iter(q!(vec![1, 2, 3])); - -stream.for_each(q!(|x| println!("{}", x))) -// will print 1, 2, 3 on **each** instance -``` - -## Sending Data -Because clusters represent a set of instances, adding networking requires us to specify _which_ instance(s) to send data to. Clusters provide different types depending on if the source or receiver is a cluster or a process. - -Elements in a cluster are identified by a **cluster ID** (a `ClusterId` where `C` is the typetag of the cluster). To get the IDs of all instances in a cluster, use the `members` method on cluster, which returns a runtime expression of type `&Vec>` (which can only be used inside `q!()` or as an argument to `source_iter`). All IDs always are ranging from 0 through the length of the IDs vector. - -This can then be passed into `source_iter` to load the IDs into the graph. -```rust -let stream = process.source_iter(cluster.members()).cloned(); -``` - -### One-to-Many -When sending data from a process to a cluster, the source must be a stream of tuples of the form `(ClusterId<_>, T)` and sends each `T` element to the instance with the matching ID. - -This is useful for partitioning data across instances. For example, we can partition a stream of elements in a round-robin fashion by using `enumerate` to add a sequence number to each element, then using `send_bincode` to send each element to the instance with the matching sequence number: -```rust -let cluster_ids = cluster.members(); -let stream = process.source_iter(q!(vec![123, 456, 789])) - .enumerate() - .map(q!(|(i, x)| ( - ClusterId::from_raw(i % cluster_ids.len() as u32), - x - ))) - .send_bincode(&cluster); -``` - -To broadcast data to all instances in a cluster, use `broadcast_{bincode,bytes}`, which acts as a shortcut for the cross product. - -```rust -let stream = process.source_iter(q!(vec![123, 456, 789])) - .broadcast_bincode(&cluster); -``` - -### Many-to-One -In the other direction, sending data from a cluster to a process, we have a stream of elements of type `T` at the sender but on the recipient side we get a stream of tuples of the form `(u32, T)`, where the `u32` is the ID of the instance that sent the element. The elements received from different instances will be interleaved. - -This is useful for aggregating data from multiple instances into a single stream. For example, we can use `send_bincode` to send data from all instances to a single process, and then print them all out: -```rust -let stream = cluster.source_iter(q!(vec![123, 456, 789])) - .send_bincode(&process) - .for_each(q!(|(id, x)| println!("{}: {}", id, x))); -``` - -If you don't care which instance sent the data, you can use `send_{bincode,bytes}_interleaved`, where the recipient receives a stream of `T` elements, but the elements received from different instances will be interleaved. -```rust -let stream = cluster.source_iter(q!(vec![123, 456, 789])) - .send_bincode_interleaved(&process) - .for_each(q!(|x| println!("{}", x))); -``` - -### Many-to-Many -Finally, when sending data from one cluster to another (or to itself as in distributed protocols), the source emits tuples of the form `(u32, T)` and sends each `T` element to the instance with the matching `u32` ID, but the recipient also gets `(u32, T)` tuples with the ID of the sender. - -We can use the same shortcuts as before. For example, we can use `broadcast_bincode_interleaved` to send data from all instances in a cluster to all instances in another cluster, and then print them all out: -```rust -let stream = cluster1.source_iter(q!(vec![123, 456, 789])) - .broadcast_bincode_interleaved(&cluster2) - .for_each(q!(|x| println!("{}", x))); -``` diff --git a/docs/docs/hydroflow_plus/cycles.mdx b/docs/docs/hydroflow_plus/cycles.mdx deleted file mode 100644 index 21d1b78fcf52..000000000000 --- a/docs/docs/hydroflow_plus/cycles.mdx +++ /dev/null @@ -1,32 +0,0 @@ ---- -sidebar_position: 5 ---- - -# Cycles -Hydroflow+ supports cyclic graphs, which are useful for iterative computations or patterns like heartbeats. - -Because streams are represented as values when constructing a Hydroflow+ graph, we can't directly create cycles since that would require a forward reference. Instead, Hydroflow+ offers an API to create a cycle by using a _placeholder_ stream, which is a stream that can be used as a placeholder for a stream that will be created later. - -We can create a cycle by using the `cycle` method on flow with a process or cluster. This returns a tuple of two values: a `HfCycle` value that can be used to complete the cycle later and the placeholder stream. - -```rust -let (complete_cycle, cycle_placeholder) = process.cycle(); -``` - -For example, consider the classic graph reachability problem, which computes the nodes reachable from a given set of roots in a directed graph. This can be modeled as an iterative fixpoint computation where we start with the roots, then repeatedly add the children of each node to the set of reachable nodes until we reach a fixpoint. - -In Hydroflow+, we can implement this using cycles: - -```rust -let roots = process.source_stream(roots); -let edges = process.source_stream(edges); - -let (complete_reached_nodes, reached_nodes) = process.cycle(); - -let reach_iteration = roots - .union(&reached_nodes) - .map(q!(|r| (r, ()))) - .join(&edges) - .map(q!(|(_from, (_, to))| to)); -complete_reached_nodes.complete(&reach_iteration); -``` diff --git a/docs/docs/hydroflow_plus/dataflow-programming.mdx b/docs/docs/hydroflow_plus/dataflow-programming.mdx new file mode 100644 index 000000000000..f88b2d389233 --- /dev/null +++ b/docs/docs/hydroflow_plus/dataflow-programming.mdx @@ -0,0 +1,13 @@ +--- +sidebar_position: 1 +--- + +# Dataflow Programming +Hydroflow+ uses a dataflow programming model, which will be familiar if you have used APIs like Rust iterators. Instead of using RPCs or async/await to describe distributed computation, Hydroflow+ instead uses **asynchronous streams**, which represent data arriving over time. Streams can represent a series of asynchronous events (e.g. inbound network requests) or a sequence of data items. + +Programs in Hydroflow+ describe how to **transform** entire collections of data using operators such as `map` (transforming elements one by one), `fold` (aggregating elements into a single value), or `join` (combining elements from multiple streams on matching keys). + +If you are familiar with Spark, Flink or Pandas, you will find Hydroflow+ syntax familiar. However, note well that the semantics for asynchronous streams in Hydroflow+ differ significantly from bulk analytics systems like those above. In particular, Hydroflow+ uses the type system to distinguish between bounded streams (originating from finite data) and unbounded streams (originated from asynchronous input). Moreover, Hydroflow+ is designed to handle asynchronous streams of small, independent events very efficiently. + + + diff --git a/docs/docs/hydroflow_plus/index.mdx b/docs/docs/hydroflow_plus/index.mdx index 77a9ee57362c..c2ade3d3f6f3 100644 --- a/docs/docs/hydroflow_plus/index.mdx +++ b/docs/docs/hydroflow_plus/index.mdx @@ -3,11 +3,11 @@ sidebar_position: 0 --- # Introduction -Hydroflow+ is a high-level distributed streaming framework for Rust powered by the [Hydroflow runtime](../hydroflow/index.mdx). Unlike traditional architectures such as actors or RPCs, Hydroflow+ offers _choreographic_ APIs, where expressions and functions can describe computation that takes place across many locations. It also integrates with [Hydro Deploy](../deploy/index.md) to make it easy to deploy and run Hydroflow+ programs to the cloud. +Hydroflow+ is a high-level distributed programming framework for Rust powered by the [Hydroflow runtime](../hydroflow/index.mdx). Unlike traditional architectures such as actors or RPCs, Hydroflow+ offers _choreographic_ APIs, where expressions and functions can describe computation that takes place across many locations. It also integrates with [Hydro Deploy](../deploy/index.md) to make it easy to deploy and run Hydroflow+ programs to the cloud. Hydroflow+ uses a two-stage compilation approach. HF+ programs are standard Rust programs, which first run on the developer's laptop to generate a _deployment plan_. This plan is then compiled to individual binaries for each machine in the distributed system (enabling zero-overhead abstractions), and are then deployed to the cloud using the generated plan along with specifications of cloud resources. -Hydroflow+ has been used to write a variety of high-performance distributed system, including implementations of classic distributed protocols such as two-phase commit and Paxos. Work is ongoing to develop a distributed systems standard library that will offer these protocols and more as reusable components. +Hydroflow+ has been used to write a variety of high-performance distributed systems, including implementations of classic distributed protocols such as two-phase commit and Paxos. Work is ongoing to develop a distributed systems standard library that will offer these protocols and more as reusable components. :::caution diff --git a/docs/docs/hydroflow_plus/process_streams.mdx b/docs/docs/hydroflow_plus/process_streams.mdx deleted file mode 100644 index 75559052b43a..000000000000 --- a/docs/docs/hydroflow_plus/process_streams.mdx +++ /dev/null @@ -1,78 +0,0 @@ ---- -sidebar_position: 2 ---- - -# Processes and Streams -Hydroflow+ involves two main concepts: -- **Processes**, which represent _where_ elements of a dataflow program are processed -- **Streams**, which define _what_ is being computed - -By combining the two, Hydroflow+ makes it possible to implement both low-level distributed protocols and high-level dataflow programs using the same API, all while supporting compile-time checks to guard against unexpected sources of nondeterminism. - -## Processes -Unlike most streaming systems, Hydroflow+ requires that all streams be associated with a particular **process**. A process is a logical unit of computation that can be deployed to a single machine. Processes are most closely related to _actors_ in actor-based systems, but use streaming operators rather than an imperative API. - -To create a process, we must take a `ProcessSpec` as an argument to our function. This trait abstracts over what the dataflow graph is being built for: compilation to a Rust binary or deployment. - -```rust -pub fn my_flow<'a, D: Deploy<'a>>( - flow: &FlowBuilder<'a, D>, - process_spec: &impl ProcessSpec<'a, D> -) { - ... -} -``` - -Process specs represent a _template_ for a process, which can be instantiated multiple times to create multiple processes. Multiple process specs can be useful to specify deployment characteristics for different sets of processes, such as deploying them to different cloud providers or regions. - -Instantiating a process from a process spec is done using the `process` method on `FlowBuilder`: - -```rust -let process = flow.process(process_spec); -``` - -## Streams -Streams are infinite ordered sequences of elements. They can be transformed using functional operators such as `map` and `filter`, relational operators such as `join`, and can be connected across processes using `send_to`. - -### Instantiating Streams -Root streams are created using methods available on an an instantiated process. - -#### `source_iter` -To create a stream from a Rust iterator, use `source_iter`. This is useful for loading static data into the graph. Each element of the iterator will be emitted _exactly once_ in the _first tick_ of execution (see [Aggregations and Ticks](./aggregations.mdx)). - -```rust -let stream = process.source_iter(q!(vec![1, 2, 3])); -``` - -#### `source_stream` -To create a stream from an asynchronous source, use `source_stream`. This takes any type that implements `futures::Stream` and emits each element as it is received. This is useful for loading data from external sources such as Kafka or a database. Typically, you will want to take the stream as a `RuntimeData` parameter to your function, and pass the stream in your runtime binary. - -```rust -pub fn my_flow<'a, D: Deploy<'a>>( - ..., - my_stream: RuntimeData> -) { - let stream = process.source_stream(my_stream); - ... -} -``` - -### Sending Streams between Processes -To send a stream from one process to another, use the `send_*` methods on the source stream. This takes a parameter of the process to send the data to. - -If sending a type that supports serialization using `serde`, use `send_bincode`, which uses the `bincode` crate to serialize the data. - -```rust -let process0 = flow.process(process_spec); -let process1 = flow.process(process_spec); - -let stream0 = process0.source_iter(...); -let stream1 = stream0.send_bincode(&process1); -``` - -To use custom serializers, you can use the `send_bytes` method to send a stream of `Bytes` values. - -```rust -let stream0 = process0.source_iter(...); -let stream1 = stream0.send_bytes(&process1); -``` diff --git a/docs/docs/hydroflow_plus/quickstart/_category_.json b/docs/docs/hydroflow_plus/quickstart/_category_.json index db70db116681..20b00e5064e9 100644 --- a/docs/docs/hydroflow_plus/quickstart/_category_.json +++ b/docs/docs/hydroflow_plus/quickstart/_category_.json @@ -1,6 +1,6 @@ { "label": "Quickstart", - "position": 1, + "position": 2, "link": { "type": "doc", "id": "hydroflow_plus/quickstart/index" diff --git a/docs/docs/hydroflow_plus/quickstart/clusters.mdx b/docs/docs/hydroflow_plus/quickstart/clusters.mdx index 1d234d5b43cb..85ccee8c57df 100644 --- a/docs/docs/hydroflow_plus/quickstart/clusters.mdx +++ b/docs/docs/hydroflow_plus/quickstart/clusters.mdx @@ -4,56 +4,59 @@ sidebar_position: 3 import CodeBlock from '@theme/CodeBlock'; import firstTenClusterSrc from '!!raw-loader!../../../../template/hydroflow_plus/src/first_ten_cluster.rs'; import firstTenClusterExample from '!!raw-loader!../../../../template/hydroflow_plus/examples/first_ten_cluster.rs'; -import { getLines, extractOutput } from '../../../src/util'; +import { getLines, highlightLines, extractOutput } from '../../../src/util'; # Scaling with Clusters -So far, we have looked at distributed systems where there is a single process running each piece of the compute graph -- **compute parallelism** (like pipelining). However, we can also use Hydroflow+ to run the same computation on multiple processes -- achieving **data parallelism** (like replication and partitioning). This is done by creating a **cluster** of processes that all run the same subgraph. +So far, we have looked at distributed systems where each process is running a different piece of the compute graph -- **compute parallelism**. However, we can also use Hydroflow+ to run the same computation on multiple processes -- achieving **data parallelism** (e.g. partitioning). This is done by creating a **cluster** of processes that all run the same subgraph of code. ## Dataflow with Clusters -Just like we use the `Process` type to represent a virtual handle to a single node, we can use the `Cluster` type to represent a handle to a **set of nodes** (with size unknown at compile-time). +Just like we use the `Process` type to represent a virtual handle to a single node, we can use the **`Cluster`** type to represent a handle to a **set of nodes** (with size unknown at compile-time). -A `Stream` materialized on a `Cluster` can be thought of as SIMD-style programming, where the stream represents many independent streams on each member of the cluster, and each transformation of the stream performs the transformation on each cluster member. +A `Stream` located on a `Cluster` can be thought of as SIMD-style programming, where each cluster member executes the same operators but on different pieces of data. -To start, we set up a new module in `first_ten_cluster.rs` with a dataflow program that takes in a `Process` for a leader and `Cluster` for a set of workers. +To start, we set up a new module in `src/first_ten_cluster.rs` with a dataflow program that takes in a `Process` for a leader and `Cluster` for a set of workers. {getLines(firstTenClusterSrc, 1, 6)} -We start by materializing a stream of numbers on the `leader`, as before. But rather than sending the stream to a single process, we will instead _distribute_ the data to each member of the cluster using `round_robin_bincode`. This API sends data to a `cluster` in a round-robin fashion by using the order of elements to determine which cluster member the element is sent to. +We start by materializing a stream of numbers on the `leader`, as before. But rather than sending the stream to a single process, we will instead _distribute_ the data to each member of the cluster using `round_robin_bincode`. This API places data on a `cluster` in a round-robin fashion by using the order of elements to determine which cluster member each element is sent to. :::info -There are a variety of APIs for sending data to and reciving data from clusters. For example, we can `broadcast_bincode` to send copies to all members, or use the existing `send_bincode` if we have a custom algorithm to determine which cluster member should receive a piece of data. +There are a [variety of APIs](pathname:///rustdoc/hydroflow_plus/stream/struct.Stream.html#impl-Stream%3CT,+L,+B%3E-2) for sending data to and receiving data from clusters. For example, we `broadcast_bincode` to send copies to all members (e.g. for replication), or use `send_bincode` if we have a custom partitioning algorithm. ::: {getLines(firstTenClusterSrc, 7, 9)} -On each cluster member, we will then do some work to transform the data (using `map`) and log out the transformed values locally (using `inspect`, which is useful for debugging logic). +On each cluster member, we will then do some work to transform the data (using `map`) and log the transformed values locally (using `inspect`, which is useful for debugging). {getLines(firstTenClusterSrc, 10, 11)} -Finally, we will send the data back to the leader. We achieve this using a variant of the APIs from before: `send_bincode_interleaved`. This is similar to `send_bincode` in that the elements are sent to the leader process, but the elements from different cluster members are mixed together into a single stream with the same element type as the sender side (regular `send_bincode` would result in a stream of (cluster ID, data) tuples). +Finally, we will send the data back to the leader. We achieve this using a variant of the APIs from before: `send_bincode_interleaved`. If we used `send_bincode`, we would get a stream of (cluster ID, data) tuples. Since it is a common pattern to ignore the IDs, `send_bincode_interleaved` is available as a helper. {getLines(firstTenClusterSrc, 12, 14)} ## Deploying Clusters -Deployment scripts are similar to before, except that when provisioning a cluster we provide a list of deployment hosts rather than a single one. In our example, we'll launch 4 nodes for the cluster. +Deployment scripts are similar to before, except that when provisioning a cluster we provide a list of deployment hosts rather than a single one. In our example, we'll launch 4 nodes for the cluster by creating a `Vec` of 4 localhost instances. -{firstTenClusterExample} +{highlightLines(firstTenClusterExample, [14])} We can then launch the program: ```bash #shell-command-next-line cargo run --example first_ten_cluster +#highlight-next-line [hydroflow_plus_template::first_ten_cluster::Worker (cluster 1) / 0] 0 [hydroflow_plus_template::first_ten_cluster::Worker (cluster 1) / 2] 4 [hydroflow_plus_template::first_ten_cluster::Worker (cluster 1) / 2] 12 +#highlight-next-line [hydroflow_plus_template::first_ten_cluster::Worker (cluster 1) / 0] 8 [hydroflow_plus_template::first_ten_cluster::Worker (cluster 1) / 3] 6 [hydroflow_plus_template::first_ten_cluster::Worker (cluster 1) / 1] 2 [hydroflow_plus_template::first_ten_cluster::Worker (cluster 1) / 1] 10 [hydroflow_plus_template::first_ten_cluster::Worker (cluster 1) / 1] 18 [hydroflow_plus_template::first_ten_cluster::Leader (process 0)] 0 +#highlight-next-line [hydroflow_plus_template::first_ten_cluster::Worker (cluster 1) / 0] 16 [hydroflow_plus_template::first_ten_cluster::Worker (cluster 1) / 3] 14 [hydroflow_plus_template::first_ten_cluster::Leader (process 0)] 8 @@ -67,4 +70,4 @@ cargo run --example first_ten_cluster [hydroflow_plus_template::first_ten_cluster::Leader (process 0)] 14 ``` -You'll notice the round-robin distribution in action here, as each cluster log is tagged with the ID of the member (e.g. `/ 0`). In our deployment, we are sending data round-robin across 4 members of the cluster, numbered `0` through `3`. Hence cluster member `0` receives values `0`, `4`, `8`, member `1` receives values `1`, `5`, `9`, and so on. +You'll notice the round-robin distribution in action here, as each cluster log is tagged with the ID of the member (e.g. `/ 0`). In our deployment, we are sending data round-robin across 4 members of the cluster, numbered `0` through `3`. Hence cluster member `0` receives values `0`, `4`, `8` (corresponding to the highlighted lines), member `1` receives values `1`, `5`, `9`, and so on. diff --git a/docs/docs/hydroflow_plus/quickstart/distributed.mdx b/docs/docs/hydroflow_plus/quickstart/distributed.mdx index d6c5dd27c7f7..4b958cd52c86 100644 --- a/docs/docs/hydroflow_plus/quickstart/distributed.mdx +++ b/docs/docs/hydroflow_plus/quickstart/distributed.mdx @@ -7,9 +7,10 @@ import firstTenDistExample from '!!raw-loader!../../../../template/hydroflow_plu import { getLines, extractOutput } from '../../../src/util'; # Adding Distribution -Continuing from our previous example, we will now look at how to deploy our program to run on multiple processes. First, we need to extend our dataflow program to use multiple processes with a network between them. +Continuing from our previous example, we will now look at how to deploy our program to run on two processes. + +We'll start by updating our dataflow function signature to take two processes (in a new file, `src/first_ten_distributed.rs`). At this point, we'll need to add a lifetime parameter `'a` which represents the lifetime of data referenced by our dataflow logic. This lifetime needs to be the same across all the processes, so it can't be elided. -We'll start by updating our function signature to take two processes. At this point, we'll need to add a lifetime parameter `'a` which represents the lifetime of data referenced by our dataflow logic. This lifetime needs to be the same across all the processes, so it can't be elided. ```rust title="src/first_ten_distributed.rs" use hydroflow_plus::*; @@ -17,6 +18,12 @@ use hydroflow_plus::*; pub fn first_ten_distributed<'a>(p1: &Process<'a>, p2: &Process<'a>) ``` +:::info + +The Hydroflow+ template only contains the final version of this program. In order to follow along with the tutorial, we recommend overwriting `src/first_ten_distributed.rs` according to the following snippets. + +::: + Now, we'll use a new API, `send_bincode` to establish a network between our processes (`bincode` is the serialization format we are using). Given a stream on process `p1`, we can send the data to `p2` by calling `.send_bincode(p2)`, which returns a stream on `p2`. So to make our program distributed, it only takes a single line change. ```rust title="src/first_ten_distributed.rs" @@ -28,7 +35,7 @@ pub fn first_ten_distributed<'a>(p1: &Process<'a>, p2: &Process<'a>) { } ``` -Then, we can update our deployment script to launch both processes on localhost. Hydro Deploy will automatically handle service discovery and networking, since it knows the full network topology (on UNIX systems, this will use a UNIX socket for networking). +Then, we can update our deployment script to launch both processes on localhost. Hydro Deploy will automatically handle service discovery and networking, since it knows the full network topology. {firstTenDistExample} @@ -54,6 +61,12 @@ To fix this, we can use the optional type parameter on `Process`, which lets us {getLines(firstTenDistSrc, 3, 10)} +:::info + +This is the final version of our dataflow which you will find in the Hydroflow+ template. + +::: + If you are using an IDE extension like [Rust Analyzer](https://rust-analyzer.github.io/), you'll see these types attached to each stream. And if we launch the program again, we'll see much better logs: ```bash diff --git a/docs/docs/hydroflow_plus/quickstart/first-dataflow.mdx b/docs/docs/hydroflow_plus/quickstart/first-dataflow.mdx index f9c54d8985c5..75ca4a9f5552 100644 --- a/docs/docs/hydroflow_plus/quickstart/first-dataflow.mdx +++ b/docs/docs/hydroflow_plus/quickstart/first-dataflow.mdx @@ -24,23 +24,37 @@ cargo generate gh:hydro-project/hydroflow template/hydroflow_plus ::: ## Writing a Dataflow +In Hydroflow+, streams are attached to a **location**, which is either a virtual handle to a **single machine** (the **`Process`** type) or **set of machines** (the **`Cluster`** type). A single piece of Hydroflow+ code can describe a distributed program that runs across multiple processes and clusters, each with their own local state and data. -In Hydroflow+, streams are attached to a **`Location`**, which is a virtual handle to a **single machine** (the `Process` type) or **set of machines** (the `Cluster` type). To write distributed programs, a single piece of code can use multiple locations. - -Our first dataflow will run on a single machine, so we take a `&Process` parameter. We can materialize a stream on this machine using `process.source_iter` (which emits values from a provided collection), and then print out the values using `for_each`. +We'll write our first dataflow in `src/first_ten.rs`. This program will run on a single machine, so we take a single `&Process` parameter. We can materialize a stream on this machine using `process.source_iter` (which emits values from a static in-memory collection), and then print out the values using `for_each`. {firstTenSrc} -You'll notice that the arguments to `source_iter` and `for_each` are wrapped in `q!` macros. This is because Hydroflow+ uses a two-stage compilation process, where the first stage generates a deployment plan that is then compiled to individual binaries for each machine in the distributed system. The `q!` macro is used to mark Rust code that will be executed in the second stage ("runtime" code). This generally includes snippets of Rust code that are used to define static sources of data or closures that transform them. +:::caution + +You'll notice that the arguments to `source_iter` and `for_each` are wrapped in `q!` macros. The top-level Hydroflow+ program (`first_ten`) is responsible for setting up the dataflow structure, whereas the `q!` macro is used to mark the Rust code that will be executed at **runtime**. Generally, runtime code in a `q!` macro is a snippet of Rust code that defines a static source of data or a closure. + +If you forget to wrap a block in `q!` when that is required, you'll see an error like: +``` +closure is expected to take 5 arguments, but it takes X arguments +``` + +::: ## Running the Dataflow -Next, let's launch the dataflow program we just wrote. To do this, we'll need to write a bit more code in `examples/first_ten.rs` to configure our deployment (generally, we will place deployment scripts in `examples` because Hydro Deploy is a dev dependency). +To run a Hydroflow+ program, we need to write some deployment configuration in `examples/first_ten.rs`. + +:::tip + +When using Hydroflow+, we will *always* place our deployment scripts in the `examples` directory. This is required because deployment is done via [Hydro Deploy](../../deploy/index.md) which is a _dev dependency_---i.e. not part of the dependencies used for generating binaries (but available to programs in the `examples` directory). + +::: {firstTenExampleSrc} First, we initialize a new [Hydro Deploy](../../deploy/index.md) deployment with `Deployment::new()`. Then, we create a `FlowBuilder` which will store the entire dataflow program and manage its compilation. -To create a `Process`, we call `flow.process()`. After the dataflow has been created, we must map each instantiated `Process` to a deployment target using `flow.with_process` (in this case we deploy to localhost). +To create a `Process`, we call `flow.process()`. After the dataflow has been created (by invoking the `hydroflow_plus_template::first_ten::first_ten` function we created earlier), we must map each instantiated `Process` to a deployment target using `flow.with_process` (in this case we deploy to localhost). Finally, we call `flow.deploy(&mut deployment)` to provision the dataflow program on the target machine. This returns a struct with handles to the instantiated machines, which we must store in the `_nodes` variable to prevent them from being dropped. Then, we can start the dataflow program and block until `Ctrl-C` using `deployment.run_ctrl_c()`. diff --git a/docs/docs/hydroflow_plus/quickstart/index.mdx b/docs/docs/hydroflow_plus/quickstart/index.mdx index d5e8a071ffaa..c53a66903f61 100644 --- a/docs/docs/hydroflow_plus/quickstart/index.mdx +++ b/docs/docs/hydroflow_plus/quickstart/index.mdx @@ -1,34 +1,21 @@ # Quickstart In this tutorial, we'll walk through the basics of Hydroflow+ by building a simple dataflow that prints out the first 10 natural numbers. We'll start with a single process, then pipeline the computation, and finally distribute it across a cluster. -## Installing Rust +:::tip First you will need to install Rust. We recommend the conventional installation -method, `rustup`, which allows you to easily manage and update Rust versions. - -[**Install Rust**](https://www.rust-lang.org/tools/install) +method, `rustup`, which allows you to easily manage and update Rust versions: [**Install Rust**](https://www.rust-lang.org/tools/install) The link in the previous line will take you to the Rust website that shows you how to install `rustup` and the Rust package manager `cargo` (and the internally-used `rustc` compiler). `cargo` is Rust's main development tool, used for building, running, and testing Rust code. -The following `cargo` commands will come in handy: -* `cargo check --all-targets` - Checks the workspace for any compile-time - errors. -* `cargo build --all-targets` - Builds all projects/tests/benchmarks/examples - in the workspace. -* `cargo clean` - Cleans the build cache, sometimes needed if the build is - acting up. -* `cargo test` - Runs tests in the workspace. -* `cargo run -p hydroflow --example ` - Run an example program in - `hydroflow/examples`. - -## VS Code Setup - We recommend using VS Code with the `rust-analyzer` extension (and NOT the `Rust` extension). +::: + ## Getting Started with Hydroflow+ To get started with a new project, we'll use the Hydroflow+ template. The template comes with a simple distributed program. @@ -46,4 +33,4 @@ After `cd`ing into the generated folder, we can run tests for the included sampl ```bash #shell-command-next-line cargo test -``` \ No newline at end of file +``` diff --git a/docs/docs/hydroflow_plus/stageleft.mdx b/docs/docs/hydroflow_plus/stageleft.mdx index f2a4ef062ce2..e4ac5014c9e8 100644 --- a/docs/docs/hydroflow_plus/stageleft.mdx +++ b/docs/docs/hydroflow_plus/stageleft.mdx @@ -1,8 +1,10 @@ --- title: Stageleft -sidebar_position: 6 +sidebar_position: 3 --- import StageleftDocs from '../../../stageleft/README.md' +Under the hood, Hydroflow+ uses a library called Stageleft to power the `q!` macro and code generation logic. The following docs, from the Stageleft README, outline the core architecture of Stageleft. + diff --git a/docs/src/pages/index.js b/docs/src/pages/index.js index c55173a9b8f3..7b89d377e3bb 100644 --- a/docs/src/pages/index.js +++ b/docs/src/pages/index.js @@ -26,7 +26,7 @@ export default function Home() { justifyContent: "center", flexWrap: "wrap" }}> - { + if (lines.includes(i + 1)) { + return `// highlight-next-line\n${line}`; + } + return line; + }).join('\n'); +} + /// Extract the output from the stdout snapshots created by `surface_examples.rs`. /// /// This hides the graph output. Use `extractMermaid` to extract the graph output.