Commit

Merge branch 'main' into dylan/support_batch_s3_parquet_frontend
chenzl25 authored Jul 11, 2024
2 parents 1cedcb3 + 5bbb5c8 commit c0733e4
Showing 189 changed files with 3,643 additions and 2,635 deletions.
8 changes: 4 additions & 4 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@ repos:
hooks:
- id: end-of-file-fixer
- id: trailing-whitespace
- repo: https://github.com/crate-ci/typos
rev: v1.23.1
hooks:
- id: typos
- repo: local
hooks:
- id: rustfmt
name: rustfmt
entry: rustfmt --edition 2021
language: system
types: [rust]
- id: typos
name: typos
entry: typos -w
language: system
- id: cargo sort
name: cargo sort
entry: cargo sort -g -w
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ arrow-cast-iceberg = { package = "arrow-cast", version = "52" }
# After apache/iceberg-rust#411 is merged, we move to the upstream version.
iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "0c6e133e6f4655ff9ce4ad57b577dc7f692dd902" }
iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "0c6e133e6f4655ff9ce4ad57b577dc7f692dd902" }
opendal = "0.47"
arrow-array = "50"
arrow-arith = "50"
arrow-cast = "50"
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,4 +150,4 @@ RisingWave is distributed under the Apache License (Version 2.0). Please refer t

## Contributing

Thanks for your interest in contributing to the project! Please refer to [contribution guidelines](CONTRIBUTING.md) for more information.
Thanks for your interest in contributing to the project! Please refer to the [RisingWave Developer Guide](https://risingwavelabs.github.io/risingwave/) for more information.
24 changes: 2 additions & 22 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,5 @@

This directory contains RisingWave design documents that are intended to be used by contributors to understand our development process, and how we design and implement RisingWave. To learn about how to use RisingWave, check out the [RisingWave user documentation](https://www.risingwave.dev).

## Developer guide

After you learn about the basics of RisingWave, take a look at our [developer guide](https://risingwavelabs.github.io/risingwave/) to get up to speed with the development process.

## Table of Contents

* [Architecture Design](./architecture-design.md)
* [An Overview of RisingWave Streaming Engine](./streaming-overview.md)
* [An Overview of RisingWave State Store](./state-store-overview.md)
* [Meta Service](./meta-service.md)
* [Create MView on Top of MView](./mv-on-mv.md)
* [Checkpoint](./checkpoint.md)
* [Design of Data Source](./data-source.md)
* [Data Model and Encoding](./data-model-and-encoding.md)
* [Design of Batch Local Execution Mode](./batch-local-execution-mode.md)
* [Consistent Hash](./consistent-hash.md)
* [Build RisingWave with Multiple Object Storage Backends](./multi-object-store.md)
* [Backfill](./backfill.md)

## Images

We recommend that you use [draw.io](https://app.diagrams.net/) to draw illustrations and export as SVG images, with "include a copy of my diagram" selected for further editing.
- `/dev` contains the source code for the [RisingWave Developer Guide](https://risingwavelabs.github.io/risingwave/)
- `/rustdoc` contains source code for the [crate level documentation](https://risingwavelabs.github.io/risingwave/rustdoc)
4 changes: 4 additions & 0 deletions docs/dev/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,7 @@ including the `<!-- toc -->` marker at the place where you want the TOC.

We use `mdbook-linkcheck` to validate URLs included in our documentation.
`linkcheck` will be run automatically when you build with the instructions in the section above.

## Images

We recommend that you use [draw.io](https://app.diagrams.net/) to draw illustrations and export as SVG images, with "include a copy of my diagram" selected for further editing.
23 changes: 22 additions & 1 deletion docs/dev/src/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@
- [Testing](./tests/intro.md)
- [Debugging](./debugging.md)
- [Observability](./observability.md)
- [Metrics](./metrics.md)

---

# Benchmarking and Profiling
# Benchmarking and profiling

- [CPU Profiling](./benchmark-and-profile/cpu-profiling.md)
- [Memory (Heap) Profiling](./benchmark-and-profile/memory-profiling.md)
Expand All @@ -25,8 +26,28 @@
# Specialized topics

- [Develop Connectors](./connector/intro.md)
- [Source](./connector/source.md)
- [Continuous Integration](./ci.md)

---

# Design docs

- [Architecture Design](./design/architecture-design.md)
- [Streaming Engine](./design/streaming-overview.md)
- [Checkpoint](./design/checkpoint.md)
- [Aggregation](./design/aggregation.md)
- [MView on Top of MView](./design/mv-on-mv.md)
- [Backfill](./design/backfill.md)
- [State Store](./design/state-store-overview.md)
- [Shared Buffer](./design/shared-buffer.md)
- [Relational Table](./design/relational-table.md)
- [Multiple Object Storage Backends](./design/multi-object-store.md)
- [Meta Service](./design/meta-service.md)
- [Data Model and Encoding](./design/data-model-and-encoding.md)
- [Batch Local Execution Mode](./design/batch-local-execution-mode.md)
- [Consistent Hash](./design/consistent-hash.md)
- [Keys](./design/keys.md)
<!--
TODO:
Expand Down
3 changes: 3 additions & 0 deletions docs/dev/src/connector/intro.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Develop Connectors

This page describes the development workflow to develop connectors. For design docs, see
- [Source](./source.md)

RisingWave supports a lot of connectors (sources and sinks).
However, developing connectors is tricky because it involves external systems:

Expand Down
18 changes: 7 additions & 11 deletions docs/data-source.md → docs/dev/src/connector/source.md
Original file line number Diff line number Diff line change
@@ -1,20 +1,16 @@
# Data Source

- [Data Source](#data-source)
- [Components](#components)
- [Connectors](#connectors)
- [Enumerators](#enumerators)
- [ConnectorSource](#connectorsource)
- [SourceExecutor](#sourceexecutor)
- [How It Works](#how-it-works)
# Source

This page describes RisingWave's Data Source API and the architecture behind it. This may help if you are interested in how data sources work, or if you want to implement a new Data Source.

For the workflow of developing connectors, see [Develop Connectors](../connector/intro.md).

<!-- toc -->

## Components

RisingWave's data source covers four parts: connectors, enumerators, ConnectorSource and SourceExecutor.

![data source arch](../docs/images/data-source/data-source-arch.svg)
![data source arch](../images/data-source/data-source-arch.svg)

### Connectors

Expand All @@ -41,7 +37,7 @@ pub trait SplitReader: Sized {

### Enumerators

`Enumerator` periodically requests upstream to discover changes in splits, and in most cases the number of splits only increases. The enumerator is a separate task that runs on the [meta](./meta-service.md). If the upstream split changes, the enumerator notifies the connector by means of config change to change the subscription relationship.
`Enumerator` periodically requests upstream to discover changes in splits, and in most cases the number of splits only increases. The enumerator is a separate task that runs on the [meta](../design/meta-service.md). If the upstream split changes, the enumerator notifies the connector by means of config change to change the subscription relationship.

All enumerators need to implement the following trait.

Expand Down
6 changes: 3 additions & 3 deletions docs/aggregation.md → docs/dev/src/design/aggregation.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ TODO

## HashAggExecutor

![aggregation components](./images/aggregation/agg-components.png)
![aggregation components](../images/aggregation/agg-components.png)

Within the `HashAggExecutor`, there are 4 main components:
1. AggCalls.
Expand All @@ -40,10 +40,10 @@ For each of these aggregations, they have 1 state table (`AggStateStorage::Mater

### Initialization of `AggGroups`

![init-agg-group](./images/aggregation/init-agg-group.png)
![init-agg-group](../images/aggregation/init-agg-group.png)

AggGroups are initialized when corresponding aggregation groups are not found in `AggGroupCache`.
This could be either because the `AggGroupCache` got evicted,
or because it's a new group key.

It could take a while to initialize agg groups, hence we cache them in `AggGroupCache`.
It could take a while to initialize agg groups, hence we cache them in `AggGroupCache`.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ There are currently 4 types of nodes in the cluster:
* **HummockManager**: Manages the SST file manifest and meta-info of Hummock storage.
* **CompactionManager**: Manages the compaction status and task assignment of Hummock storage.

![Architecture](./images/architecture-design/architecture.svg)
![Architecture](../images/architecture-design/architecture.svg)

The topmost component is the Postgres client. It issues queries through [TCP-based Postgres wire protocol](https://www.postgresql.org/docs/current/protocol.html).

Expand All @@ -41,13 +41,13 @@ Let's begin with a simple SELECT and see how it is executed.
SELECT SUM(t.quantity) FROM t group by t.company;
```

![Batch-Query](./images/architecture-design/batch-query.svg)
![Batch-Query](../images/architecture-design/batch-query.svg)

The query will be sliced into multiple *plan fragments*, each being an independent scheduling unit and probably with different parallelism. For simplicity, parallelism is usually set to the number of CPU cores in the cluster. For example, if there are 3 compute-nodes in the cluster, each with 4 CPU cores, then the parallelism will be set to 12 by default.

Each parallel unit is called a *task*. Specifically, PlanFragment 2 will be distributed as 4 tasks to 4 CPU cores.

![Plan-Fragments](./images/architecture-design/plan-fragments.svg)
![Plan-Fragments](../images/architecture-design/plan-fragments.svg)

Behind the TableScan operator, there's a storage engine called Hummock that stores the internal states, materialized views, and the tables. Note that only the materialized views and tables are queryable. The internal states are invisible to users.

Expand All @@ -62,7 +62,7 @@ For example:
CREATE MATERIALIZED VIEW mv1 AS SELECT SUM(t.quantity) as q FROM t group by t.company;
```

![Stream-Pipeline](./images/architecture-design/stream-pipeline.png)
![Stream-Pipeline](../images/architecture-design/stream-pipeline.png)

When the data source (Kafka, e.g.) propagates a bunch of records into the system, the materialized view will refresh automatically.

Expand Down
16 changes: 8 additions & 8 deletions docs/backfill.md → docs/dev/src/design/backfill.md
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ Notice how `mv2` only needs the `id` column from `mv1`, and not the full `pk` wi

#### Overview

![backfill sides](./images/backfill/backfill-sides.png)
![backfill sides](../images/backfill/backfill-sides.png)

For `ArrangementBackfill`, we have 2 streams which we merge:
upstream and historical streams.
Expand All @@ -230,7 +230,7 @@ to make sure `Barriers` can flow through the stream graph.
Additionally, every epoch, we will refresh the historical stream, as upstream data gets checkpointed
so our snapshot is stale.

![polling](./images/backfill/polling.png)
![polling](../images/backfill/polling.png)

We will poll from this stream in backfill to get upstream and historical data chunks for processing,
as well as barriers to checkpoint to backfill state.
Expand All @@ -239,7 +239,7 @@ For each chunk (DataChunk / StreamChunk), we may also need to do some further pr

#### Schemas

![schema](./images/backfill/schema.png)
![schema](../images/backfill/schema.png)

There are 3 schemas to consider when processing the backfill data:
1. The state table schema of upstream.
Expand All @@ -258,7 +258,7 @@ to ensure the historical side and the upstream side have a consistent schema.

#### Polling loop

![handle_poll](./images/backfill/handle-poll.png)
![handle_poll](../images/backfill/handle-poll.png)

If we poll a chunk from the historical side, we will yield it to the downstream,
and update the primary key (pk) we have backfilled to in the backfill state.
Expand All @@ -278,7 +278,7 @@ Then the polling loop will continue.

### Replication

![replication_simple](./images/backfill/replication-simple.png)
![replication_simple](../images/backfill/replication-simple.png)

Previously, when doing snapshot reads to read **Historical Data**, backfill executor is able to read
from the shared buffer for the previous epoch.
Expand All @@ -288,7 +288,7 @@ However, with `ArrangementBackfill`,
we can't rely on the shared buffer of upstream,
since it can be on a different parallel unit.

![replication_replicated](./images/backfill/replication-replicated.png)
![replication_replicated](../images/backfill/replication-replicated.png)

So we need to make sure for the previous epoch, we buffer
its updates somewhere to replicate the shared buffer.
Expand All @@ -314,7 +314,7 @@ to ensure the historical side and the upstream side have a consistent schema.
Where (1) refers to the state table schema of upstream,
and (2) refers to the output schema from upstream to arrangement backfill.

![replication_example](./images/backfill/replication-example.png)
![replication_example](../images/backfill/replication-example.png)

Now let's consider an instance where (1) has the schema:

Expand Down Expand Up @@ -393,4 +393,4 @@ TODO

## Source Backfill

TODO
TODO
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ requirement of point queries, and in this article we introduce local execution m

## Design

![Frontend Flow](./images/batch-local-execution-mode/frontend-flow.svg)
![Frontend Flow](../images/batch-local-execution-mode/frontend-flow.svg)

## Example 1: select a from t where b in (1, 2, 3, 4)

Let's use the above SQL as an example:


![Example 1](./images/batch-local-execution-mode/example1.svg)
![Example 1](../images/batch-local-execution-mode/example1.svg)

The key changes from the distributed mode:

Expand All @@ -36,7 +36,7 @@ The key changes from the distributed mode:
The following is the plan and execution of the above SQL in local mode:


![Example 2](./images/batch-local-execution-mode/example2.svg)
![Example 2](../images/batch-local-execution-mode/example2.svg)

As explained above, the lookup join/exchange phase will be executed directly on frontend. The pushdown(filter/table, both the build and probe side) will be issued by executors rather than scheduler.

Expand Down
4 changes: 2 additions & 2 deletions docs/checkpoint.md → docs/dev/src/design/checkpoint.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ The consistent checkpoints play 2 roles in our system.

RisingWave performs checkpointing via the [Chandy–Lamport algorithm](https://en.wikipedia.org/wiki/Chandy%E2%80%93Lamport_algorithm). A special kind of message, the checkpoint barrier, is generated at the streaming sources and propagates across the streaming graph to the materialized views (or sinks).

![](./images/checkpoint/checkpoint.svg)
![](../images/checkpoint/checkpoint.svg)

To guarantee consistency, RisingWave introduces Chandy-Lamport algorithm as its checkpoint scheme.
In particular, RisingWave periodically (every `barrier_interval_ms`) repeats the following procedure:
Expand All @@ -39,6 +39,6 @@ As is mentioned before, during checkpointing, every operator writes their change

A local shared buffer is introduced to stage these uncommitted write batches. Once the checkpoint barriers have passed through all actors, the storage manager can notify all compute nodes to 'commit' their buffered write batches into the shared storage.

![shared buffer](./images/checkpoint/shared-buffer.svg)
![shared buffer](../images/checkpoint/shared-buffer.svg)

Another benefit of shared buffer is that the write batches in a compute node can be compacted into a single SSTable file before uploading, which significantly reduces the number of SSTable files in Layer 0.
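
The staging and commit behavior described above can be sketched as follows. This is a minimal illustration under stated assumptions, not RisingWave's actual implementation: the `SharedBuffer` type, its `write` and `commit` methods, and the representation of a write batch as a list of key-value byte pairs are all hypothetical names chosen for this example.

```rust
use std::collections::BTreeMap;

// Hypothetical types for illustration only.
type Epoch = u64;
type WriteBatch = Vec<(Vec<u8>, Vec<u8>)>;

/// Stages uncommitted write batches, grouped by the epoch that produced them.
#[derive(Default)]
struct SharedBuffer {
    staged: BTreeMap<Epoch, Vec<WriteBatch>>,
}

impl SharedBuffer {
    /// An operator stages a write batch for the given epoch.
    fn write(&mut self, epoch: Epoch, batch: WriteBatch) {
        self.staged.entry(epoch).or_default().push(batch);
    }

    /// Called once the barrier for `epoch` has passed through all actors.
    /// Drains every batch staged up to and including `epoch` and merges them
    /// into a single sorted run (standing in for one SSTable upload).
    fn commit(&mut self, epoch: Epoch) -> WriteBatch {
        // `split_off` keeps keys < epoch + 1 in `self.staged` and returns the rest.
        let later = self.staged.split_off(&(epoch + 1));
        let committed = std::mem::replace(&mut self.staged, later);

        // Epochs are visited in ascending order, so later writes win.
        let mut merged: BTreeMap<Vec<u8>, Vec<u8>> = BTreeMap::new();
        for (_, batches) in committed {
            for batch in batches {
                for (key, value) in batch {
                    merged.insert(key, value);
                }
            }
        }
        merged.into_iter().collect()
    }
}
```

Merging before upload is what lets many small batches become one sorted file; batches from epochs that have not been committed yet stay in the buffer.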
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ Here comes the main part, where we will construct a mapping that determines data

For all data $k \in U_k$, where $U_k$ is an unbounded set, we apply a hash function $v = H(k)$, where $v$ falls within a limited range. The hash function $H$ ensures that all data are hashed **uniformly** to that range. We call $v$ a vnode (virtual node), shown as the squares in the figure below.

![initial data distribution](./images/consistent-hash/data-distribution.svg)
![initial data distribution](../images/consistent-hash/data-distribution.svg)

Then we have vnode mapping, which ensures that vnodes are mapped evenly to parallel units in the cluster. In other words, the number of vnodes that are mapped to each parallel unit should be as close as possible. This is denoted by different colors in the figure above. As is depicted, we have 3 parallel units (shown as circles), each taking $\frac{1}{3}$ of total vnodes. Vnode mapping is [constructed and maintained by meta](https://github.com/risingwavelabs/risingwave/blob/main/src/meta/src/stream/scheduler.rs).
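
The two-step routing described above (key to vnode, then vnode to parallel unit) can be sketched as follows. This is only an illustration under assumptions: the vnode count of 12 is taken from the example used later in this document, and `vnode_of`, `build_vnode_mapping`, and `parallel_unit_of` are hypothetical helper names, not the functions used in the meta scheduler. The standard library's `DefaultHasher` stands in for $H$.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Number of vnodes; 12 is an assumption matching the example in the text.
const VNODE_COUNT: usize = 12;

/// v = H(k): hash any key into the fixed vnode range.
fn vnode_of<K: Hash>(key: &K) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % VNODE_COUNT as u64) as usize
}

/// Build a vnode mapping that assigns near-equal, contiguous ranges of
/// vnodes to each parallel unit. `mapping[v]` is the owner of vnode `v`.
fn build_vnode_mapping(parallel_units: usize) -> Vec<usize> {
    assert!(parallel_units > 0 && parallel_units <= VNODE_COUNT);
    (0..VNODE_COUNT)
        .map(|v| v * parallel_units / VNODE_COUNT)
        .collect()
}

/// Route a key: key -> vnode -> parallel unit.
fn parallel_unit_of<K: Hash>(key: &K, mapping: &[usize]) -> usize {
    mapping[vnode_of(key)]
}
```

With 3 parallel units, `build_vnode_mapping(3)` gives each unit 4 of the 12 vnodes. Because `vnode_of` never changes for a given key, only the mapping has to be updated when the cluster scales.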

Expand All @@ -34,17 +34,17 @@ Since $v = H(k)$, the way that data are mapped to vnodes will be invariant. Ther

Let's take scaling out as an example. Assume that we have one more parallel unit after scaling out, depicted as the orange circle in the figure below. Using the optimal strategy, we modify the vnode mapping in such a way that only $\frac{1}{4}$ of the data have to be moved. The vnodes whose data must be moved are highlighted with a bold border in the figure.

![optimal data redistribution](./images/consistent-hash/data-redistribution-1.svg)
![optimal data redistribution](../images/consistent-hash/data-redistribution-1.svg)

To minimize data movement when scaling occurs, we should be careful when we modify the vnode mapping. Below is an opposite example. Modifying vnode mapping like this will result in $\frac{1}{2}$ of the data being moved.

![worst data redistribution](./images/consistent-hash/data-redistribution-2.svg)
![worst data redistribution](../images/consistent-hash/data-redistribution-2.svg)
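
The scale-out example above can be sketched in code. This is a simplified illustration, not the scheduler's actual algorithm: `scale_out` and `count_moved` are hypothetical helpers, and the sketch assumes the old mapping is already balanced and that the vnode count divides evenly by the new number of parallel units.

```rust
/// Add one parallel unit to a balanced vnode mapping while moving as few
/// vnodes as possible. `mapping[v]` is the parallel unit that owns vnode `v`.
/// Returns the new mapping and the list of vnodes that changed owner.
fn scale_out(mapping: &[usize], old_units: usize) -> (Vec<usize>, Vec<usize>) {
    let new_unit = old_units; // id of the newly added parallel unit
    assert_eq!(
        mapping.len() % (old_units + 1),
        0,
        "this sketch assumes vnodes split evenly across the new unit count"
    );
    // Number of vnodes each unit should own after scaling.
    let target = mapping.len() / (old_units + 1);

    let mut kept = vec![0usize; old_units];
    let mut new_mapping = mapping.to_vec();
    let mut moved = Vec::new();
    for (vnode, &owner) in mapping.iter().enumerate() {
        if kept[owner] < target {
            // The old owner keeps this vnode; no data movement.
            kept[owner] += 1;
        } else {
            // The old owner is over its target; hand the surplus to the new unit.
            new_mapping[vnode] = new_unit;
            moved.push(vnode);
        }
    }
    (new_mapping, moved)
}

/// Count how many vnodes have a different owner in `new` than in `old`.
fn count_moved(old: &[usize], new: &[usize]) -> usize {
    old.iter().zip(new).filter(|(a, b)| a != b).count()
}
```

For 12 vnodes spread evenly over 3 parallel units, `scale_out` moves 3 vnodes, that is $\frac{1}{4}$ of the data. By contrast, rebuilding the mapping from scratch as four contiguous ranges (`[0,0,0,1,1,1,2,2,2,3,3,3]`) changes the owner of 6 vnodes, which happens to be the same $\frac{1}{2}$ fraction as the opposite example.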

### Streaming

We know that a fragment has several actors as its different parallelisms, and that upstream actors will send data to downstream actors via [dispatcher](./streaming-overview.md#actors). The figure below illustrates how actors distribute data based on consistent hash by example.

![actor data distribution](./images/consistent-hash/actor-data.svg)
![actor data distribution](../images/consistent-hash/actor-data.svg)

In the figure, we can see that one upstream actor dispatches data to three downstream actors. The downstream actors are scheduled on the parallel units mentioned in previous example respectively.

Expand Down Expand Up @@ -78,15 +78,15 @@ We know that [Hummock](./state-store-overview.md#overview), our LSM-Tree-based s
```
table_id | vnode | ...
```
where `table_id` denotes the [state table](relational_table/storing-state-using-relational-table.md#relational-table-layer), and `vnode` is computed via $H$ on key of the data.
where `table_id` denotes the [state table](relational-table.md), and `vnode` is computed via $H$ on key of the data.
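
As a rough sketch of the key prefix shown above, the following encodes `table_id` and `vnode` in front of the rest of the key. The field widths (4 bytes and 2 bytes), the big-endian encoding, and the `encode_key` helper itself are assumptions made for illustration; the document does not specify them.

```rust
/// Hypothetical layout: table_id (4 bytes, big-endian) | vnode (2 bytes,
/// big-endian) | remaining key bytes. Widths are assumed, not from the source.
fn encode_key(table_id: u32, vnode: u16, rest: &[u8]) -> Vec<u8> {
    let mut buf = Vec::with_capacity(4 + 2 + rest.len());
    buf.extend_from_slice(&table_id.to_be_bytes());
    buf.extend_from_slice(&vnode.to_be_bytes());
    buf.extend_from_slice(rest);
    buf
}
```

With big-endian fields, byte-wise key order sorts first by `table_id` and then by `vnode`, so under this assumed encoding all keys of one vnode within a table form a contiguous range.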

To illustrate this, let's revisit the [previous example](#streaming). Executors of an operator will share the same logical state table, just as is shown in the figure below:

![actor state table](./images/consistent-hash/actor-state-table.svg)
![actor state table](../images/consistent-hash/actor-state-table.svg)

Now that we have 12 vnodes in total in the example, the data layout in storage will accordingly look like this:

![storage data layout](./images/consistent-hash/storage-data-layout.svg)
![storage data layout](../images/consistent-hash/storage-data-layout.svg)

Note that we only show the logical sequence and aggregation of data in this illustration. The actual data may be separated into different SSTs in Hummock.

Expand Down