Merge branch 'main' into zp/pg_is_in_recovery

yezizp2012 committed Jul 10, 2024
2 parents db60f7b + 0987e27 commit e961c29
Showing 94 changed files with 897 additions and 717 deletions.
8 changes: 4 additions & 4 deletions .pre-commit-config.yaml
@@ -7,17 +7,17 @@ repos:
hooks:
- id: end-of-file-fixer
- id: trailing-whitespace
- repo: https://github.com/crate-ci/typos
rev: v1.23.1
hooks:
- id: typos
- repo: local
hooks:
- id: rustfmt
name: rustfmt
entry: rustfmt --edition 2021
language: system
types: [rust]
- id: typos
name: typos
entry: typos -w
language: system
- id: cargo sort
name: cargo sort
entry: cargo sort -g -w
24 changes: 2 additions & 22 deletions docs/README.md
@@ -2,25 +2,5 @@

This directory contains RisingWave design documents that are intended to be used by contributors to understand our development process, and how we design and implement RisingWave. To learn about how to use RisingWave, check out the [RisingWave user documentation](https://www.risingwave.dev).

## Developer guide

After you learn about the basics of RisingWave, take a look at our [developer guide](https://risingwavelabs.github.io/risingwave/) to get up to speed with the development process.

## Table of Contents

* [Architecture Design](./architecture-design.md)
* [An Overview of RisingWave Streaming Engine](./streaming-overview.md)
* [An Overview of RisingWave State Store](./state-store-overview.md)
* [Meta Service](./meta-service.md)
* [Create MView on Top of MView](./mv-on-mv.md)
* [Checkpoint](./checkpoint.md)
* [Design of Data Source](./data-source.md)
* [Data Model and Encoding](./data-model-and-encoding.md)
* [Design of Batch Local Execution Mode](./batch-local-execution-mode.md)
* [Consistent Hash](./consistent-hash.md)
* [Build RisingWave with Multiple Object Storage Backends](./multi-object-store.md)
* [Backfill](./backfill.md)

## Images

We recommend that you use [draw.io](https://app.diagrams.net/) to draw illustrations and export as SVG images, with "include a copy of my diagram" selected for further editing.
- `/dev` contains the source code for the [RisingWave Developer Guide](https://risingwavelabs.github.io/risingwave/)
- `/rustdoc` contains source code for the [crate level documentation](https://risingwavelabs.github.io/risingwave/rustdoc)
4 changes: 4 additions & 0 deletions docs/dev/README.md
@@ -28,3 +28,7 @@ including the `<!-- toc -->` marker at the place where you want the TOC.

We use `mdbook-linkcheck` to validate URLs included in our documentation.
`linkcheck` will be run automatically when you build with the instructions in the section above.

## Images

We recommend that you use [draw.io](https://app.diagrams.net/) to draw illustrations and export as SVG images, with "include a copy of my diagram" selected for further editing.
25 changes: 24 additions & 1 deletion docs/dev/src/SUMMARY.md
@@ -11,10 +11,11 @@
- [Testing](./tests/intro.md)
- [Debugging](./debugging.md)
- [Observability](./observability.md)
- [Metrics](./metrics.md)

---

# Benchmarking and Profiling
# Benchmarking and profiling

- [CPU Profiling](./benchmark-and-profile/cpu-profiling.md)
- [Memory (Heap) Profiling](./benchmark-and-profile/memory-profiling.md)
@@ -27,6 +28,28 @@
- [Develop Connectors](./connector/intro.md)
- [Continuous Integration](./ci.md)

---

# Design docs

<!-- TODO: perhaps we need to reorder/group these docs? -->

- [Architecture Design](./design/architecture-design.md)
- [An Overview of RisingWave Streaming Engine](./design/streaming-overview.md)
- [An Overview of RisingWave State Store](./design/state-store-overview.md)
- [Meta Service](./design/meta-service.md)
- [Create MView on Top of MView](./design/mv-on-mv.md)
- [Checkpoint](./design/checkpoint.md)
- [Design of Data Source](./design/data-source.md)
- [Data Model and Encoding](./design/data-model-and-encoding.md)
- [Design of Batch Local Execution Mode](./design/batch-local-execution-mode.md)
- [Consistent Hash](./design/consistent-hash.md)
- [Build RisingWave with Multiple Object Storage Backends](./design/multi-object-store.md)
- [Backfill](./design/backfill.md)
- [Aggregation](./design/aggregation.md)
- [Shared Buffer](./design/shared-buffer.md)
- [Relational Table](./design/relational-table.md)
- [Keys](./design/keys.md)
<!--
TODO:
6 changes: 3 additions & 3 deletions docs/aggregation.md → docs/dev/src/design/aggregation.md
@@ -13,7 +13,7 @@ TODO

## HashAggExecutor

![aggregation components](./images/aggregation/agg-components.png)
![aggregation components](../images/aggregation/agg-components.png)

Within the `HashAggExecutor`, there are 4 main components:
1. AggCalls.
@@ -40,10 +40,10 @@ For each of these aggregations, they have 1 state table (`AggStateStorage::Mater

### Initialization of `AggGroups`

![init-agg-group](./images/aggregation/init-agg-group.png)
![init-agg-group](../images/aggregation/init-agg-group.png)

AggGroups are initialized when the corresponding aggregation groups are not found in `AggGroupCache`.
This could be either because the `AggGroupCache` got evicted,
or it's a new group key.

It could take a while to initialize agg groups, hence we cache them in `AggGroupCache`.
It could take a while to initialize agg groups, hence we cache them in `AggGroupCache`.
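
As a small illustration of this initialize-on-miss behavior (made-up types, not the actual `HashAggExecutor` code):

```rust
use std::collections::HashMap;

type GroupKey = Vec<u8>;

struct AggGroup {
    states: Vec<i64>, // running state of each agg call, e.g. partial sums/counts
}

fn get_or_init_group<'a>(
    cache: &'a mut HashMap<GroupKey, AggGroup>,
    key: GroupKey,
    num_agg_calls: usize,
) -> &'a mut AggGroup {
    // Initializing a group may require reading its agg state tables, which is
    // slow, so once initialized the group is kept in the cache for later chunks.
    cache
        .entry(key)
        .or_insert_with(|| AggGroup { states: vec![0; num_agg_calls] })
}

fn main() {
    let mut cache = HashMap::new();
    let group = get_or_init_group(&mut cache, b"company_a".to_vec(), 1);
    group.states[0] += 10; // e.g. SUM(quantity) for this group key
    println!("cached groups: {}", cache.len());
}
```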
docs/architecture-design.md → docs/dev/src/design/architecture-design.md
@@ -19,7 +19,7 @@ There are currently 4 types of nodes in the cluster:
* **HummockManager**: Manages the SST file manifest and meta-info of Hummock storage.
* **CompactionManager**: Manages the compaction status and task assignment of Hummock storage.

![Architecture](./images/architecture-design/architecture.svg)
![Architecture](../images/architecture-design/architecture.svg)

The topmost component is the Postgres client. It issues queries through [TCP-based Postgres wire protocol](https://www.postgresql.org/docs/current/protocol.html).

@@ -41,13 +41,13 @@ Let's begin with a simple SELECT and see how it is executed.
SELECT SUM(t.quantity) FROM t group by t.company;
```

![Batch-Query](./images/architecture-design/batch-query.svg)
![Batch-Query](../images/architecture-design/batch-query.svg)

The query will be sliced into multiple *plan fragments*, each being an independent scheduling unit and probably with different parallelism. For simplicity, parallelism is usually set to the number of CPU cores in the cluster. For example, if there are 3 compute-nodes in the cluster, each with 4 CPU cores, then the parallelism will be set to 12 by default.

Each parallel unit is called a *task*. Specifically, PlanFragment 2 will be distributed as 4 tasks to 4 CPU cores.

![Plan-Fragments](./images/architecture-design/plan-fragments.svg)
![Plan-Fragments](../images/architecture-design/plan-fragments.svg)

Behind the TableScan operator, there's a storage engine called Hummock that stores the internal states, materialized views, and the tables. Note that only the materialized views and tables are queryable. The internal states are invisible to users.

@@ -62,7 +62,7 @@ For example:
CREATE MATERIALIZED VIEW mv1 AS SELECT SUM(t.quantity) as q FROM t group by t.company;
```

![Stream-Pipeline](./images/architecture-design/stream-pipeline.png)
![Stream-Pipeline](../images/architecture-design/stream-pipeline.png)

When the data source (e.g., Kafka) propagates a bunch of records into the system, the materialized view will refresh automatically.

16 changes: 8 additions & 8 deletions docs/backfill.md → docs/dev/src/design/backfill.md
@@ -221,7 +221,7 @@ Notice how `mv2` only needs the `id` column from `mv1`, and not the full `pk` wi

#### Overview

![backfill sides](./images/backfill/backfill-sides.png)
![backfill sides](../images/backfill/backfill-sides.png)

For `ArrangementBackfill`, we have 2 streams which we merge:
upstream and historical streams.
@@ -230,7 +230,7 @@ to make sure `Barriers` can flow through the stream graph.
Additionally, every epoch, we will refresh the historical stream, as upstream data gets checkpointed
and our snapshot becomes stale.

![polling](./images/backfill/polling.png)
![polling](../images/backfill/polling.png)

We will poll from this stream in backfill to get upstream and historical data chunks for processing,
as well as barriers to checkpoint to backfill state.
@@ -239,7 +239,7 @@ For each chunk (DataChunk / StreamChunk), we may also need to do some further pr

#### Schemas

![schema](./images/backfill/schema.png)
![schema](../images/backfill/schema.png)

There are 3 schemas to consider when processing the backfill data:
1. The state table schema of upstream.
@@ -258,7 +258,7 @@ to ensure the historical side and the upstream side have a consistent schema.

#### Polling loop

![handle_poll](./images/backfill/handle-poll.png)
![handle_poll](../images/backfill/handle-poll.png)

If we poll a chunk from the historical side, we will yield it to the downstream,
and update the primary key (pk) we have backfilled to in the backfill state.
@@ -278,7 +278,7 @@ Then the polling loop will continue.
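
For illustration, a minimal sketch of a polling loop of this shape (the types, fields, and pk-based filtering rule here are assumptions for the sketch, not the actual executor code):

```rust
enum Message {
    HistoricalChunk(Vec<(i64, String)>), // snapshot-side rows, keyed by pk
    UpstreamChunk(Vec<(i64, String)>),   // upstream-side rows
    Barrier { epoch: u64 },
}

struct BackfillState {
    current_pos: Option<i64>, // pk we have backfilled up to
}

fn handle_poll(msg: Message, state: &mut BackfillState, downstream: &mut Vec<(i64, String)>) {
    match msg {
        // Historical side: forward rows downstream and advance the backfill position.
        Message::HistoricalChunk(rows) => {
            if let Some(max_pk) = rows.iter().map(|(pk, _)| *pk).max() {
                state.current_pos = Some(max_pk);
            }
            downstream.extend(rows);
        }
        // Upstream side: only forward rows we have already backfilled past;
        // the rest will be seen later via the refreshed snapshot.
        Message::UpstreamChunk(rows) => {
            let pos = state.current_pos.unwrap_or(i64::MIN);
            downstream.extend(rows.into_iter().filter(|(pk, _)| *pk <= pos));
        }
        // Barrier: persist progress to the backfill state and (in the real
        // executor) refresh the historical snapshot for the new epoch.
        Message::Barrier { epoch } => {
            println!("commit backfill progress {:?} at epoch {epoch}", state.current_pos);
        }
    }
}

fn main() {
    let mut state = BackfillState { current_pos: None };
    let mut downstream = Vec::new();
    handle_poll(Message::HistoricalChunk(vec![(1, "a".into()), (2, "b".into())]), &mut state, &mut downstream);
    handle_poll(Message::UpstreamChunk(vec![(2, "b2".into()), (9, "z".into())]), &mut state, &mut downstream);
    handle_poll(Message::Barrier { epoch: 1 }, &mut state, &mut downstream);
    println!("rows sent downstream: {downstream:?}");
}
```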

### Replication

![replication_simple](./images/backfill/replication-simple.png)
![replication_simple](../images/backfill/replication-simple.png)

Previously, when doing snapshot reads to read **Historical Data**, the backfill executor is able to read
from the shared buffer for the previous epoch.
@@ -288,7 +288,7 @@ However, with `ArrangementBackfill`,
we can't rely on the shared buffer of upstream,
since it can be on a different parallel unit.

![replication_replicated](./images/backfill/replication-replicated.png)
![replication_replicated](../images/backfill/replication-replicated.png)

So we need to make sure for the previous epoch, we buffer
its updates somewhere to replicate the shared buffer.
@@ -314,7 +314,7 @@ to ensure the historical side and the upstream side have a consistent schema.
Where (1) refers to the state table schema of upstream,
and (2) refers to the output schema from upstream to arrangement backfill.

![replication_example](./images/backfill/replication-example.png)
![replication_example](../images/backfill/replication-example.png)

Now let's consider an instance where (1) has the schema:

@@ -393,4 +393,4 @@ TODO

## Source Backfill

TODO
TODO
docs/batch-local-execution-mode.md → docs/dev/src/design/batch-local-execution-mode.md
@@ -17,14 +17,14 @@ requirement of point queries, and in this article we introduce local execution m

## Design

![Frontend Flow](./images/batch-local-execution-mode/frontend-flow.svg)
![Frontend Flow](../images/batch-local-execution-mode/frontend-flow.svg)

## Example 1: select a from t where b in (1, 2, 3, 4)

Let's use the above SQL as an example:


![Example 1](./images/batch-local-execution-mode/example1.svg)
![Example 1](../images/batch-local-execution-mode/example1.svg)

The key changes from the distributed mode:

@@ -36,7 +36,7 @@ The key changes from the distributed mode:
The following is the plan and execution of the above SQL in local mode:


![Example 2](./images/batch-local-execution-mode/example2.svg)
![Example 2](../images/batch-local-execution-mode/example2.svg)

As explained above, the lookup join/exchange phase will be executed directly on the frontend. The pushdown (filter/table, on both the build and probe side) will be issued by the executors rather than the scheduler.

4 changes: 2 additions & 2 deletions docs/checkpoint.md → docs/dev/src/design/checkpoint.md
@@ -20,7 +20,7 @@ The consistent checkpoints play 2 roles in our system.

RisingWave performs checkpointing via the [Chandy–Lamport algorithm](https://en.wikipedia.org/wiki/Chandy%E2%80%93Lamport_algorithm). A special kind of message, the checkpoint barrier, is generated on the streaming sources and propagates across the streaming graph to the materialized views (or sinks).

![](./images/checkpoint/checkpoint.svg)
![](../images/checkpoint/checkpoint.svg)

To guarantee consistency, RisingWave adopts the Chandy–Lamport algorithm as its checkpoint scheme.
In particular, RisingWave periodically (every `barrier_interval_ms`) repeats the following procedure:
Expand All @@ -39,6 +39,6 @@ As is mentioned before, during checkpointing, every operator writes their change

A local shared buffer is introduced to stage these uncommitted write batches. Once the checkpoint barriers have passed through all actors, the storage manager can notify all compute nodes to 'commit' their buffered write batches into the shared storage.

![shared buffer](./images/checkpoint/shared-buffer.svg)
![shared buffer](../images/checkpoint/shared-buffer.svg)

Another benefit of shared buffer is that the write batches in a compute node can be compacted into a single SSTable file before uploading, which significantly reduces the number of SSTable files in Layer 0.
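
As a rough illustration of this stage-then-commit flow (names and structure are assumptions for the sketch, not Hummock's actual shared buffer):

```rust
use std::collections::{BTreeMap, HashMap};

type Key = Vec<u8>;
type Value = Vec<u8>;

#[derive(Default)]
struct SharedBuffer {
    // epoch -> staged (uncommitted) key-value writes from local operators
    staged: BTreeMap<u64, HashMap<Key, Value>>,
}

impl SharedBuffer {
    fn write_batch(&mut self, epoch: u64, batch: Vec<(Key, Value)>) {
        self.staged.entry(epoch).or_default().extend(batch);
    }

    /// Called after the barrier of `epoch` has passed through all local actors:
    /// merge everything staged up to that epoch into one sorted run and upload
    /// it to shared storage as a single SSTable-like object.
    fn commit_epoch(&mut self, epoch: u64) -> Vec<(Key, Value)> {
        let mut sst: BTreeMap<Key, Value> = BTreeMap::new();
        let epochs: Vec<u64> = self.staged.range(..=epoch).map(|(e, _)| *e).collect();
        for e in epochs {
            if let Some(batch) = self.staged.remove(&e) {
                sst.extend(batch);
            }
        }
        sst.into_iter().collect() // sorted key-value pairs, ready to upload
    }
}

fn main() {
    let mut buffer = SharedBuffer::default();
    buffer.write_batch(1, vec![(b"k1".to_vec(), b"v1".to_vec())]);
    buffer.write_batch(1, vec![(b"k2".to_vec(), b"v2".to_vec())]);
    let sst = buffer.commit_epoch(1);
    println!("committing {} keys for epoch 1", sst.len());
}
```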
docs/consistent-hash.md → docs/dev/src/design/consistent-hash.md
@@ -22,7 +22,7 @@ Here comes the main part, where we will construct a mapping that determines data

For all data $k \in U_k$, where $U_k$ is an unbounded set, we apply a hash function $v = H(k)$, where $v$ falls into a limited range. The hash function $H$ ensures that all data are hashed **uniformly** to that range. We call $v$ a vnode, namely a virtual node, shown as the squares in the figure below.

![initial data distribution](./images/consistent-hash/data-distribution.svg)
![initial data distribution](../images/consistent-hash/data-distribution.svg)

Then we have vnode mapping, which ensures that vnodes are mapped evenly to parallel units in the cluster. In other words, the number of vnodes that are mapped to each parallel unit should be as close as possible. This is denoted by different colors in the figure above. As is depicted, we have 3 parallel units (shown as circles), each taking $\frac{1}{3}$ of total vnodes. Vnode mapping is [constructed and maintained by meta](https://github.com/risingwavelabs/risingwave/blob/main/src/meta/src/stream/scheduler.rs).
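
A minimal sketch of this hashing-plus-mapping scheme, assuming a `DefaultHasher` and the 12 vnodes / 3 parallel units of the example (not RisingWave's actual hash function or vnode count):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

const VNODE_COUNT: u64 = 12;

/// v = H(k): hash the key uniformly into a bounded range of vnodes.
fn vnode_of(key: &impl Hash) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish() % VNODE_COUNT
}

/// Vnode mapping: assign vnodes to parallel units as evenly as possible
/// (round-robin here for simplicity).
fn build_vnode_mapping(parallel_units: usize) -> Vec<usize> {
    (0..VNODE_COUNT as usize).map(|v| v % parallel_units).collect()
}

fn main() {
    let mapping = build_vnode_mapping(3); // 3 parallel units, 4 vnodes each
    let key = "some_row_key";
    let v = vnode_of(&key) as usize;
    println!("key {key:?} -> vnode {v} -> parallel unit {}", mapping[v]);
}
```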

@@ -34,17 +34,17 @@ Since $v = H(k)$, the way that data are mapped to vnodes will be invariant. Ther

Let's take scaling out for example. Assume that we have one more parallel unit after scaling out, as is depicted as the orange circle in the figure below. Using the optimal strategy, we modify the vnode mapping in such a way that only $\frac{1}{4}$ of the data have to be moved, as is shown in the figure below. The vnodes whose data are required to be moved are highlighted with bold border in the figure.

![optimal data redistribution](./images/consistent-hash/data-redistribution-1.svg)
![optimal data redistribution](../images/consistent-hash/data-redistribution-1.svg)

To minimize data movement when scaling occurs, we should be careful when we modify the vnode mapping. Below is an opposite example. Modifying vnode mapping like this will result in $\frac{1}{2}$ of the data being moved.

![worst data redistribution](./images/consistent-hash/data-redistribution-2.svg)
![worst data redistribution](../images/consistent-hash/data-redistribution-2.svg)

### Streaming

We know that a fragment has several actors as its parallel instances, and that upstream actors will send data to downstream actors via the [dispatcher](./streaming-overview.md#actors). The figure below illustrates how actors distribute data based on consistent hash by example.

![actor data distribution](./images/consistent-hash/actor-data.svg)
![actor data distribution](../images/consistent-hash/actor-data.svg)

In the figure, we can see that one upstream actor dispatches data to three downstream actors. The downstream actors are scheduled on the parallel units mentioned in previous example respectively.

Expand Down Expand Up @@ -78,15 +78,15 @@ We know that [Hummock](./state-store-overview.md#overview), our LSM-Tree-based s
```
table_id | vnode | ...
```
where `table_id` denotes the [state table](relational_table/storing-state-using-relational-table.md#relational-table-layer), and `vnode` is computed via $H$ on key of the data.
where `table_id` denotes the [state table](relational-table.md), and `vnode` is computed via $H$ on key of the data.

To illustrate this, let's revisit the [previous example](#streaming). Executors of an operator will share the same logical state table, just as is shown in the figure below:

![actor state table](./images/consistent-hash/actor-state-table.svg)
![actor state table](../images/consistent-hash/actor-state-table.svg)

Now that we have 12 vnodes in total in the example, the data layout in storage will accordingly look like this:

![storage data layout](./images/consistent-hash/storage-data-layout.svg)
![storage data layout](../images/consistent-hash/storage-data-layout.svg)

Note that we only show the logical sequence and aggregation of data in this illustration. The actual data may be separated into different SSTs in Hummock.
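
To make the `table_id | vnode | ...` prefix concrete, here is a hedged sketch of how such a key could be assembled (the field widths and byte order are assumptions for illustration, not Hummock's exact on-disk format):

```rust
/// Illustrative only: build a storage key prefixed by `table_id | vnode | ...`.
fn storage_key(table_id: u32, vnode: u16, memcomparable_pk: &[u8]) -> Vec<u8> {
    let mut key = Vec::with_capacity(6 + memcomparable_pk.len());
    key.extend_from_slice(&table_id.to_be_bytes()); // group keys of one state table together
    key.extend_from_slice(&vnode.to_be_bytes());    // then group by vnode within the table
    key.extend_from_slice(memcomparable_pk);        // finally order by the encoded primary key
    key
}

fn main() {
    // e.g. table 42, vnode 7, followed by a memcomparable-encoded primary key
    let key = storage_key(42, 7, b"\x80\x00\x00\x01");
    println!("{key:?}");
}
```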

docs/data-model-and-encoding.md → docs/dev/src/design/data-model-and-encoding.md
@@ -41,17 +41,14 @@ A Data Chunk consists of multiple columns and a visibility array, as is shown in

A Stream Chunk consists of columns, a visibility array and an additional `ops` column, as is shown in the right subgraph below. The `ops` column marks the operation of each row, which can be one of `Delete`, `Insert`, `UpdateDelete` and `UpdateInsert`.

![chunk](./images/data-model-and-encoding/chunk.svg)
![chunk](../images/data-model-and-encoding/chunk.svg)
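
A sketch of the chunk layout described above, with simplified field types (illustrative only; the real columns are columnar arrays, not `Vec<Option<i64>>`):

```rust
#[derive(Clone, Copy, Debug)]
enum Op {
    Insert,
    Delete,
    UpdateDelete,
    UpdateInsert,
}

struct DataChunk {
    columns: Vec<Vec<Option<i64>>>, // one vector per column; None models NULL
    visibility: Vec<bool>,          // marks which rows are visible
}

struct StreamChunk {
    ops: Vec<Op>, // one operation per row
    data: DataChunk,
}
```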

## On-Disk Encoding

> Source files: `utils/memcomparable`, `utils/value-encoding`
RisingWave stores user data in shared key-value storage called 'Hummock'. Tables, materialized views and checkpoints of internal streaming operators are encoded into key-value entries. Every field of a row, a.k.a. cell, is encoded as a key-value entry, except that `NULL` values are omitted.

![row-format](./images/data-model-and-encoding/row-format.svg)
![row-format](../images/data-model-and-encoding/row-format.svg)

Considering that ordering matters in some cases, e.g. result set of an order-by query, fields of keys must preserve the order of original values after being encoded into bytes. This is what `memcomparable` is used for. For example, integers must be encoded in big-endian and the sign bit must be flipped to preserve order. In contrast, the encoding of values does not need to preserve order.
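
As a concrete illustration of the order-preserving idea, a simplified sketch for 32-bit integers (not the actual `memcomparable` crate API): big-endian byte order plus a flipped sign bit makes byte-wise comparison agree with numeric comparison.

```rust
/// Order-preserving encoding of an i32: flip the sign bit and write big-endian
/// so that lexicographic comparison of the bytes matches numeric comparison.
fn encode_i32_memcomparable(v: i32) -> [u8; 4] {
    ((v as u32) ^ 0x8000_0000).to_be_bytes()
}

fn main() {
    let mut values = [3_i32, -7, 0, 42, -1];
    let mut encoded: Vec<[u8; 4]> = values.iter().map(|&v| encode_i32_memcomparable(v)).collect();
    values.sort();
    encoded.sort(); // byte-wise lexicographic order
    let decoded: Vec<i32> = encoded
        .iter()
        .map(|b| (u32::from_be_bytes(*b) ^ 0x8000_0000) as i32)
        .collect();
    assert_eq!(values.to_vec(), decoded); // same order either way
}
```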



2 changes: 1 addition & 1 deletion docs/data-source.md → docs/dev/src/design/data-source.md
@@ -14,7 +14,7 @@ This page describes RisingWave's Data Source API and the architecture behind it.

RisingWave's data source covers four parts: connectors, enumerators, ConnectorSource and SourceExecutor.

![data source arch](../docs/images/data-source/data-source-arch.svg)
![data source arch](../images/data-source/data-source-arch.svg)

### Connectors

2 changes: 1 addition & 1 deletion docs/keys.md → docs/dev/src/design/keys.md
@@ -62,4 +62,4 @@ Then when iterating over the keys from storage, the records are returned in the

When the update stream comes, we can just use `id` to identify the records that need to be updated.
We can get the whole record corresponding to the `id` and get the `i` from there.
Then we can use that to update the materialized state accordingly.
Then we can use that to update the materialized state accordingly.
5 changes: 2 additions & 3 deletions docs/meta-service.md → docs/dev/src/design/meta-service.md
@@ -16,7 +16,7 @@ RisingWave provides both real-time analytical query as well as high-concurrent a

Meanwhile, components such as the metadata provider, scheduler, and monitoring are more suitable for a centralized design. For example, a typical on-premise deployment may look like below, where the dotted boxes represent the minimal unit of deployment (VM or container).

![Cluster Deployment](./images/meta-service/cluster-deployment.svg)
![Cluster Deployment](../images/meta-service/cluster-deployment.svg)

## Meta Store

@@ -50,7 +50,6 @@ There are 2 choices on how to distribute information across multiple nodes.

Currently, for simplicity, we choose the push-style approach for all kinds of metadata. This is implemented as `NotificationService` on meta service and `ObserverManager` on frontend and compute nodes.

![Notification](./images/meta-service/notification.svg)
![Notification](../images/meta-service/notification.svg)

`ObserverManager` will register itself to the meta service on bootstrap and subscribe to the metadata it needs. Afterwards, once metadata changes, the meta node streams the changes to it, expecting all subscribers to acknowledge.
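
A rough sketch of this subscribe-and-push flow, using in-process channels as a stand-in for the actual RPC stream (all names here are illustrative):

```rust
use std::sync::mpsc;

#[derive(Clone, Debug)]
enum MetaChange {
    CatalogUpdated(String),
    WorkerAdded(u32),
}

struct NotificationService {
    subscribers: Vec<mpsc::Sender<MetaChange>>,
}

impl NotificationService {
    // Called when an observer registers itself on bootstrap.
    fn subscribe(&mut self) -> mpsc::Receiver<MetaChange> {
        let (tx, rx) = mpsc::channel();
        self.subscribers.push(tx);
        rx
    }

    // Once metadata changes, push the change to every subscriber.
    fn notify(&self, change: MetaChange) {
        for sub in &self.subscribers {
            let _ = sub.send(change.clone());
        }
    }
}

fn main() {
    let mut svc = NotificationService { subscribers: vec![] };
    let observer_rx = svc.subscribe(); // frontend/compute node side
    svc.notify(MetaChange::CatalogUpdated("mv1".into()));
    println!("observer received: {:?}", observer_rx.recv().unwrap());
}
```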

docs/multi-object-store.md → docs/dev/src/design/multi-object-store.md
@@ -76,4 +76,4 @@ After that, you need to [enable OpenDAL](https://github.com/risingwavelabs/risin

You can also use WebHDFS as a lightweight alternative to HDFS. HDFS is powered by HDFS's native Java client, and users need to set up the HDFS services correctly, whereas WebHDFS is accessed through an HTTP API with no extra setup needed. The way to start WebHDFS is basically the same as HDFS, but its default name_node is `127.0.0.1:9870`.

Once these configurations are set, run `./risedev d hdfs` or `./risedev d webhdfs`, then you can run RisingWave on HDFS(WebHDFS).
Once these configurations are set, run `./risedev d hdfs` or `./risedev d webhdfs`, then you can run RisingWave on HDFS(WebHDFS).