diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index ab8ba3d9d7eb9..cb54c1606356e 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -7,6 +7,10 @@ repos:
hooks:
- id: end-of-file-fixer
- id: trailing-whitespace
+- repo: https://github.com/crate-ci/typos
+ rev: v1.23.1
+ hooks:
+ - id: typos
- repo: local
hooks:
- id: rustfmt
@@ -14,10 +18,6 @@ repos:
entry: rustfmt --edition 2021
language: system
types: [rust]
- - id: typos
- name: typos
- entry: typos -w
- language: system
- id: cargo sort
name: cargo sort
entry: cargo sort -g -w
diff --git a/Cargo.toml b/Cargo.toml
index 57fda38e8629d..45bf33f25f889 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -143,6 +143,7 @@ arrow-cast-iceberg = { package = "arrow-cast", version = "52" }
# After apache/iceberg-rust#411 is merged, we move to the upstream version.
iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "0c6e133e6f4655ff9ce4ad57b577dc7f692dd902" }
iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "0c6e133e6f4655ff9ce4ad57b577dc7f692dd902" }
+opendal = "0.47"
arrow-array = "50"
arrow-arith = "50"
arrow-cast = "50"
diff --git a/README.md b/README.md
index 156b53060313d..e08bb45d47c92 100644
--- a/README.md
+++ b/README.md
@@ -150,4 +150,4 @@ RisingWave is distributed under the Apache License (Version 2.0). Please refer t
## Contributing
-Thanks for your interest in contributing to the project! Please refer to [contribution guidelines](CONTRIBUTING.md) for more information.
+Thanks for your interest in contributing to the project! Please refer to the [RisingWave Developer Guide](https://risingwavelabs.github.io/risingwave/) for more information.
diff --git a/docs/README.md b/docs/README.md
index e905cea7849ea..f371d9bda8f47 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -2,25 +2,5 @@
This directory contains RisingWave design documents that are intended to be used by contributors to understand our development process, and how we design and implement RisingWave. To learn about how to use RisingWave, check out the [RisingWave user documentation](https://www.risingwave.dev).
-## Developer guide
-
-After you learn about the basics of RisingWave, take a look at our [developer guide](https://risingwavelabs.github.io/risingwave/) to get up to speed with the development process.
-
-## Table of Contents
-
-* [Architecture Design](./architecture-design.md)
-* [An Overview of RisingWave Streaming Engine](./streaming-overview.md)
-* [An Overview of RisingWave State Store](./state-store-overview.md)
-* [Meta Service](./meta-service.md)
-* [Create MView on Top of MView](./mv-on-mv.md)
-* [Checkpoint](./checkpoint.md)
-* [Design of Data Source](./data-source.md)
-* [Data Model and Encoding](./data-model-and-encoding.md)
-* [Design of Batch Local Execution Mode](./batch-local-execution-mode.md)
-* [Consistent Hash](./consistent-hash.md)
-* [Build RisingWave with Multiple Object Storage Backends](./multi-object-store.md)
-* [Backfill](./backfill.md)
-
-## Images
-
-We recommend that you use [draw.io](https://app.diagrams.net/) to draw illustrations and export as SVG images, with "include a copy of my diagram" selected for further editing.
+- `/dev` contains the source code for the [RisingWave Developer Guide](https://risingwavelabs.github.io/risingwave/)
+- `/rustdoc` contains the source code for the [crate-level documentation](https://risingwavelabs.github.io/risingwave/rustdoc)
diff --git a/docs/dev/README.md b/docs/dev/README.md
index e19f10c08e3c3..7e47920d49400 100644
--- a/docs/dev/README.md
+++ b/docs/dev/README.md
@@ -28,3 +28,7 @@ including the `<!-- toc -->` marker at the place where you want the TOC.
We use `mdbook-linkcheck` to validate URLs included in our documentation.
`linkcheck` will be run automatically when you build with the instructions in the section above.
+
+## Images
+
+We recommend that you use [draw.io](https://app.diagrams.net/) to draw illustrations and export as SVG images, with "include a copy of my diagram" selected for further editing.
diff --git a/docs/dev/src/SUMMARY.md b/docs/dev/src/SUMMARY.md
index 76cf57c007c23..37a1484f0f36b 100644
--- a/docs/dev/src/SUMMARY.md
+++ b/docs/dev/src/SUMMARY.md
@@ -11,10 +11,11 @@
- [Testing](./tests/intro.md)
- [Debugging](./debugging.md)
- [Observability](./observability.md)
+ - [Metrics](./metrics.md)
---
-# Benchmarking and Profiling
+# Benchmarking and profiling
- [CPU Profiling](./benchmark-and-profile/cpu-profiling.md)
- [Memory (Heap) Profiling](./benchmark-and-profile/memory-profiling.md)
@@ -25,8 +26,28 @@
# Specialized topics
- [Develop Connectors](./connector/intro.md)
+ - [Source](./connector/source.md)
- [Continuous Integration](./ci.md)
+---
+
+# Design docs
+
+- [Architecture Design](./design/architecture-design.md)
+- [Streaming Engine](./design/streaming-overview.md)
+ - [Checkpoint](./design/checkpoint.md)
+ - [Aggregation](./design/aggregation.md)
+ - [MView on Top of MView](./design/mv-on-mv.md)
+ - [Backfill](./design/backfill.md)
+- [State Store](./design/state-store-overview.md)
+ - [Shared Buffer](./design/shared-buffer.md)
+ - [Relational Table](./design/relational-table.md)
+ - [Multiple Object Storage Backends](./design/multi-object-store.md)
+- [Meta Service](./design/meta-service.md)
+- [Data Model and Encoding](./design/data-model-and-encoding.md)
+- [Batch Local Execution Mode](./design/batch-local-execution-mode.md)
+- [Consistent Hash](./design/consistent-hash.md)
+- [Keys](./design/keys.md)
+
## Components
RisingWave's data source covers four parts: connectors, enumerators, ConnectorSource and SourceExecutor.
-![data source arch](../docs/images/data-source/data-source-arch.svg)
+![data source arch](../images/data-source/data-source-arch.svg)
### Connectors
@@ -41,7 +37,7 @@ pub trait SplitReader: Sized {
### Enumerators
-`Enumerator` periodically requests upstream to discover changes in splits, and in most cases the number of splits only increases. The enumerator is a separate task that runs on the [meta](./meta-service.md). If the upstream split changes, the enumerator notifies the connector by means of config change to change the subscription relationship.
+`Enumerator` periodically requests upstream to discover changes in splits, and in most cases the number of splits only increases. The enumerator is a separate task that runs on the [meta](../design/meta-service.md). If the upstream split changes, the enumerator notifies the connector via a config change to update the subscription relationship.
All enumerators need to implement the following trait.
diff --git a/docs/aggregation.md b/docs/dev/src/design/aggregation.md
similarity index 91%
rename from docs/aggregation.md
rename to docs/dev/src/design/aggregation.md
index 05d50c17967e2..bcb446a8ab73b 100644
--- a/docs/aggregation.md
+++ b/docs/dev/src/design/aggregation.md
@@ -13,7 +13,7 @@ TODO
## HashAggExecutor
-![aggregation components](./images/aggregation/agg-components.png)
+![aggregation components](../images/aggregation/agg-components.png)
Within the `HashAggExecutor`, there are 4 main components:
1. AggCalls.
@@ -40,10 +40,10 @@ For each of these aggregations, they have 1 state table (`AggStateStorage::Mater
### Initialization of `AggGroups`
-![init-agg-group](./images/aggregation/init-agg-group.png)
+![init-agg-group](../images/aggregation/init-agg-group.png)
AggGroups are initialized when corresponding aggregation groups are not found in `AggGroupCache`.
This could be either because the `AggGroupCache` got evicted,
or it's a new group key.
-It could take a while to initialize agg groups, hence we cache them in `AggGroupCache`.
\ No newline at end of file
+It could take a while to initialize agg groups, hence we cache them in `AggGroupCache`.
diff --git a/docs/architecture-design.md b/docs/dev/src/design/architecture-design.md
similarity index 95%
rename from docs/architecture-design.md
rename to docs/dev/src/design/architecture-design.md
index e89a945395050..1ec7e4de59635 100644
--- a/docs/architecture-design.md
+++ b/docs/dev/src/design/architecture-design.md
@@ -19,7 +19,7 @@ There are currently 4 types of nodes in the cluster:
* **HummockManager**: Manages the SST file manifest and meta-info of Hummock storage.
* **CompactionManager**: Manages the compaction status and task assignment of Hummock storage.
-![Architecture](./images/architecture-design/architecture.svg)
+![Architecture](../images/architecture-design/architecture.svg)
The topmost component is the Postgres client. It issues queries through [TCP-based Postgres wire protocol](https://www.postgresql.org/docs/current/protocol.html).
@@ -41,13 +41,13 @@ Let's begin with a simple SELECT and see how it is executed.
SELECT SUM(t.quantity) FROM t group by t.company;
```
-![Batch-Query](./images/architecture-design/batch-query.svg)
+![Batch-Query](../images/architecture-design/batch-query.svg)
The query will be sliced into multiple *plan fragments*, each being an independent scheduling unit and probably with different parallelism. For simplicity, parallelism is usually set to the number of CPU cores in the cluster. For example, if there are 3 compute-nodes in the cluster, each with 4 CPU cores, then the parallelism will be set to 12 by default.
Each parallel unit is called a *task*. Specifically, PlanFragment 2 will be distributed as 4 tasks to 4 CPU cores.
-![Plan-Fragments](./images/architecture-design/plan-fragments.svg)
+![Plan-Fragments](../images/architecture-design/plan-fragments.svg)
Behind the TableScan operator, there's a storage engine called Hummock that stores the internal states, materialized views, and the tables. Note that only the materialized views and tables are queryable. The internal states are invisible to users.
@@ -62,7 +62,7 @@ For example:
CREATE MATERIALIZED VIEW mv1 AS SELECT SUM(t.quantity) as q FROM t group by t.company;
```
-![Stream-Pipeline](./images/architecture-design/stream-pipeline.png)
+![Stream-Pipeline](../images/architecture-design/stream-pipeline.png)
When the data source (Kafka, e.g.) propagates a bunch of records into the system, the materialized view will refresh automatically.
diff --git a/docs/backfill.md b/docs/dev/src/design/backfill.md
similarity index 97%
rename from docs/backfill.md
rename to docs/dev/src/design/backfill.md
index aac20615caf10..d23f7332b8d03 100644
--- a/docs/backfill.md
+++ b/docs/dev/src/design/backfill.md
@@ -221,7 +221,7 @@ Notice how `mv2` only needs the `id` column from `mv1`, and not the full `pk` wi
#### Overview
-![backfill sides](./images/backfill/backfill-sides.png)
+![backfill sides](../images/backfill/backfill-sides.png)
For `ArrangementBackfill`, we have 2 streams which we merge:
upstream and historical streams.
@@ -230,7 +230,7 @@ to make sure `Barriers` can flow through the stream graph.
Additionally, every epoch, we will refresh the historical stream, as upstream data gets checkpointed
so our snapshot is stale.
-![polling](./images/backfill/polling.png)
+![polling](../images/backfill/polling.png)
We will poll from this stream in backfill to get upstream and historical data chunks for processing,
as well as barriers to checkpoint to backfill state.
@@ -239,7 +239,7 @@ For each chunk (DataChunk / StreamChunk), we may also need to do some further pr
#### Schemas
-![schema](./images/backfill/schema.png)
+![schema](../images/backfill/schema.png)
There are 3 schemas to consider when processing the backfill data:
1. The state table schema of upstream.
@@ -258,7 +258,7 @@ to ensure the historical side and the upstream side have a consistent schema.
#### Polling loop
-![handle_poll](./images/backfill/handle-poll.png)
+![handle_poll](../images/backfill/handle-poll.png)
If we poll a chunk from the historical side, we will yield it to the downstream,
and update the primary key (pk) we have backfilled to in the backfill state.
@@ -278,7 +278,7 @@ Then the polling loop will continue.
### Replication
-![replication_simple](./images/backfill/replication-simple.png)
+![replication_simple](../images/backfill/replication-simple.png)
Previously, when doing snapshot reads to read **Historical Data**, backfill executor is able to read
from the shared buffer for the previous epoch.
@@ -288,7 +288,7 @@ However, with `ArrangementBackfill`,
we can't rely on the shared buffer of upstream,
since it can be on a different parallel unit.
-![replication_replicated](./images/backfill/replication-replicated.png)
+![replication_replicated](../images/backfill/replication-replicated.png)
So we need to make sure for the previous epoch, we buffer
its updates somewhere to replicate the shared buffer.
@@ -314,7 +314,7 @@ to ensure the historical side and the upstream side have a consistent schema.
Where (1) refers to the state table schema of upstream,
and (2) refers to the output schema from upstream to arrangement backfill.
-![replication_example](./images/backfill/replication-example.png)
+![replication_example](../images/backfill/replication-example.png)
Now let's consider an instance where (1) has the schema:
@@ -393,4 +393,4 @@ TODO
## Source Backfill
-TODO
\ No newline at end of file
+TODO
diff --git a/docs/batch-local-execution-mode.md b/docs/dev/src/design/batch-local-execution-mode.md
similarity index 94%
rename from docs/batch-local-execution-mode.md
rename to docs/dev/src/design/batch-local-execution-mode.md
index 219488acbcdb9..f60e59054acc8 100644
--- a/docs/batch-local-execution-mode.md
+++ b/docs/dev/src/design/batch-local-execution-mode.md
@@ -17,14 +17,14 @@ requirement of point queries, and in this article we introduce local execution m
## Design
-![Frontend Flow](./images/batch-local-execution-mode/frontend-flow.svg)
+![Frontend Flow](../images/batch-local-execution-mode/frontend-flow.svg)
## Example 1: select a from t where b in (1, 2, 3, 4)
Let's use the above SQL as an example:
-![Example 1](./images/batch-local-execution-mode/example1.svg)
+![Example 1](../images/batch-local-execution-mode/example1.svg)
The key changes from the distributed mode:
@@ -36,7 +36,7 @@ The key changes from the distributed mode:
The following is the plan and execution of the above SQL in local mode:
-![Example 2](./images/batch-local-execution-mode/example2.svg)
+![Example 2](../images/batch-local-execution-mode/example2.svg)
As explained above, the lookup join/exchange phase will be executed directly on the frontend. The pushdown (filter/table, on both the build and probe sides) will be issued by executors rather than the scheduler.
diff --git a/docs/checkpoint.md b/docs/dev/src/design/checkpoint.md
similarity index 97%
rename from docs/checkpoint.md
rename to docs/dev/src/design/checkpoint.md
index 4cba70c406b28..32125ce0af658 100644
--- a/docs/checkpoint.md
+++ b/docs/dev/src/design/checkpoint.md
@@ -20,7 +20,7 @@ The consistent checkpoints play 2 roles in our system.
RisingWave makes checkpointing via [Chandy–Lamport algorithm](https://en.wikipedia.org/wiki/Chandy%E2%80%93Lamport_algorithm). A special kind of message, checkpoint barriers, is generated on streaming source and propagates across the streaming graph to the materialized views (or sink).
-![](./images/checkpoint/checkpoint.svg)
+![](../images/checkpoint/checkpoint.svg)
To guarantee consistency, RisingWave introduces Chandy-Lamport algorithm as its checkpoint scheme.
In particular, RisingWave periodically (every `barrier_interval_ms`) repeats the following procedure:
@@ -39,6 +39,6 @@ As is mentioned before, during checkpointing, every operator writes their change
A local shared buffer is introduced to stage these uncommitted write batches. Once the checkpoint barriers have passed through all actors, the storage manager can notify all compute nodes to 'commit' their buffered write batches into the shared storage.
-![shared buffer](./images/checkpoint/shared-buffer.svg)
+![shared buffer](../images/checkpoint/shared-buffer.svg)
Another benefit of shared buffer is that the write batches in a compute node can be compacted into a single SSTable file before uploading, which significantly reduces the number of SSTable files in Layer 0.
diff --git a/docs/consistent-hash.md b/docs/dev/src/design/consistent-hash.md
similarity index 92%
rename from docs/consistent-hash.md
rename to docs/dev/src/design/consistent-hash.md
index b3bdf1cf57559..43c3b973c04bc 100644
--- a/docs/consistent-hash.md
+++ b/docs/dev/src/design/consistent-hash.md
@@ -22,7 +22,7 @@ Here comes the main part, where we will construct a mapping that determines data
For all data $k \in U_k$, where $U_k$ is an unbounded set, we apply a hash function $v = H(k)$, where $v$ falls to a limited range. The hash function $H$ ensures that all data are hashed **uniformly** to that range. We call $v$ vnode, namely virtual node, as is shown as the squares in the figure below.
-![initial data distribution](./images/consistent-hash/data-distribution.svg)
+![initial data distribution](../images/consistent-hash/data-distribution.svg)
Then we have vnode mapping, which ensures that vnodes are mapped evenly to parallel units in the cluster. In other words, the number of vnodes that are mapped to each parallel unit should be as close as possible. This is denoted by different colors in the figure above. As is depicted, we have 3 parallel units (shown as circles), each taking $\frac{1}{3}$ of total vnodes. Vnode mapping is [constructed and maintained by meta](https://github.com/risingwavelabs/risingwave/blob/main/src/meta/src/stream/scheduler.rs).
@@ -34,17 +34,17 @@ Since $v = H(k)$, the way that data are mapped to vnodes will be invariant. Ther
Let's take scaling out for example. Assume that we have one more parallel unit after scaling out, as is depicted as the orange circle in the figure below. Using the optimal strategy, we modify the vnode mapping in such a way that only $\frac{1}{4}$ of the data have to be moved, as is shown in the figure below. The vnodes whose data are required to be moved are highlighted with bold border in the figure.
-![optimal data redistribution](./images/consistent-hash/data-redistribution-1.svg)
+![optimal data redistribution](../images/consistent-hash/data-redistribution-1.svg)
To minimize data movement when scaling occurs, we should be careful when we modify the vnode mapping. Below is an opposite example. Modifying vnode mapping like this will result in $\frac{1}{2}$ of the data being moved.
-![worst data redistribution](./images/consistent-hash/data-redistribution-2.svg)
+![worst data redistribution](../images/consistent-hash/data-redistribution-2.svg)
### Streaming
We know that a fragment has several actors as its different parallelisms, and that upstream actors will send data to downstream actors via [dispatcher](./streaming-overview.md#actors). The figure below illustrates how actors distribute data based on consistent hash by example.
-![actor data distribution](./images/consistent-hash/actor-data.svg)
+![actor data distribution](../images/consistent-hash/actor-data.svg)
In the figure, we can see that one upstream actor dispatches data to three downstream actors. The downstream actors are scheduled on the parallel units mentioned in the previous example respectively.
@@ -78,15 +78,15 @@ We know that [Hummock](./state-store-overview.md#overview), our LSM-Tree-based s
```
table_id | vnode | ...
```
-where `table_id` denotes the [state table](relational_table/storing-state-using-relational-table.md#relational-table-layer), and `vnode` is computed via $H$ on key of the data.
+where `table_id` denotes the [state table](relational-table.md), and `vnode` is computed via $H$ on the key of the data.
To illustrate this, let's revisit the [previous example](#streaming). Executors of an operator will share the same logical state table, just as is shown in the figure below:
-![actor state table](./images/consistent-hash/actor-state-table.svg)
+![actor state table](../images/consistent-hash/actor-state-table.svg)
Now that we have 12 vnodes in total in the example, the data layout in storage will accordingly look like this:
-![storage data layout](./images/consistent-hash/storage-data-layout.svg)
+![storage data layout](../images/consistent-hash/storage-data-layout.svg)
Note that we only show the logical sequence and aggregation of data in this illustration. The actual data may be separated into different SSTs in Hummock.
diff --git a/docs/data-model-and-encoding.md b/docs/dev/src/design/data-model-and-encoding.md
similarity index 86%
rename from docs/data-model-and-encoding.md
rename to docs/dev/src/design/data-model-and-encoding.md
index f27745a41be4d..c2a70c40909c6 100644
--- a/docs/data-model-and-encoding.md
+++ b/docs/dev/src/design/data-model-and-encoding.md
@@ -1,11 +1,6 @@
# Data Model and Encoding
-- [Data Model and Encoding](#data-model-and-encoding)
- - [Data Model](#data-model)
- - [In-Memory Encoding](#in-memory-encoding)
- - [On-Disk Encoding](#on-disk-encoding)
-
-
+
## Data Model
@@ -26,7 +21,7 @@ Primitive data types:
- Strings: `VARCHAR`
- Temporals: `DATE`, `TIMESTAMP`, `TIMESTAMP WITH TIME ZONE`, `TIME`, `INTERVAL`
-Composite data types (WIP):
+Composite data types:
- `Struct`: A structure with a list of named, strongly-typed fields.
- `List`: A variable-length list of values with the same data type.
@@ -41,7 +36,7 @@ A Data Chunk consists of multiple columns and a visibility array, as is shown in
A Stream Chunk consists of columns, visibility array and an additional `ops` column, as is shown in the right subgraph below. The `ops` column marks the operation of row, which can be one of `Delete`, `Insert`, `UpdateDelete` and `UpdateInsert`.
-![chunk](./images/data-model-and-encoding/chunk.svg)
+![chunk](../images/data-model-and-encoding/chunk.svg)
## On-Disk Encoding
@@ -49,9 +44,6 @@ A Stream Chunk consists of columns, visibility array and an additional `ops` col
RisingWave stores user data in shared key-value storage called 'Hummock'. Tables, materialized views and checkpoints of internal streaming operators are encoded into key-value entries. Every field of a row, a.k.a. cell, is encoded as a key-value entry, except that `NULL` values are omitted.
-![row-format](./images/data-model-and-encoding/row-format.svg)
+![row-format](../images/data-model-and-encoding/row-format.svg)
Considering that ordering matters in some cases, e.g. result set of an order-by query, fields of keys must preserve the order of original values after being encoded into bytes. This is what `memcomparable` is used for. For example, integers must be encoded in big-endian and the sign bit must be flipped to preserve order. In contrast, the encoding of values does not need to preserve order.
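+
+A minimal sketch of this idea for 32-bit integers is shown below. It only illustrates the sign-flip and big-endian trick described above; it is not RisingWave's actual `memcomparable` implementation:
+
+```rust
+/// Encode an i32 so that byte-wise (memcmp) order matches numeric order:
+/// flip the sign bit, then emit big-endian bytes.
+fn memcomparable_i32(v: i32) -> [u8; 4] {
+    ((v as u32) ^ 0x8000_0000).to_be_bytes()
+}
+
+fn main() {
+    // Negative numbers sort before positive ones byte-wise.
+    assert!(memcomparable_i32(-1) < memcomparable_i32(2));
+    // Big-endian keeps magnitude order within the same sign.
+    assert!(memcomparable_i32(3) < memcomparable_i32(256));
+}
+```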
-
-
-
diff --git a/docs/keys.md b/docs/dev/src/design/keys.md
similarity index 99%
rename from docs/keys.md
rename to docs/dev/src/design/keys.md
index b0baa40c8716d..1e1bc056978d3 100644
--- a/docs/keys.md
+++ b/docs/dev/src/design/keys.md
@@ -62,4 +62,4 @@ Then when iterating over the keys from storage, the records are returned in the
When the update stream comes, we can just use `id` to identify the records that need to be updated.
We can get the whole record corresponding to the `id` and get the `i` from there.
-Then we can use that to update the materialized state accordingly.
\ No newline at end of file
+Then we can use that to update the materialized state accordingly.
diff --git a/docs/meta-service.md b/docs/dev/src/design/meta-service.md
similarity index 86%
rename from docs/meta-service.md
rename to docs/dev/src/design/meta-service.md
index cf10de6ff3bb4..4ec7b01d10e4f 100644
--- a/docs/meta-service.md
+++ b/docs/dev/src/design/meta-service.md
@@ -1,14 +1,6 @@
# Meta Service
-- [Meta Service](#meta-service)
- - [Background](#background)
- - [Meta Store](#meta-store)
- - [Types of Metadata](#types-of-metadata)
- - [Catalog](#catalog)
- - [Storage](#storage)
- - [Push on Updates](#push-on-updates)
-
-
+
## Background
@@ -16,7 +8,7 @@ RisingWave provides both real-time analytical query as well as high-concurrent a
Meanwhile, components such as metadata provider, scheduler, monitoring are more suitable for a centralized design. For example, a typical on-premise deployment may look like below, where the dotted boxes represent minimal unit of deployment (VM or container).
-![Cluster Deployment](./images/meta-service/cluster-deployment.svg)
+![Cluster Deployment](../images/meta-service/cluster-deployment.svg)
## Meta Store
@@ -50,7 +42,6 @@ There are 2 choices on how to distribute information across multiple nodes.
Currently, for simplicity, we choose the push-style approach for all kinds of metadata. This is implemented as `NotificationService` on meta service and `ObserverManager` on frontend and compute nodes.
-![Notification](./images/meta-service/notification.svg)
+![Notification](../images/meta-service/notification.svg)
`ObserverManager` will register itself to meta service on bootstrap and subscribe metadata it needs. Afterwards, once metadata changed, the meta node streams the changes to it, expecting all subscribers to acknowledge.
-
diff --git a/docs/multi-object-store.md b/docs/dev/src/design/multi-object-store.md
similarity index 98%
rename from docs/multi-object-store.md
rename to docs/dev/src/design/multi-object-store.md
index fe12ce579adf3..699cd036f419d 100644
--- a/docs/multi-object-store.md
+++ b/docs/dev/src/design/multi-object-store.md
@@ -1,7 +1,6 @@
# Build RisingWave with Multiple Object Storage Backends
-
-
+
## Overview
As a cloud-neutral database, RisingWave supports running on different (object) storage backends. Currently, these storage products include
@@ -76,4 +75,4 @@ After that, you need to [enable OpenDAL](https://github.com/risingwavelabs/risin
You can also use WebHDFS as a lightweight alternative to HDFS. The HDFS backend is powered by HDFS's native Java client, so users need to set up the HDFS services correctly, while WebHDFS is accessed via an HTTP API and needs no extra setup. The way to start WebHDFS is basically the same as HDFS, but its default name_node is `127.0.0.1:9870`.
-Once these configurations are set, run `./risedev d hdfs` or `./risedev d webhdfs`, then you can run RisingWave on HDFS(WebHDFS).
\ No newline at end of file
+Once these configurations are set, run `./risedev d hdfs` or `./risedev d webhdfs`, then you can run RisingWave on HDFS(WebHDFS).
diff --git a/docs/mv-on-mv.md b/docs/dev/src/design/mv-on-mv.md
similarity index 95%
rename from docs/mv-on-mv.md
rename to docs/dev/src/design/mv-on-mv.md
index e66b48224f43f..d1091c0c0ac82 100644
--- a/docs/mv-on-mv.md
+++ b/docs/dev/src/design/mv-on-mv.md
@@ -1,4 +1,4 @@
-# Create MView on Top of MView
+# MView on Top of MView
## Background
@@ -19,7 +19,7 @@ create materialized view mv3 as select count(v1) as count_v1 from mv1;
In physical representation, we introduce a dispatcher operator type, *Broadcast*. Broadcast dispatcher, as its name indicates, will dispatch every message to multiple downstreams. To simplify our design, we can assume that every MViewOperator has a `Broadcast` output, with zero or more downstreams.
-![fig1](../docs/images/mv-on-mv/mv-on-mv-01.svg)
+![fig1](../images/mv-on-mv/mv-on-mv-01.svg)
### Create new mview online
@@ -27,7 +27,7 @@ Assume that we already have a materialized view mv1, and we want to create a new
The Chain operator has two inputs. The first one will be a batch query, denoted by the blue patterns in the figure below, which is a finite append-only stream (the snapshot of historical data in the base mview). The second one is its original input, an infinite stream, denoted by the red patterns.
-![fig2](../docs/images/mv-on-mv/mv-on-mv-02.svg)
+![fig2](../images/mv-on-mv/mv-on-mv-02.svg)
The full process of creation is:
diff --git a/docs/relational_table/storing-state-using-relational-table.md b/docs/dev/src/design/relational-table.md
similarity index 64%
rename from docs/relational_table/storing-state-using-relational-table.md
rename to docs/dev/src/design/relational-table.md
index f289b83507b16..f49907726e20e 100644
--- a/docs/relational_table/storing-state-using-relational-table.md
+++ b/docs/dev/src/design/relational-table.md
@@ -1,14 +1,6 @@
# Storing State Using Relational Table
-- [Storing State Using Relational Table](#storing-state-using-relational-table)
- - [Row-based Encoding](#row-based-encoding)
- - [Relational Table Layer](#relational-table-layer)
- - [Write Path](#write-path)
- - [Read Path](#read-path)
-
-
-
-
+
## Row-based Encoding
@@ -23,8 +15,6 @@ We implement a relational table layer as the bridge between executors and KV sta
| join | table_id \| join_key \| pk | materialized value |
| agg | table_id \| group_key | agg_value |
-For the detailed schema, please check [doc](relational-table-schema.md)
-
## Relational Table Layer
[source code](https://github.com/risingwavelabs/risingwave/blob/4e66ca3d41435c64af26b5e0003258c4f7116822/src/storage/src/table/state_table.rs)
@@ -36,13 +26,13 @@ Relational table layer consists of State Table, Mem Table and Storage Table. The
State Table provides the table operations through these APIs: `get_row`, `scan`, `insert_row`, `delete_row` and `update_row`, which are the read and write interfaces for streaming executors. The Mem Table is an in-memory buffer for caching table operations during one epoch. The Storage Table is read only, and will output only the partial columns that the upper level needs.
-![Overview of Relational Table](../images/relational-table-layer/relational-table-01.svg)
+![Overview of Relational Table](../images/relational-table/relational-table-01.svg)
### Write Path
To write into the KV state store, executors first perform operations on State Table, and these operations will be cached in Mem Table. Once a barrier flows through one executor, the executor will flush the cached operations into the state store. At this moment, State Table will convert these operations into KV pairs and write them to the state store with a specific epoch.
For example, when an executor performs `insert(a, b, c)` and `delete(d, e, f)` through the State Table APIs, Mem Table first caches these two operations in memory. After receiving a new barrier, State Table converts these two operations into KV operations in row-based format, and writes these KV operations into the state store (Hummock).
-![write example](../images/relational-table-layer/relational-table-03.svg)
+![write example](../images/relational-table/relational-table-03.svg)
### Read Path
In streaming mode, executors should be able to read the latest written data, which means uncommitted data is visible. The data in Mem Table (memory) is fresher than that in shared storage (state store). State Table provides both point-get and scan to read from state store by merging data from Mem Table and Storage Table.
#### Get
@@ -68,4 +58,36 @@ Get(pk = 3): [3, 3333, 3333]
#### Scan
Scan on relational table is implemented by `StateTableIter`, which is a merge iterator of `MemTableIter` and `StorageIter`. If a pk exists in both KV state store (shared storage) and memory (MemTable), result of `MemTableIter` is returned. For example, in the following figure, `StateTableIter` will generate `1->4->5->6` in order.
-![Scan example](../images/relational-table-layer/relational-table-02.svg)
+![Scan example](../images/relational-table/relational-table-02.svg)
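+
+As a rough illustration of that merge rule (not the actual `StateTableIter` code; the types and names here are made up), the sketch below merges an in-memory overlay onto storage data, letting the in-memory entry win for duplicate keys and treating `None` as an in-memory deletion:
+
+```rust
+use std::collections::BTreeMap;
+
+/// Merge a mem-table overlay onto storage data. For a key present on both
+/// sides the in-memory entry wins; `None` marks an in-memory deletion.
+fn merge_scan(
+    storage: &BTreeMap<i32, String>,
+    mem: &BTreeMap<i32, Option<String>>,
+) -> Vec<(i32, String)> {
+    let mut merged: BTreeMap<i32, String> =
+        storage.iter().map(|(k, v)| (*k, v.clone())).collect();
+    for (k, v) in mem {
+        match v {
+            Some(v) => {
+                merged.insert(*k, v.clone()); // mem overrides storage
+            }
+            None => {
+                merged.remove(k); // mem deletion hides the storage entry
+            }
+        }
+    }
+    merged.into_iter().collect() // BTreeMap yields keys in pk order
+}
+
+fn main() {
+    let storage = BTreeMap::from([(1, "s1".into()), (4, "s4".into()), (5, "s5".into())]);
+    let mem = BTreeMap::from([(4, Some("m4".into())), (6, Some("m6".into()))]);
+    // Prints keys 1 -> 4 -> 5 -> 6, with key 4 taken from the mem side.
+    println!("{:?}", merge_scan(&storage, &mem));
+}
+```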
+
+
+## Example: HashAgg
+
+In this section, we take HashAgg with extreme state (`max`, `min`) or value state (`sum`, `count`) as an example, and introduce a more detailed design for the internal table schema.
+
+[Code](https://github.com/risingwavelabs/risingwave/blob/7f9ad2240712aa0cfe3edffb4535d43b42f32cc5/src/frontend/src/optimizer/plan_node/logical_agg.rs#L144)
+
+### Table id
+`table_id` is a globally unique id allocated in meta for each relational table object. Meta is responsible for traversing the Plan Tree and calculating the total number of Relational Tables needed. For example, the Hash Join Operator needs 2, one for the left table and one for the right table. The number of tables needed for Agg depends on the number of agg calls.
+
+### Value State (Sum, Count)
+Query example:
+```sql
+select sum(v2), count(v3) from t group by v1
+```
+
+This query will need to create 2 Relational Tables. The schema is `table_id/group_key`.
+
+### Extreme State (Max, Min)
+Query example:
+```sql
+select max(v2), min(v3) from t group by v1
+```
+
+This query will need to create 2 Relational Tables. If the upstream is not append-only, the schema becomes `table_id/group_key/sort_key/upstream_pk`.
+
+The order of `sort_key` depends on the agg call kind. For example, if it's `max()`, `sort_key` will be ordered `Ascending`; if it's `min()`, `sort_key` will be ordered `Descending`.
+The `upstream_pk` is also appended to ensure the uniqueness of the key.
+This design allows the streaming executor to read only part of the data from storage on a cache miss, instead of all of it. The streaming executor will try to write all streaming data to storage, because there may be `update` or `delete` operations in the stream, and it's impossible to always guarantee correct results without storing all data.
+
+If `t` is created with the append-only flag, the schema becomes `table_id/group_key`, which is the same as for Value State. This is because in append-only mode there is no `update` or `delete` operation, so the cache will never miss. Therefore, we only need to write one value to the storage.
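+
+To make the extreme-state layout concrete, a hypothetical set of state table keys for the `max(v2)` call above (on a non-append-only `t`) could look like the following, where the `table_id` and all values are made up for illustration:
+
+```
+table_id | group_key (v1) | sort_key (v2) | upstream_pk
+   42    |       1        |      10       |      7
+   42    |       1        |      15       |      3
+   42    |       2        |       8       |      9
+```
+
+On a cache miss for the group `v1 = 1`, the executor only needs to read part of that group's entries in `sort_key` order to recover the current `max(v2)`, rather than scanning every row of the group.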
diff --git a/docs/shared-buffer.md b/docs/dev/src/design/shared-buffer.md
similarity index 95%
rename from docs/shared-buffer.md
rename to docs/dev/src/design/shared-buffer.md
index 2b63a040b4c9a..846d8bd1d064a 100644
--- a/docs/shared-buffer.md
+++ b/docs/dev/src/design/shared-buffer.md
@@ -1,14 +1,6 @@
# The Hummock Shared Buffer
-Table of contents:
-
-- [Introduction](#introduction)
-- [Part 1: Async Checkpoint](#part-1-async-checkpoint)
- - [Write Path](#write-path)
- - [Read Path](#read-path)
-- [Part 2: Write Anytime / Async Flush](#part-2-write-anytime--async-flush)
- - [A New Merge Iterator](#a-new-merge-iterator)
- - [Considerations](#considerations)
+
## Introduction
@@ -137,4 +129,4 @@ For all data a, b of the same type, we must ensure that:
```
in-memory representation of a < in-memory representation of b,
iff memcomparable(a) < memcomparable(b)
-```
\ No newline at end of file
+```
diff --git a/docs/state-store-overview.md b/docs/dev/src/design/state-store-overview.md
similarity index 92%
rename from docs/state-store-overview.md
rename to docs/dev/src/design/state-store-overview.md
index 0fc64516ac52f..a11d9528d2586 100644
--- a/docs/state-store-overview.md
+++ b/docs/dev/src/design/state-store-overview.md
@@ -1,18 +1,6 @@
# An Overview of RisingWave State Store
-- [An Overview of RisingWave State Store](#an-overview-of-risingwave-state-store)
- - [Overview](#overview)
- - [Architecture](#architecture)
- - [The Hummock User API](#the-hummock-user-api)
- - [Hummock Internals](#hummock-internals)
- - [Storage Format](#storage-format)
- - [Write Path](#write-path)
- - [Read Path](#read-path)
- - [Compaction](#compaction)
- - [Transaction Management with Hummock Manager](#transaction-management-with-hummock-manager)
- - [Checkpointing in Streaming](#checkpointing-in-streaming)
-
-
+
## Overview
@@ -22,7 +10,7 @@ In RisingWave, all streaming executors store their data into a state store. This
Reading this document requires prior knowledge of LSM-Tree-based KV storage engines, like RocksDB, LevelDB, etc.
-![Overview of Architecture](images/state-store-overview/state-store-overview-01.svg)
+![Overview of Architecture](../images/state-store-overview/state-store-overview-01.svg)
Hummock consists of a manager service on the meta node, clients on worker nodes (including compute nodes, frontend nodes, and compactor nodes), and a shared storage to store files (SSTs). Every time a new write batch is produced, the Hummock client will upload those files to shared storage, and notify the Hummock manager of the new data. With compaction going on, new files will be added and unused files will be vacuumed. The Hummock manager will take care of the lifecycle of a file — is a file being used? can we delete a file? etc.
@@ -104,7 +92,7 @@ The Hummock client will batch writes and generate SSTs to sync to the underlying
After the SST is uploaded to an S3-compatible service, the Hummock client will let the Hummock manager know there's a new table.
The list of all SSTs along with some metadata forms a ***version***. When the Hummock client adds new SSTs to the Hummock manager, a new version will be generated with the new set of SST files.
-![Write Path](images/state-store-overview/state-store-overview-02.svg)
+![Write Path](../images/state-store-overview/state-store-overview-02.svg)
### Read Path
@@ -114,7 +102,7 @@ For every read operation (`scan`, `get`), we will first select SSTs that might c
For `scan`, we simply select by overlapping key range. For point get, we will filter SSTs further by Bloom filter. After that, we will compose a single `MergeIterator` over all SSTs. The `MergeIterator` will return all keys in range along with their epochs. Then, we will create `UserIterator` over `MergeIterator`, and for all user keys, the user iterator will pick the first full key whose epoch <= read epoch. Therefore, users can perform a snapshot read from Hummock based on the given epoch. The snapshot should be acquired beforehand and released afterwards.
-![Read Path](images/state-store-overview/state-store-overview-03.svg)
+![Read Path](../images/state-store-overview/state-store-overview-03.svg)
Hummock implements the following iterators:
- `BlockIterator`: iterates a block of an SSTable.
@@ -148,7 +136,7 @@ As mentioned in [Read Path](#read-path), reads are performed on a ***version***
The SQL frontend will get the latest epoch from the meta service. Then, it will embed the epoch number into SQL plans, so that all compute nodes will read from that epoch. In theory, both SQL frontend and compute nodes will ***pin the snapshot***, to handle the case that frontend goes down and the compute nodes are still reading from Hummock (#622). However, to simplify the process, currently we ***only pin on the frontend side***.
-![Hummock Service](images/state-store-overview/state-store-overview-04.svg)
+![Hummock Service](../images/state-store-overview/state-store-overview-04.svg)
Hummock only guarantees that writes on one node can be immediately read from the same node. However, the worker nodes running batch queries might have a slightly outdated version when a batch query plan is received (due to the local version caching). Therefore, we have a `wait_epoch` interface to wait until the local cached version contains full data of one epoch.
@@ -164,7 +152,7 @@ From the perspective of the streaming executors, when they receive a barrier, th
Here we have two cases: Agg executors always persist and produce new write batches when receiving a barrier; Join executors (in the future when async flush gets implemented) will produce write batches within an epoch.
-![Checkpoint in Streaming](images/state-store-overview/state-store-overview-05.svg)
+![Checkpoint in Streaming](../images/state-store-overview/state-store-overview-05.svg)
Streaming executors cannot control when data will be persisted — they can only write to Hummock's `shared buffer`. When a barrier flows across the system and is collected by the meta service, we can ensure that all executors have written their states of ***the previous epoch*** to the shared buffer, so we can initiate checkpoint process on all worker nodes, and upload SSTs to persistent remote storage.
diff --git a/docs/streaming-overview.md b/docs/dev/src/design/streaming-overview.md
similarity index 91%
rename from docs/streaming-overview.md
rename to docs/dev/src/design/streaming-overview.md
index 2379fe2db13d3..d4a80c2a66c48 100644
--- a/docs/streaming-overview.md
+++ b/docs/dev/src/design/streaming-overview.md
@@ -1,16 +1,6 @@
# An Overview of the RisingWave Streaming Engine
-- [An Overview of the RisingWave Streaming Engine](#an-overview-of-risingwave-streaming-engine)
- - [Overview](#overview)
- - [Architecture](#architecture)
- - [Actors, executors, and states](#actors-executors-and-states)
- - [Actors](#actors)
- - [Executors](#executors)
- - [Checkpoint, Consistency, and Fault tolerance](#checkpoint-consistency-and-fault-tolerance)
- - [Barrier based checkpoint](#barrier-based-checkpoint)
- - [Fault tolerance](#fault-tolerance)
-
-
+
## Overview
@@ -26,7 +16,7 @@ In this document we give an overview of the RisingWave streaming engine.
## Architecture
-![streaming-architecture](./images/streaming-overview/streaming-architecture.svg)
+![streaming-architecture](../images/streaming-overview/streaming-architecture.svg)
The overall architecture of RisingWave is depicted in the figure above. In brief, RisingWave streaming engine consists of three sets of nodes: frontend, compute nodes, and meta service. The frontend node consists of the serving layer, handling users' SQL requests concurrently. Underlying is the processing layer. Each compute node hosts a collection of long-running actors for stream processing. All actors access a shared persistence layer of storage (currently AWS S3) as its state storage. The meta service maintains all meta-information and coordinates the whole cluster.
@@ -38,7 +28,7 @@ When receiving a create materialized view statement at the frontend, a materiali
4. Initializing the job at the backend. The meta service notifies all compute nodes to start serving streaming pipelines.
## Actors, executors, and states
-![streaming-executor](./images/streaming-overview/streaming-executor-and-compute-node.svg)
+![streaming-executor](../images/streaming-overview/streaming-executor-and-compute-node.svg)
### Actors
@@ -75,4 +65,3 @@ See more detailed descriptions on [Checkpoint](./checkpoint.md).
### Fault tolerance
When the streaming engine crashes, the system must globally roll back to a previous consistent snapshot. To achieve this, whenever the meta service detects the failure of a compute node or of any ongoing checkpoint procedure, it triggers a recovery process. After rebuilding the streaming pipeline, each executor will reset its local state from a consistent snapshot on the storage and recover its computation.
-
diff --git a/docs/images/aggregation/agg-components.png b/docs/dev/src/images/aggregation/agg-components.png
similarity index 100%
rename from docs/images/aggregation/agg-components.png
rename to docs/dev/src/images/aggregation/agg-components.png
diff --git a/docs/images/aggregation/init-agg-group.png b/docs/dev/src/images/aggregation/init-agg-group.png
similarity index 100%
rename from docs/images/aggregation/init-agg-group.png
rename to docs/dev/src/images/aggregation/init-agg-group.png
diff --git a/docs/images/architecture-design/architecture.svg b/docs/dev/src/images/architecture-design/architecture.svg
similarity index 100%
rename from docs/images/architecture-design/architecture.svg
rename to docs/dev/src/images/architecture-design/architecture.svg
diff --git a/docs/images/architecture-design/batch-query.svg b/docs/dev/src/images/architecture-design/batch-query.svg
similarity index 100%
rename from docs/images/architecture-design/batch-query.svg
rename to docs/dev/src/images/architecture-design/batch-query.svg
diff --git a/docs/images/architecture-design/plan-fragments.svg b/docs/dev/src/images/architecture-design/plan-fragments.svg
similarity index 100%
rename from docs/images/architecture-design/plan-fragments.svg
rename to docs/dev/src/images/architecture-design/plan-fragments.svg
diff --git a/docs/images/architecture-design/stream-pipeline.png b/docs/dev/src/images/architecture-design/stream-pipeline.png
similarity index 100%
rename from docs/images/architecture-design/stream-pipeline.png
rename to docs/dev/src/images/architecture-design/stream-pipeline.png
diff --git a/docs/images/backfill/backfill-sides.png b/docs/dev/src/images/backfill/backfill-sides.png
similarity index 100%
rename from docs/images/backfill/backfill-sides.png
rename to docs/dev/src/images/backfill/backfill-sides.png
diff --git a/docs/images/backfill/handle-poll.png b/docs/dev/src/images/backfill/handle-poll.png
similarity index 100%
rename from docs/images/backfill/handle-poll.png
rename to docs/dev/src/images/backfill/handle-poll.png
diff --git a/docs/images/backfill/polling.png b/docs/dev/src/images/backfill/polling.png
similarity index 100%
rename from docs/images/backfill/polling.png
rename to docs/dev/src/images/backfill/polling.png
diff --git a/docs/images/backfill/replication-example.png b/docs/dev/src/images/backfill/replication-example.png
similarity index 100%
rename from docs/images/backfill/replication-example.png
rename to docs/dev/src/images/backfill/replication-example.png
diff --git a/docs/images/backfill/replication-replicated.png b/docs/dev/src/images/backfill/replication-replicated.png
similarity index 100%
rename from docs/images/backfill/replication-replicated.png
rename to docs/dev/src/images/backfill/replication-replicated.png
diff --git a/docs/images/backfill/replication-simple.png b/docs/dev/src/images/backfill/replication-simple.png
similarity index 100%
rename from docs/images/backfill/replication-simple.png
rename to docs/dev/src/images/backfill/replication-simple.png
diff --git a/docs/images/backfill/schema.png b/docs/dev/src/images/backfill/schema.png
similarity index 100%
rename from docs/images/backfill/schema.png
rename to docs/dev/src/images/backfill/schema.png
diff --git a/docs/images/batch-local-execution-mode/example1.svg b/docs/dev/src/images/batch-local-execution-mode/example1.svg
similarity index 100%
rename from docs/images/batch-local-execution-mode/example1.svg
rename to docs/dev/src/images/batch-local-execution-mode/example1.svg
diff --git a/docs/images/batch-local-execution-mode/example2.svg b/docs/dev/src/images/batch-local-execution-mode/example2.svg
similarity index 100%
rename from docs/images/batch-local-execution-mode/example2.svg
rename to docs/dev/src/images/batch-local-execution-mode/example2.svg
diff --git a/docs/images/batch-local-execution-mode/frontend-flow.svg b/docs/dev/src/images/batch-local-execution-mode/frontend-flow.svg
similarity index 100%
rename from docs/images/batch-local-execution-mode/frontend-flow.svg
rename to docs/dev/src/images/batch-local-execution-mode/frontend-flow.svg
diff --git a/docs/images/checkpoint/checkpoint.svg b/docs/dev/src/images/checkpoint/checkpoint.svg
similarity index 100%
rename from docs/images/checkpoint/checkpoint.svg
rename to docs/dev/src/images/checkpoint/checkpoint.svg
diff --git a/docs/images/checkpoint/shared-buffer.svg b/docs/dev/src/images/checkpoint/shared-buffer.svg
similarity index 100%
rename from docs/images/checkpoint/shared-buffer.svg
rename to docs/dev/src/images/checkpoint/shared-buffer.svg
diff --git a/docs/images/consistent-hash/actor-data.svg b/docs/dev/src/images/consistent-hash/actor-data.svg
similarity index 100%
rename from docs/images/consistent-hash/actor-data.svg
rename to docs/dev/src/images/consistent-hash/actor-data.svg
diff --git a/docs/images/consistent-hash/actor-state-table.svg b/docs/dev/src/images/consistent-hash/actor-state-table.svg
similarity index 100%
rename from docs/images/consistent-hash/actor-state-table.svg
rename to docs/dev/src/images/consistent-hash/actor-state-table.svg
diff --git a/docs/images/consistent-hash/data-distribution.svg b/docs/dev/src/images/consistent-hash/data-distribution.svg
similarity index 100%
rename from docs/images/consistent-hash/data-distribution.svg
rename to docs/dev/src/images/consistent-hash/data-distribution.svg
diff --git a/docs/images/consistent-hash/data-redistribution-1.svg b/docs/dev/src/images/consistent-hash/data-redistribution-1.svg
similarity index 100%
rename from docs/images/consistent-hash/data-redistribution-1.svg
rename to docs/dev/src/images/consistent-hash/data-redistribution-1.svg
diff --git a/docs/images/consistent-hash/data-redistribution-2.svg b/docs/dev/src/images/consistent-hash/data-redistribution-2.svg
similarity index 100%
rename from docs/images/consistent-hash/data-redistribution-2.svg
rename to docs/dev/src/images/consistent-hash/data-redistribution-2.svg
diff --git a/docs/images/consistent-hash/storage-data-layout.svg b/docs/dev/src/images/consistent-hash/storage-data-layout.svg
similarity index 100%
rename from docs/images/consistent-hash/storage-data-layout.svg
rename to docs/dev/src/images/consistent-hash/storage-data-layout.svg
diff --git a/docs/images/data-model-and-encoding/chunk.svg b/docs/dev/src/images/data-model-and-encoding/chunk.svg
similarity index 100%
rename from docs/images/data-model-and-encoding/chunk.svg
rename to docs/dev/src/images/data-model-and-encoding/chunk.svg
diff --git a/docs/images/data-model-and-encoding/row-format.svg b/docs/dev/src/images/data-model-and-encoding/row-format.svg
similarity index 100%
rename from docs/images/data-model-and-encoding/row-format.svg
rename to docs/dev/src/images/data-model-and-encoding/row-format.svg
diff --git a/docs/images/data-source/data-source-arch.svg b/docs/dev/src/images/data-source/data-source-arch.svg
similarity index 100%
rename from docs/images/data-source/data-source-arch.svg
rename to docs/dev/src/images/data-source/data-source-arch.svg
diff --git a/docs/images/logo-title.svg b/docs/dev/src/images/logo-title.svg
similarity index 100%
rename from docs/images/logo-title.svg
rename to docs/dev/src/images/logo-title.svg
diff --git a/docs/images/logo.svg b/docs/dev/src/images/logo.svg
similarity index 100%
rename from docs/images/logo.svg
rename to docs/dev/src/images/logo.svg
diff --git a/docs/images/meta-service/cluster-deployment.svg b/docs/dev/src/images/meta-service/cluster-deployment.svg
similarity index 100%
rename from docs/images/meta-service/cluster-deployment.svg
rename to docs/dev/src/images/meta-service/cluster-deployment.svg
diff --git a/docs/images/meta-service/notification.svg b/docs/dev/src/images/meta-service/notification.svg
similarity index 100%
rename from docs/images/meta-service/notification.svg
rename to docs/dev/src/images/meta-service/notification.svg
diff --git a/docs/images/mv-on-mv/mv-on-mv-01.svg b/docs/dev/src/images/mv-on-mv/mv-on-mv-01.svg
similarity index 100%
rename from docs/images/mv-on-mv/mv-on-mv-01.svg
rename to docs/dev/src/images/mv-on-mv/mv-on-mv-01.svg
diff --git a/docs/images/mv-on-mv/mv-on-mv-02.svg b/docs/dev/src/images/mv-on-mv/mv-on-mv-02.svg
similarity index 100%
rename from docs/images/mv-on-mv/mv-on-mv-02.svg
rename to docs/dev/src/images/mv-on-mv/mv-on-mv-02.svg
diff --git a/docs/images/relational-table-layer/relational-table-01.svg b/docs/dev/src/images/relational-table/relational-table-01.svg
similarity index 100%
rename from docs/images/relational-table-layer/relational-table-01.svg
rename to docs/dev/src/images/relational-table/relational-table-01.svg
diff --git a/docs/images/relational-table-layer/relational-table-02.svg b/docs/dev/src/images/relational-table/relational-table-02.svg
similarity index 100%
rename from docs/images/relational-table-layer/relational-table-02.svg
rename to docs/dev/src/images/relational-table/relational-table-02.svg
diff --git a/docs/images/relational-table-layer/relational-table-03.svg b/docs/dev/src/images/relational-table/relational-table-03.svg
similarity index 100%
rename from docs/images/relational-table-layer/relational-table-03.svg
rename to docs/dev/src/images/relational-table/relational-table-03.svg
diff --git a/docs/images/state-store-overview/state-store-overview-01.svg b/docs/dev/src/images/state-store-overview/state-store-overview-01.svg
similarity index 100%
rename from docs/images/state-store-overview/state-store-overview-01.svg
rename to docs/dev/src/images/state-store-overview/state-store-overview-01.svg
diff --git a/docs/images/state-store-overview/state-store-overview-02.svg b/docs/dev/src/images/state-store-overview/state-store-overview-02.svg
similarity index 100%
rename from docs/images/state-store-overview/state-store-overview-02.svg
rename to docs/dev/src/images/state-store-overview/state-store-overview-02.svg
diff --git a/docs/images/state-store-overview/state-store-overview-03.svg b/docs/dev/src/images/state-store-overview/state-store-overview-03.svg
similarity index 100%
rename from docs/images/state-store-overview/state-store-overview-03.svg
rename to docs/dev/src/images/state-store-overview/state-store-overview-03.svg
diff --git a/docs/images/state-store-overview/state-store-overview-04.svg b/docs/dev/src/images/state-store-overview/state-store-overview-04.svg
similarity index 100%
rename from docs/images/state-store-overview/state-store-overview-04.svg
rename to docs/dev/src/images/state-store-overview/state-store-overview-04.svg
diff --git a/docs/images/state-store-overview/state-store-overview-05.svg b/docs/dev/src/images/state-store-overview/state-store-overview-05.svg
similarity index 100%
rename from docs/images/state-store-overview/state-store-overview-05.svg
rename to docs/dev/src/images/state-store-overview/state-store-overview-05.svg
diff --git a/docs/images/streaming-overview/streaming-architecture.svg b/docs/dev/src/images/streaming-overview/streaming-architecture.svg
similarity index 100%
rename from docs/images/streaming-overview/streaming-architecture.svg
rename to docs/dev/src/images/streaming-overview/streaming-architecture.svg
diff --git a/docs/images/streaming-overview/streaming-executor-and-compute-node.svg b/docs/dev/src/images/streaming-overview/streaming-executor-and-compute-node.svg
similarity index 100%
rename from docs/images/streaming-overview/streaming-executor-and-compute-node.svg
rename to docs/dev/src/images/streaming-overview/streaming-executor-and-compute-node.svg
diff --git a/docs/dev/src/intro.md b/docs/dev/src/intro.md
index c4317c8d71dd3..011ad4a149f83 100644
--- a/docs/dev/src/intro.md
+++ b/docs/dev/src/intro.md
@@ -1,6 +1,6 @@
# Introduction
-This guide is intended to be used by contributors to learn about how to develop RisingWave. The instructions about how to submit code changes are included in [contributing guidelines](./contributing.md).
+This guide is intended for contributors to learn how to develop RisingWave. The instructions about how to submit code changes are included in the [contribution guidelines](./contributing.md).
If you have questions, you can search for existing discussions or start a new discussion in the [Discussions forum of RisingWave](https://github.com/risingwavelabs/risingwave/discussions), or ask in the RisingWave Community channel on Slack. Please use the [invitation link](https://risingwave.com/slack) to join the channel.
diff --git a/docs/metrics.md b/docs/dev/src/metrics.md
similarity index 98%
rename from docs/metrics.md
rename to docs/dev/src/metrics.md
index b0216c07fc83e..14d98c7a365ea 100644
--- a/docs/metrics.md
+++ b/docs/dev/src/metrics.md
@@ -5,7 +5,7 @@ It covers what each metric measures, and what information we may derive from it.
## Barrier Latency
-Prerequisite: [Checkpoint](./checkpoint.md)
+Prerequisite: [Checkpoint](./design/checkpoint.md)
This metric measures the duration from when a barrier is injected into **all** sources in the stream graph,
until the barrier has flowed through all executors in the graph.
diff --git a/docs/relational_table/relational-table-schema.md b/docs/relational_table/relational-table-schema.md
deleted file mode 100644
index 64cd615feda25..0000000000000
--- a/docs/relational_table/relational-table-schema.md
+++ /dev/null
@@ -1,35 +0,0 @@
-# Relational Table Schema
-
-We introduce the rough row-based encoding format in [relational states](storing-state-using-relational-table.md#row-based-encoding)
-
-In this doc, we will take HashAgg with extreme state (`max`, `min`) or value state (`sum`, `count`) for example, and introduce a more detailed design for the internal table schema.
-
-[Code](https://github.com/risingwavelabs/risingwave/blob/7f9ad2240712aa0cfe3edffb4535d43b42f32cc5/src/frontend/src/optimizer/plan_node/logical_agg.rs#L144)
-
-## Table id
-`table_id` is a globally unique id allocated in meta for each relational table object. Meta is responsible for traversing the Plan Tree and calculating the total number of Relational Tables needed. For example, the Hash Join Operator needs 2, one for the left table and one for the right table. The number of tables needed for Agg depends on the number of agg calls.
-
-## Value State (Sum, Count)
-Query example:
-```sql
-select sum(v2), count(v3) from t group by v1
-```
-
-This query will need to initiate 2 Relational Tables. The schema is `table_id/group_key`.
-
-## Extreme State (Max, Min)
-Query example:
-```sql
-select max(v2), min(v3) from t group by v1
-```
-
-This query will need to initiate 2 Relational Tables. If the upstream is not append-only, the schema becomes `table_id/group_key/sort_key/upstream_pk`.
-
-The order of `sort_key` depends on the agg call kind. For example, if it's `max()`, `sort_key` will order with `Ascending`. if it's `min()`, `sort_key` will order with `Descending`.
-The `upstream_pk` is also appended to ensure the uniqueness of the key.
-This design allows the streaming executor not to read all the data from the storage when the cache fails, but only a part of it. The streaming executor will try to write all streaming data to storage, because there may be `update` or `delete` operations in the stream, it's impossible to always guarantee correct results without storing all data.
-
-If `t` is created with append-only flag, the schema becomes `table_id/group_key`, which is the same for Value State. This is because in the append-only mode, there is no `update` or `delete` operation, so the cache will never miss. Therefore, we only need to write one value to the storage.
-
-
-
diff --git a/docs/rustdoc/README.md b/docs/rustdoc/README.md
index 1b3e70e1113c2..0adf956748290 100644
--- a/docs/rustdoc/README.md
+++ b/docs/rustdoc/README.md
@@ -1,6 +1,6 @@
This folder contains files for generating a nice rustdoc index page.
-Online version (for latest main):
+Online version (for latest main):
To build and open locally, run the following command in the project root:
diff --git a/docs/rustdoc/index.md b/docs/rustdoc/index.md
index cfb74b8055b8a..a76edb23cb2b4 100644
--- a/docs/rustdoc/index.md
+++ b/docs/rustdoc/index.md
@@ -4,9 +4,7 @@ Welcome to an overview of the developer documentations of RisingWave!
## Developer Docs
-To learn how to develop RisingWave, see the [RisingWave Developer Guide](https://risingwavelabs.github.io/risingwave/).
-
-The [design docs](https://github.com/risingwavelabs/risingwave/blob/main/docs/README.md) covers some high-level ideas of how we built RisingWave.
+To learn how to develop RisingWave, and access high-level design docs, see the [RisingWave Developer Guide](https://risingwavelabs.github.io/risingwave/).
## Crate Docs
diff --git a/docs/rustdoc/rust.css b/docs/rustdoc/rust.css
index 71cf5e3df0004..9c76bb08c3898 100644
--- a/docs/rustdoc/rust.css
+++ b/docs/rustdoc/rust.css
@@ -1,18 +1,21 @@
/* This file is copied from the Rust Project, which is dual-licensed under
-Apache 2.0 and MIT terms. */
+Apache 2.0 and MIT terms. https://github.com/rust-lang/rust/blob/7d640b670e521a0491ea1e49082d1cb5632e2562/src/doc/rust.css
+*/
/* General structure */
body {
+ font-family: serif;
margin: 0 auto;
padding: 0 15px;
font-size: 18px;
- color: #333;
+ color: #000;
line-height: 1.428571429;
-webkit-box-sizing: unset;
-moz-box-sizing: unset;
box-sizing: unset;
+ background: #fff;
}
@media (min-width: 768px) {
body {
@@ -20,6 +23,14 @@ body {
}
}
+h1,
+h2,
+h3,
+h4,
+h5,
+h6 {
+ font-family: sans-serif;
+}
h2, h3, h4, h5, h6 {
font-weight: 400;
line-height: 1.1;
@@ -37,8 +48,8 @@ h4, h5, h6 {
margin-bottom: 10px;
padding: 5px 10px;
}
-h5, h6 {
- color: black;
+h5,
+h6 {
text-decoration: underline;
}
@@ -135,6 +146,31 @@ h1 a:link, h1 a:visited, h2 a:link, h2 a:visited,
h3 a:link, h3 a:visited, h4 a:link, h4 a:visited,
h5 a:link, h5 a:visited {color: black;}
+h1,
+h2,
+h3,
+h4,
+h5 {
+ /* This is needed to be able to position the doc-anchor. Ideally there
+       would be a <div> around the whole document, but we don't have that. */
+ position: relative;
+}
+
+a.doc-anchor {
+ color: black;
+ display: none;
+ position: absolute;
+ left: -20px;
+ /* We add this padding so that when the cursor moves from the heading's text to the anchor,
+ the anchor doesn't disappear. */
+ padding-right: 5px;
+ /* And this padding is used to make the anchor larger and easier to click on. */
+ padding-left: 3px;
+}
+
+*:hover>.doc-anchor {
+ display: block;
+}
/* Code */
pre, code {
diff --git a/e2e_test/commands/sr_register b/e2e_test/commands/sr_register
new file mode 100755
index 0000000000000..57dc65e50610d
--- /dev/null
+++ b/e2e_test/commands/sr_register
@@ -0,0 +1,27 @@
+#!/usr/bin/env bash
+
+set -euo pipefail
+
+# Register a schema to schema registry
+#
+# Usage: sr_register <subject> <schema>
+#
+# https://docs.confluent.io/platform/current/schema-registry/develop/api.html#post--subjects-(string-%20subject)-versions
+
+# Validate arguments
+if [[ $# -ne 2 ]]; then
+ echo "Usage: sr_register <subject> <schema>"
+ exit 1
+fi
+
+subject="$1"
+schema="$2"
+
+
+if [[ -z $subject || -z $schema ]]; then
+ echo "Error: Arguments cannot be empty"
+ exit 1
+fi
+
+echo "$schema" | jq '{"schema": tojson}' \
+| curl -X POST -H 'content-type:application/vnd.schemaregistry.v1+json' -d @- "${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/${subject}/versions"
diff --git a/e2e_test/source_inline/kafka/avro/alter_source.slt b/e2e_test/source_inline/kafka/avro/alter_source.slt
index 446fc6196d32b..57677af57cd92 100644
--- a/e2e_test/source_inline/kafka/avro/alter_source.slt
+++ b/e2e_test/source_inline/kafka/avro/alter_source.slt
@@ -12,8 +12,7 @@ system ok
rpk topic create 'avro_alter_source_test'
system ok
-echo '{"type":"record","name":"Root","fields":[{"name":"foo","type":"string"}]}' | jq '{"schema": tojson}' \
-| curl -X POST -H 'content-type:application/json' -d @- "${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/avro_alter_source_test-value/versions"
+sr_register avro_alter_source_test-value '{"type":"record","name":"Root","fields":[{"name":"foo","type":"string"}]}'
statement ok
create source s
@@ -27,8 +26,7 @@ FORMAT PLAIN ENCODE AVRO (
# create a new version of schema and produce a message
system ok
-echo '{"type":"record","name":"Root","fields":[{"name":"bar","type":"int","default":0},{"name":"foo","type":"string"}]}' | jq '{"schema": tojson}' \
-| curl -X POST -H 'content-type:application/json' -d @- "${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/avro_alter_source_test-value/versions"
+sr_register avro_alter_source_test-value '{"type":"record","name":"Root","fields":[{"name":"bar","type":"int","default":0},{"name":"foo","type":"string"}]}'
system ok
echo '{"foo":"ABC", "bar":1}' | rpk topic produce --schema-id=topic avro_alter_source_test
diff --git a/e2e_test/source_inline/kafka/avro/union.slt b/e2e_test/source_inline/kafka/avro/union.slt
new file mode 100644
index 0000000000000..44e1db659d120
--- /dev/null
+++ b/e2e_test/source_inline/kafka/avro/union.slt
@@ -0,0 +1,175 @@
+control substitution on
+
+system ok
+rpk topic delete 'avro-union' || true; \
+(rpk sr subject delete 'avro-union-value' && rpk sr subject delete 'avro-union-value' --permanent) || true;
+rpk topic create avro-union
+
+system ok
+sr_register avro-union-value '
+{
+ "type": "record",
+ "name": "Root",
+ "fields": [
+ {
+ "name": "unionType",
+ "type": ["int", "string"]
+ },
+ {
+ "name": "unionTypeComplex",
+ "type": [
+ "null",
+ {"type": "record", "name": "Email","fields": [{"name":"inner","type":"string"}]},
+ {"type": "record", "name": "Fax","fields": [{"name":"inner","type":"int"}]},
+ {"type": "record", "name": "Sms","fields": [{"name":"inner","type":"int"}]}
+ ]
+ },
+ {
+ "name": "enumField",
+ "type": ["null", "int", {
+ "type": "enum",
+ "name": "myEnum",
+ "namespace": "my.namespace",
+ "symbols": ["A", "B", "C", "D"]
+ }],
+ "default": null
+ }
+ ]
+}
+'
+
+system ok
diff --git a/java/connector-node/risingwave-sink-deltalake/pom.xml b/java/connector-node/risingwave-sink-deltalake/pom.xml
             <artifactId>risingwave-sink-deltalake</artifactId>
             <scope>test</scope>
-        <dependency>
-            <groupId>com.risingwave</groupId>
-            <artifactId>risingwave-sink-iceberg</artifactId>
-            <scope>test</scope>
-        </dependency>
         <dependency>
             <groupId>com.risingwave</groupId>
             <artifactId>s3-common</artifactId>
diff --git a/java/connector-node/risingwave-sink-iceberg/pom.xml b/java/connector-node/risingwave-sink-iceberg/pom.xml
index d8ef3d6db384a..404a895e7506f 100644
--- a/java/connector-node/risingwave-sink-iceberg/pom.xml
+++ b/java/connector-node/risingwave-sink-iceberg/pom.xml
@@ -74,6 +74,10 @@
             <groupId>org.apache.hive</groupId>
             <artifactId>hive-metastore</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-exec</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.apache.parquet</groupId>
             <artifactId>parquet-avro</artifactId>
diff --git a/java/pom.xml b/java/pom.xml
index 644588c9d6b44..2afd5606a39f0 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -387,6 +387,11 @@
                 <artifactId>hive-metastore</artifactId>
                 <version>${hive.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.apache.hive</groupId>
+                <artifactId>hive-exec</artifactId>
+                <version>${hive.version}</version>
+            </dependency>
             <dependency>
                 <groupId>org.apache.hadoop</groupId>
                 <artifactId>hadoop-mapreduce-client-core</artifactId>
diff --git a/proto/expr.proto b/proto/expr.proto
index f4016c3912db4..dedfa3f3cd3b7 100644
--- a/proto/expr.proto
+++ b/proto/expr.proto
@@ -307,6 +307,8 @@ message ExprNode {
HAS_TABLE_PRIVILEGE = 2407;
HAS_ANY_COLUMN_PRIVILEGE = 2408;
HAS_SCHEMA_PRIVILEGE = 2409;
+ PG_IS_IN_RECOVERY = 2411;
+ RW_RECOVERY_STATUS = 2412;
// EXTERNAL
ICEBERG_TRANSFORM = 2201;
diff --git a/proto/meta.proto b/proto/meta.proto
index 0d2a1b8832915..3284887590182 100644
--- a/proto/meta.proto
+++ b/proto/meta.proto
@@ -362,12 +362,26 @@ message ListAllNodesResponse {
repeated common.WorkerNode nodes = 2;
}
+message GetClusterRecoveryStatusRequest {}
+
+enum RecoveryStatus {
+ STATUS_UNSPECIFIED = 0;
+ STATUS_STARTING = 1;
+ STATUS_RECOVERING = 2;
+ STATUS_RUNNING = 3;
+}
+
+message GetClusterRecoveryStatusResponse {
+ RecoveryStatus status = 1;
+}
+
service ClusterService {
rpc AddWorkerNode(AddWorkerNodeRequest) returns (AddWorkerNodeResponse);
rpc ActivateWorkerNode(ActivateWorkerNodeRequest) returns (ActivateWorkerNodeResponse);
rpc DeleteWorkerNode(DeleteWorkerNodeRequest) returns (DeleteWorkerNodeResponse);
rpc UpdateWorkerNodeSchedulability(UpdateWorkerNodeSchedulabilityRequest) returns (UpdateWorkerNodeSchedulabilityResponse);
rpc ListAllNodes(ListAllNodesRequest) returns (ListAllNodesResponse);
+ rpc GetClusterRecoveryStatus(GetClusterRecoveryStatusRequest) returns (GetClusterRecoveryStatusResponse);
}
enum SubscribeType {
diff --git a/proto/stream_service.proto b/proto/stream_service.proto
index 85b12d8ed5fa1..fd97bde853487 100644
--- a/proto/stream_service.proto
+++ b/proto/stream_service.proto
@@ -71,7 +71,8 @@ message BarrierCompleteResponse {
common.Status status = 2;
repeated CreateMviewProgress create_mview_progress = 3;
message GroupedSstableInfo {
- uint64 compaction_group_id = 1;
+ reserved 1;
+ reserved "compaction_group_id";
hummock.SstableInfo sst = 2;
    map<uint32, hummock.TableStats> table_stats_map = 3;
}
diff --git a/src/batch/Cargo.toml b/src/batch/Cargo.toml
index c3042346aff85..ec7091ea882c4 100644
--- a/src/batch/Cargo.toml
+++ b/src/batch/Cargo.toml
@@ -32,7 +32,7 @@ hytra = "0.1.2"
iceberg = { workspace = true }
itertools = { workspace = true }
memcomparable = "0.2"
-opendal = "0.47"
+opendal = { workspace = true }
parking_lot = { workspace = true }
parquet = { workspace = true }
paste = "1"
diff --git a/src/cmd/src/lib.rs b/src/cmd/src/lib.rs
index 0e711458d5196..9017f58606f7c 100644
--- a/src/cmd/src/lib.rs
+++ b/src/cmd/src/lib.rs
@@ -44,8 +44,7 @@ pub fn compute(opts: ComputeNodeOpts) -> ! {
pub fn meta(opts: MetaNodeOpts) -> ! {
init_risingwave_logger(LoggerSettings::from_opts(&opts));
- // TODO(shutdown): pass the shutdown token
- main_okk(|_| risingwave_meta_node::start(opts));
+ main_okk(|shutdown| risingwave_meta_node::start(opts, shutdown));
}
pub fn frontend(opts: FrontendOpts) -> ! {
@@ -55,8 +54,7 @@ pub fn frontend(opts: FrontendOpts) -> ! {
pub fn compactor(opts: CompactorOpts) -> ! {
init_risingwave_logger(LoggerSettings::from_opts(&opts));
- // TODO(shutdown): pass the shutdown token
- main_okk(|_| risingwave_compactor::start(opts));
+ main_okk(|shutdown| risingwave_compactor::start(opts, shutdown));
}
pub fn ctl(opts: CtlOpts) -> ! {
diff --git a/src/cmd_all/src/standalone.rs b/src/cmd_all/src/standalone.rs
index 791921fc07d69..325f2f8ff395b 100644
--- a/src/cmd_all/src/standalone.rs
+++ b/src/cmd_all/src/standalone.rs
@@ -194,9 +194,10 @@ pub async fn standalone(
is_in_memory = matches!(opts.backend, Some(MetaBackend::Mem));
tracing::info!("starting meta-node thread with cli args: {:?}", opts);
+ let shutdown = shutdown.clone();
let _meta_handle = tokio::spawn(async move {
let dangerous_max_idle_secs = opts.dangerous_max_idle_secs;
- risingwave_meta_node::start(opts).await;
+ risingwave_meta_node::start(opts, shutdown).await;
tracing::warn!("meta is stopped, shutdown all nodes");
if let Some(idle_exit_secs) = dangerous_max_idle_secs {
eprintln!("{}",
@@ -230,8 +231,9 @@ pub async fn standalone(
}
if let Some(opts) = compactor_opts {
tracing::info!("starting compactor-node thread with cli args: {:?}", opts);
+ let shutdown = shutdown.clone();
let _compactor_handle =
- tokio::spawn(async move { risingwave_compactor::start(opts).await });
+ tokio::spawn(async move { risingwave_compactor::start(opts, shutdown).await });
}
// wait for log messages to be flushed
diff --git a/src/common/common_service/src/lib.rs b/src/common/common_service/src/lib.rs
index c09c84012819b..2cf9a56e076f3 100644
--- a/src/common/common_service/src/lib.rs
+++ b/src/common/common_service/src/lib.rs
@@ -18,9 +18,13 @@
#![feature(impl_trait_in_assoc_type)]
#![feature(error_generic_member_access)]
-pub mod metrics_manager;
-pub mod observer_manager;
+mod metrics_manager;
+mod observer_manager;
+mod tracing;
pub use metrics_manager::MetricsManager;
-
-pub mod tracing;
+pub use observer_manager::{
+ Channel, NotificationClient, ObserverError, ObserverManager, ObserverState,
+ RpcNotificationClient,
+};
+pub use tracing::TracingExtractLayer;
diff --git a/src/common/common_service/src/observer_manager.rs b/src/common/common_service/src/observer_manager.rs
index e760a0e16866c..bf7e457be8b1c 100644
--- a/src/common/common_service/src/observer_manager.rs
+++ b/src/common/common_service/src/observer_manager.rs
@@ -22,42 +22,6 @@ use thiserror_ext::AsReport;
use tokio::task::JoinHandle;
use tonic::{Status, Streaming};
-pub trait SubscribeTypeEnum {
- fn subscribe_type() -> SubscribeType;
-}
-
-pub struct SubscribeFrontend {}
-
-impl SubscribeTypeEnum for SubscribeFrontend {
- fn subscribe_type() -> SubscribeType {
- SubscribeType::Frontend
- }
-}
-
-pub struct SubscribeHummock {}
-
-impl SubscribeTypeEnum for SubscribeHummock {
- fn subscribe_type() -> SubscribeType {
- SubscribeType::Hummock
- }
-}
-
-pub struct SubscribeCompactor {}
-
-impl SubscribeTypeEnum for SubscribeCompactor {
- fn subscribe_type() -> SubscribeType {
- SubscribeType::Compactor
- }
-}
-
-pub struct SubscribeCompute {}
-
-impl SubscribeTypeEnum for SubscribeCompute {
- fn subscribe_type() -> SubscribeType {
- SubscribeType::Compute
- }
-}
-
/// `ObserverManager` is used to update data based on notification from meta.
/// Call `start` to spawn a new asynchronous task
/// We can write the notification logic by implementing `ObserverNodeImpl`.
@@ -68,7 +32,7 @@ pub struct ObserverManager {
}
pub trait ObserverState: Send + 'static {
- type SubscribeType: SubscribeTypeEnum;
+ fn subscribe_type() -> SubscribeType;
/// modify data after receiving notification from meta
fn handle_notification(&mut self, resp: SubscribeResponse);
@@ -109,10 +73,7 @@ where
S: ObserverState,
{
pub async fn new(client: T, observer_states: S) -> Self {
- let rx = client
- .subscribe(S::SubscribeType::subscribe_type())
- .await
- .unwrap();
+ let rx = client.subscribe(S::subscribe_type()).await.unwrap();
Self {
rx,
client,
@@ -214,11 +175,7 @@ where
/// `re_subscribe` is used to re-subscribe to the meta's notification.
async fn re_subscribe(&mut self) {
loop {
- match self
- .client
- .subscribe(S::SubscribeType::subscribe_type())
- .await
- {
+ match self.client.subscribe(S::subscribe_type()).await {
Ok(rx) => {
tracing::debug!("re-subscribe success");
self.rx = rx;
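
The hunks above drop the `SubscribeTypeEnum` marker structs and let each observer state report its subscription kind directly via `ObserverState::subscribe_type()`. A self-contained toy sketch of the resulting shape (names mirror the patch, but nothing below is RisingWave code; it is illustration only):

```rust
// Toy mock of the refactor: the subscription kind is a plain static method on
// the observer state, instead of an associated `SubscribeTypeEnum` marker type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SubscribeType {
    Frontend,
    Compute,
}

trait ObserverState {
    /// Which subscription this observer registers with the meta node.
    fn subscribe_type() -> SubscribeType;
}

struct ComputeObserverNode;

impl ObserverState for ComputeObserverNode {
    fn subscribe_type() -> SubscribeType {
        SubscribeType::Compute
    }
}

fn main() {
    // An ObserverManager-like caller can now query the state type directly,
    // instead of going through `<S::SubscribeType as SubscribeTypeEnum>::subscribe_type()`.
    assert_eq!(ComputeObserverNode::subscribe_type(), SubscribeType::Compute);
    let _ = SubscribeType::Frontend; // unused variant kept only to make the enum non-trivial
}
```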
diff --git a/src/common/metrics/src/monitor/connection.rs b/src/common/metrics/src/monitor/connection.rs
index 295fb6399ba4b..e5774a3f16d7d 100644
--- a/src/common/metrics/src/monitor/connection.rs
+++ b/src/common/metrics/src/monitor/connection.rs
@@ -587,28 +587,6 @@ impl tonic::transport::server::Router {
}
}
-#[cfg(not(madsim))]
-pub fn monitored_tcp_incoming(
- listen_addr: std::net::SocketAddr,
- connection_type: impl Into<String>,
- config: TcpConfig,
-) -> Result<
- MonitoredConnection,
- Box<dyn std::error::Error + Send + Sync>,
-> {
- let incoming = tonic::transport::server::TcpIncoming::new(
- listen_addr,
- config.tcp_nodelay,
- config.keepalive_duration,
- )?;
- Ok(MonitoredConnection::new(
- incoming,
- MonitorNewConnectionImpl {
- connection_type: connection_type.into(),
- },
- ))
-}
-
#[derive(Clone)]
pub struct MonitorNewConnectionImpl {
connection_type: String,
diff --git a/src/common/metrics/src/monitor/mod.rs b/src/common/metrics/src/monitor/mod.rs
index 10b5c966e636a..316cac9ea907c 100644
--- a/src/common/metrics/src/monitor/mod.rs
+++ b/src/common/metrics/src/monitor/mod.rs
@@ -12,20 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-pub mod connection;
-pub mod my_stats;
-pub mod process;
-pub mod rwlock;
+pub use connection::{monitor_connector, EndpointExt, RouterExt, TcpConfig};
+pub use rwlock::MonitoredRwLock;
-use std::sync::LazyLock;
+mod connection;
+mod process;
+mod rwlock;
-use prometheus::core::{
- AtomicI64, AtomicU64, Collector, GenericCounter, GenericCounterVec, GenericGauge, Metric,
-};
-use prometheus::{Histogram, HistogramVec, Registry};
+use std::sync::LazyLock;
-use crate::monitor::my_stats::MyHistogram;
-use crate::monitor::process::monitor_process;
+use prometheus::Registry;
#[cfg(target_os = "linux")]
static PAGESIZE: std::sync::LazyLock<usize> =
@@ -35,59 +31,8 @@ static PAGESIZE: std::sync::LazyLock =
pub static CLOCK_TICK: std::sync::LazyLock<u64> =
std::sync::LazyLock::new(|| unsafe { libc::sysconf(libc::_SC_CLK_TCK) as u64 });
-/// Define extension method `print` used in `print_statistics`.
-pub trait Print {
- fn print(&self);
-}
-
-impl Print for GenericCounter<AtomicU64> {
- fn print(&self) {
- let desc = &self.desc()[0].fq_name;
- let counter = self.metric().get_counter().get_value() as u64;
- println!("{desc} COUNT : {counter}");
- }
-}
-
-impl Print for GenericGauge<AtomicI64> {
- fn print(&self) {
- let desc = &self.desc()[0].fq_name;
- let counter = self.get();
- println!("{desc} COUNT : {counter}");
- }
-}
-
-impl Print for Histogram {
- fn print(&self) {
- let desc = &self.desc()[0].fq_name;
-
- let histogram = MyHistogram::from_prom_hist(self.metric().get_histogram());
- let p50 = histogram.get_percentile(50.0);
- let p95 = histogram.get_percentile(95.0);
- let p99 = histogram.get_percentile(99.0);
- let p100 = histogram.get_percentile(100.0);
-
- let sample_count = self.get_sample_count();
- let sample_sum = self.get_sample_sum();
- println!("{desc} P50 : {p50} P95 : {p95} P99 : {p99} P100 : {p100} COUNT : {sample_count} SUM : {sample_sum}");
- }
-}
-
-impl Print for HistogramVec {
- fn print(&self) {
- let desc = &self.desc()[0].fq_name;
- println!("{desc} {:?}", self);
- }
-}
-
-impl Print for GenericCounterVec<AtomicU64> {
- fn print(&self) {
- let desc = &self.desc()[0].fq_name;
- println!("{desc} {:?}", self);
- }
-}
-
pub static GLOBAL_METRICS_REGISTRY: LazyLock<Registry> = LazyLock::new(|| {
let registry = Registry::new();
- monitor_process(&registry);
+ process::monitor_process(&registry);
registry
});
diff --git a/src/common/metrics/src/monitor/my_stats.rs b/src/common/metrics/src/monitor/my_stats.rs
deleted file mode 100644
index 52c71167f2f97..0000000000000
--- a/src/common/metrics/src/monitor/my_stats.rs
+++ /dev/null
@@ -1,221 +0,0 @@
-// Copyright 2024 RisingWave Labs
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use std::fmt::{Display, Formatter};
-
-use itertools::Itertools;
-use prometheus::proto::Histogram;
-use rw_iter_util::ZipEqFast;
-
-#[derive(Clone, Default, Debug)]
-pub struct MyHistogram {
- pub upper_bound_list: Vec<f64>,
- pub count_list: Vec<u64>,
- pub total_count: u64,
- pub total_sum: f64,
-}
-
-impl MyHistogram {
- pub fn from_prom_hist(histogram: &Histogram) -> MyHistogram {
- let mut upper_bound_list = Vec::new();
- let mut count_list = Vec::new();
-
- let total_count = histogram.get_sample_count();
- let total_sum = histogram.get_sample_sum();
-
- let buckets = histogram.get_bucket();
- for bucket in buckets {
- let upper_bound = bucket.get_upper_bound();
- let count = bucket.get_cumulative_count();
- upper_bound_list.push(upper_bound);
- count_list.push(count);
- }
-
- MyHistogram {
- upper_bound_list,
- count_list,
- total_count,
- total_sum,
- }
- }
-
- pub fn from_diff(prev: &MyHistogram, cur: &MyHistogram) -> MyHistogram {
- MyHistogram {
- upper_bound_list: cur.upper_bound_list.clone(),
- count_list: match prev.count_list.is_empty() {
- true => cur.count_list.clone(),
- false => prev
- .count_list
- .iter()
- .zip_eq_fast(cur.count_list.iter())
- .map(|(&pb, &cb)| cb - pb)
- .collect_vec(),
- },
- total_sum: cur.total_sum - prev.total_sum,
- total_count: cur.total_count - prev.total_count,
- }
- }
-
- pub fn get_percentile(&self, p: f64) -> f64 {
- let sample_count = self.total_count;
-
- // empty bucket may appear
- if sample_count == 0 {
- return 0.0;
- }
- let threshold = (sample_count as f64 * (p / 100.0_f64)).ceil() as u64;
- let mut last_upper_bound = 0.0;
- let mut last_count = 0;
- for (&upper_bound, &count) in self
- .upper_bound_list
- .iter()
- .zip_eq_fast(self.count_list.iter())
- {
- if count >= threshold {
- // assume scale linearly within this bucket,
- // return a value between last_upper_bound and upper_bound
- let right_left_diff = upper_bound - last_upper_bound;
- return last_upper_bound
- + right_left_diff * (threshold - last_count) as f64
- / (count - last_count) as f64;
- }
- last_upper_bound = upper_bound;
- last_count = count;
- }
-
- 0.0
- }
-}
-
-impl Display for MyHistogram {
- fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
- // calculate latencies statistics
- let mean = self.total_sum / self.total_count as f64;
- let p50 = self.get_percentile(50.0);
- let p90 = self.get_percentile(90.0);
- let p99 = self.get_percentile(99.0);
- let p100 = self.get_percentile(100.0);
-
- write!(
- f,
- "latency:
- mean: {},
- p50: {},
- p90: {},
- p99: {},
- p100: {};",
- mean, p50, p90, p99, p100
- )
- }
-}
-
-#[cfg(test)]
-mod tests {
- use prometheus::core::Metric;
- use prometheus::{histogram_opts, register_histogram_with_registry, Registry};
-
- use super::*;
-
- #[test]
- fn test_proc_histogram_basic() {
- fn new_simple_histogram(upper_bound: u64) -> MyHistogram {
- let registry = Registry::new();
- let buckets = (1..=upper_bound).map(|x| x as f64).collect::<Vec<f64>>();
- let opts = histogram_opts!("test_histogram", "test_histogram", buckets);
-
- let histogram = register_histogram_with_registry!(opts, registry).unwrap();
-
- for value in 1..=upper_bound {
- histogram.observe(value as f64);
- }
-
- MyHistogram::from_prom_hist(histogram.metric().get_histogram())
- }
-
- let histogram = new_simple_histogram(999);
- assert_eq!(histogram.get_percentile(50.0) as u64, 500);
- assert_eq!(histogram.get_percentile(90.0) as u64, 900);
- assert_eq!(histogram.get_percentile(99.0) as u64, 990);
- assert_eq!(histogram.get_percentile(99.9) as u64, 999);
- assert_eq!(histogram.get_percentile(100.0) as u64, 999);
-
- let histogram = new_simple_histogram(1000);
- assert_eq!(histogram.get_percentile(50.0) as u64, 500);
- assert_eq!(histogram.get_percentile(90.0) as u64, 900);
- assert_eq!(histogram.get_percentile(99.0) as u64, 990);
- assert_eq!(histogram.get_percentile(99.9) as u64, 1000);
- assert_eq!(histogram.get_percentile(100.0) as u64, 1000);
-
- let histogram = new_simple_histogram(9999);
- assert_eq!(histogram.get_percentile(50.0) as u64, 5000);
- assert_eq!(histogram.get_percentile(90.0) as u64, 9000);
- assert_eq!(histogram.get_percentile(99.0) as u64, 9900);
- assert_eq!(histogram.get_percentile(99.9) as u64, 9990);
- assert_eq!(histogram.get_percentile(100.0) as u64, 9999);
- }
-
- #[test]
- fn test_proc_histogram_uneven_distributed() {
- let registry = Registry::new();
- let buckets = vec![
- 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
- ];
- let opts = histogram_opts!("test_histogram", "test_histogram", buckets);
- let histogram = register_histogram_with_registry!(opts, registry).unwrap();
-
- let mut i = 0.005;
- while i < 10.0 {
- histogram.observe(i);
- i += 0.005;
- }
-
- let histogram = MyHistogram::from_prom_hist(histogram.metric().get_histogram());
- assert_eq!(histogram.get_percentile(50.0), 5.0);
- assert_eq!(histogram.get_percentile(90.0), 9.004004004004004);
- assert_eq!(histogram.get_percentile(99.0), 9.904904904904905);
- assert_eq!(histogram.get_percentile(99.9), 9.994994994994995);
- assert_eq!(histogram.get_percentile(100.0), 10.0);
- }
-
- #[test]
- fn test_proc_histogram_realistic() {
- let registry = Registry::new();
- let buckets = vec![
- 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
- ];
- let opts = histogram_opts!("test_histogram", "test_histogram", buckets);
- let histogram = register_histogram_with_registry!(opts, registry).unwrap();
-
- histogram.observe(0.0012);
- histogram.observe(0.0013);
- histogram.observe(0.003);
-
- histogram.observe(0.0132);
- histogram.observe(0.0143);
- histogram.observe(0.0146);
- histogram.observe(0.0249);
-
- histogram.observe(0.99);
-
- histogram.observe(6.11);
- histogram.observe(7.833);
-
- let histogram = MyHistogram::from_prom_hist(histogram.metric().get_histogram());
- assert_eq!(histogram.get_percentile(50.0), 0.0175);
- assert_eq!(histogram.get_percentile(90.0), 7.5);
- assert_eq!(histogram.get_percentile(99.0), 10.00);
- assert_eq!(histogram.get_percentile(99.9), 10.00);
- assert_eq!(histogram.get_percentile(100.0), 10.00);
- }
-}
diff --git a/src/common/src/array/stream_chunk.rs b/src/common/src/array/stream_chunk.rs
index 917d484458518..cfee990561aa1 100644
--- a/src/common/src/array/stream_chunk.rs
+++ b/src/common/src/array/stream_chunk.rs
@@ -525,6 +525,10 @@ impl OpRowMutRef<'_> {
}
impl StreamChunkMut {
+ pub fn capacity(&self) -> usize {
+ self.vis.len()
+ }
+
pub fn vis(&self, i: usize) -> bool {
self.vis.is_set(i)
}
diff --git a/src/common/src/types/mod.rs b/src/common/src/types/mod.rs
index 69e727cbde655..3b02b8c38d020 100644
--- a/src/common/src/types/mod.rs
+++ b/src/common/src/types/mod.rs
@@ -573,6 +573,8 @@ pub trait ScalarRef<'a>: ScalarBounds<ScalarRefImpl<'a>> + 'a + Copy {
macro_rules! scalar_impl_enum {
($( { $variant_name:ident, $suffix_name:ident, $scalar:ty, $scalar_ref:ty } ),*) => {
/// `ScalarImpl` embeds all possible scalars in the evaluation framework.
+ ///
+ /// See `for_all_variants` for the definition.
#[derive(Debug, Clone, PartialEq, Eq, EstimateSize)]
pub enum ScalarImpl {
$( $variant_name($scalar) ),*
diff --git a/src/compute/src/observer/observer_manager.rs b/src/compute/src/observer/observer_manager.rs
index e4192fb0b6f9c..62e2d699668f4 100644
--- a/src/compute/src/observer/observer_manager.rs
+++ b/src/compute/src/observer/observer_manager.rs
@@ -13,7 +13,7 @@
// limitations under the License.
use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef;
-use risingwave_common_service::observer_manager::{ObserverState, SubscribeCompute};
+use risingwave_common_service::ObserverState;
use risingwave_pb::meta::subscribe_response::Info;
use risingwave_pb::meta::SubscribeResponse;
@@ -22,7 +22,9 @@ pub struct ComputeObserverNode {
}
impl ObserverState for ComputeObserverNode {
- type SubscribeType = SubscribeCompute;
+ fn subscribe_type() -> risingwave_pb::meta::SubscribeType {
+ risingwave_pb::meta::SubscribeType::Compute
+ }
fn handle_notification(&mut self, resp: SubscribeResponse) {
let Some(info) = resp.info.as_ref() else {
diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs
index 3b0466b6cc0d1..d7dcbd5146c31 100644
--- a/src/compute/src/server.rs
+++ b/src/compute/src/server.rs
@@ -28,7 +28,7 @@ use risingwave_common::config::{
MAX_CONNECTION_WINDOW_SIZE, STREAM_WINDOW_SIZE,
};
use risingwave_common::lru::init_global_sequencer_args;
-use risingwave_common::monitor::connection::{RouterExt, TcpConfig};
+use risingwave_common::monitor::{RouterExt, TcpConfig};
use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::telemetry::manager::TelemetryManager;
@@ -38,9 +38,7 @@ use risingwave_common::util::pretty_bytes::convert;
use risingwave_common::util::tokio_util::sync::CancellationToken;
use risingwave_common::{GIT_SHA, RW_VERSION};
use risingwave_common_heap_profiling::HeapProfiler;
-use risingwave_common_service::metrics_manager::MetricsManager;
-use risingwave_common_service::observer_manager::ObserverManager;
-use risingwave_common_service::tracing::TracingExtractLayer;
+use risingwave_common_service::{MetricsManager, ObserverManager, TracingExtractLayer};
use risingwave_connector::source::monitor::GLOBAL_SOURCE_METRICS;
use risingwave_dml::dml_manager::DmlManager;
use risingwave_pb::common::WorkerType;
diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml
index 86491ae464a3f..34f8761073454 100644
--- a/src/connector/Cargo.toml
+++ b/src/connector/Cargo.toml
@@ -84,7 +84,7 @@ mysql_common = { version = "0.32", default-features = false, features = [
] }
nexmark = { version = "0.2", features = ["serde"] }
num-bigint = "0.4"
-opendal = { version = "0.47", features = [
+opendal = { workspace = true, features = [
"executors-tokio",
"services-fs",
"services-gcs",
diff --git a/src/connector/codec/src/decoder/avro/mod.rs b/src/connector/codec/src/decoder/avro/mod.rs
index cdd9aea416c8f..2cd6cf5ac77c3 100644
--- a/src/connector/codec/src/decoder/avro/mod.rs
+++ b/src/connector/codec/src/decoder/avro/mod.rs
@@ -15,7 +15,7 @@
mod schema;
use std::sync::LazyLock;
-use apache_avro::schema::{DecimalSchema, RecordSchema};
+use apache_avro::schema::{DecimalSchema, RecordSchema, UnionSchema};
use apache_avro::types::{Value, ValueKind};
use apache_avro::{Decimal as AvroDecimal, Schema};
use chrono::Datelike;
@@ -33,6 +33,7 @@ use risingwave_common::util::iter_util::ZipEqFast;
pub use self::schema::{avro_schema_to_column_descs, MapHandling, ResolvedAvroSchema};
use super::utils::extract_decimal;
use super::{bail_uncategorized, uncategorized, Access, AccessError, AccessResult};
+use crate::decoder::avro::schema::avro_schema_to_struct_field_name;
#[derive(Clone)]
/// Options for parsing an `AvroValue` into Datum, with an optional avro schema.
@@ -107,6 +108,54 @@ impl<'a> AvroParseOptions<'a> {
let v: ScalarImpl = match (type_expected, value) {
(_, Value::Null) => return Ok(DatumCow::NULL),
+ // ---- Union (with >=2 non null variants), and nullable Union ([null, record]) -----
+ (DataType::Struct(struct_type_info), Value::Union(variant, v)) => {
+ let Some(Schema::Union(u)) = self.schema else {
+ // XXX: Is this branch actually unreachable? (if self.schema is correctly used)
+ return Err(create_error());
+ };
+
+ if let Some(inner) = get_nullable_union_inner(u) {
+ // nullable Union ([null, record])
+ return Self {
+ schema: Some(inner),
+ relax_numeric: self.relax_numeric,
+ }
+ .convert_to_datum(v, type_expected);
+ }
+ let variant_schema = &u.variants()[*variant as usize];
+
+ if matches!(variant_schema, &Schema::Null) {
+ return Ok(DatumCow::NULL);
+ }
+
+ // Here we compare the field name, instead of using the variant idx to find the field idx.
+ // The latter approach might also work, but might be more error-prone.
+ // We will need to get the index of the "null" variant, and then re-map the variant index to the field index.
+ // XXX: probably we can unwrap here (if self.schema is correctly used)
+ let expected_field_name = avro_schema_to_struct_field_name(variant_schema)?;
+
+ let mut fields = Vec::with_capacity(struct_type_info.len());
+ for (field_name, field_type) in struct_type_info
+ .names()
+ .zip_eq_fast(struct_type_info.types())
+ {
+ if field_name == expected_field_name {
+ let datum = Self {
+ schema: Some(variant_schema),
+ relax_numeric: self.relax_numeric,
+ }
+ .convert_to_datum(v, field_type)?
+ .to_owned_datum();
+
+ fields.push(datum)
+ } else {
+ fields.push(None)
+ }
+ }
+ StructValue::new(fields).into()
+ }
+ // nullable Union ([null, T])
(_, Value::Union(_, v)) => {
let schema = self.extract_inner_schema(None);
return Self {
@@ -290,6 +339,12 @@ impl Access for AvroAccess<'_> {
let mut value = self.value;
let mut options: AvroParseOptions<'_> = self.options.clone();
+ debug_assert!(
+ path.len() == 1
+ || (path.len() == 2 && matches!(path[0], "before" | "after" | "source")),
+ "unexpected path access: {:?}",
+ path
+ );
let mut i = 0;
while i < path.len() {
let key = path[i];
@@ -299,6 +354,29 @@ impl Access for AvroAccess<'_> {
};
match value {
Value::Union(_, v) => {
+ // The debezium "before" field is a nullable union.
+ // "fields": [
+ // {
+ // "name": "before",
+ // "type": [
+ // "null",
+ // {
+ // "type": "record",
+ // "name": "Value",
+ // "fields": [...],
+ // }
+ // ],
+ // "default": null
+ // },
+ // {
+ // "name": "after",
+ // "type": [
+ // "null",
+ // "Value"
+ // ],
+ // "default": null
+ // },
+ // ...]
value = v;
options.schema = options.extract_inner_schema(None);
continue;
@@ -338,18 +416,30 @@ pub(crate) fn avro_decimal_to_rust_decimal(
))
}
-pub fn avro_schema_skip_union(schema: &Schema) -> anyhow::Result<&Schema> {
+/// If the union schema is `[null, T]` or `[T, null]`, returns `Some(T)`; otherwise returns `None`.
+fn get_nullable_union_inner(union_schema: &UnionSchema) -> Option<&'_ Schema> {
+ let variants = union_schema.variants();
+    // Note: `[null, null]` is invalid, so we don't need to worry about that.
+ if variants.len() == 2 && variants.contains(&Schema::Null) {
+ let inner_schema = variants
+ .iter()
+ .find(|s| !matches!(s, &&Schema::Null))
+ .unwrap();
+ Some(inner_schema)
+ } else {
+ None
+ }
+}
+
+pub fn avro_schema_skip_nullable_union(schema: &Schema) -> anyhow::Result<&Schema> {
match schema {
- Schema::Union(union_schema) => {
- let inner_schema = union_schema
- .variants()
- .iter()
- .find(|s| !matches!(s, &&Schema::Null))
- .ok_or_else(|| {
- anyhow::format_err!("illegal avro record schema {:?}", union_schema)
- })?;
- Ok(inner_schema)
- }
+ Schema::Union(union_schema) => match get_nullable_union_inner(union_schema) {
+ Some(s) => Ok(s),
+ None => Err(anyhow::format_err!(
+ "illegal avro union schema, expected [null, T], got {:?}",
+ union_schema
+ )),
+ },
other => Ok(other),
}
}
@@ -372,7 +462,9 @@ pub fn avro_extract_field_schema<'a>(
Ok(&field.schema)
}
Schema::Array(schema) => Ok(schema),
- Schema::Union(_) => avro_schema_skip_union(schema),
+ // Only nullable union should be handled here.
+ // We will not extract inner schema for real union (and it's not extractable).
+ Schema::Union(_) => avro_schema_skip_nullable_union(schema),
Schema::Map(schema) => Ok(schema),
_ => bail!("avro schema does not have inner item, schema: {:?}", schema),
}
@@ -476,11 +568,337 @@ pub(crate) fn avro_to_jsonb(avro: &Value, builder: &mut jsonbb::Builder) -> Acce
mod tests {
use std::str::FromStr;
- use apache_avro::Decimal as AvroDecimal;
+ use apache_avro::{from_avro_datum, Decimal as AvroDecimal};
+ use expect_test::expect;
use risingwave_common::types::{Datum, Decimal};
use super::*;
+ /// Test the behavior of the Rust Avro lib for handling union with logical type.
+ #[test]
+ fn test_avro_lib_union() {
+ // duplicate types
+ let s = Schema::parse_str(r#"["null", "null"]"#);
+ expect![[r#"
+ Err(
+ Unions cannot contain duplicate types,
+ )
+ "#]]
+ .assert_debug_eq(&s);
+ let s = Schema::parse_str(r#"["int", "int"]"#);
+ expect![[r#"
+ Err(
+ Unions cannot contain duplicate types,
+ )
+ "#]]
+ .assert_debug_eq(&s);
+ // multiple map/array are considered as the same type, regardless of the element type!
+ let s = Schema::parse_str(
+ r#"[
+"null",
+{
+ "type": "map",
+ "values" : "long",
+ "default": {}
+},
+{
+ "type": "map",
+ "values" : "int",
+ "default": {}
+}
+]
+"#,
+ );
+ expect![[r#"
+ Err(
+ Unions cannot contain duplicate types,
+ )
+ "#]]
+ .assert_debug_eq(&s);
+ let s = Schema::parse_str(
+ r#"[
+"null",
+{
+ "type": "array",
+ "items" : "long",
+ "default": {}
+},
+{
+ "type": "array",
+ "items" : "int",
+ "default": {}
+}
+]
+"#,
+ );
+ expect![[r#"
+ Err(
+ Unions cannot contain duplicate types,
+ )
+ "#]]
+ .assert_debug_eq(&s);
+ // multiple named types
+ let s = Schema::parse_str(
+ r#"[
+"null",
+{"type":"fixed","name":"a","size":16},
+{"type":"fixed","name":"b","size":32}
+]
+"#,
+ );
+ expect![[r#"
+ Ok(
+ Union(
+ UnionSchema {
+ schemas: [
+ Null,
+ Fixed(
+ FixedSchema {
+ name: Name {
+ name: "a",
+ namespace: None,
+ },
+ aliases: None,
+ doc: None,
+ size: 16,
+ attributes: {},
+ },
+ ),
+ Fixed(
+ FixedSchema {
+ name: Name {
+ name: "b",
+ namespace: None,
+ },
+ aliases: None,
+ doc: None,
+ size: 32,
+ attributes: {},
+ },
+ ),
+ ],
+ variant_index: {
+ Null: 0,
+ },
+ },
+ ),
+ )
+ "#]]
+ .assert_debug_eq(&s);
+
+ // union in union
+ let s = Schema::parse_str(r#"["int", ["null", "int"]]"#);
+ expect![[r#"
+ Err(
+ Unions may not directly contain a union,
+ )
+ "#]]
+ .assert_debug_eq(&s);
+
+ // logical type
+ let s = Schema::parse_str(r#"["null", {"type":"string","logicalType":"uuid"}]"#).unwrap();
+ expect![[r#"
+ Union(
+ UnionSchema {
+ schemas: [
+ Null,
+ Uuid,
+ ],
+ variant_index: {
+ Null: 0,
+ Uuid: 1,
+ },
+ },
+ )
+ "#]]
+ .assert_debug_eq(&s);
+ // Note: Java Avro lib rejects this (logical type unions with its physical type)
+ let s = Schema::parse_str(r#"["string", {"type":"string","logicalType":"uuid"}]"#).unwrap();
+ expect![[r#"
+ Union(
+ UnionSchema {
+ schemas: [
+ String,
+ Uuid,
+ ],
+ variant_index: {
+ String: 0,
+ Uuid: 1,
+ },
+ },
+ )
+ "#]]
+ .assert_debug_eq(&s);
+ // Note: Java Avro lib rejects this (logical type unions with its physical type)
+ let s = Schema::parse_str(r#"["int", {"type":"int", "logicalType": "date"}]"#).unwrap();
+ expect![[r#"
+ Union(
+ UnionSchema {
+ schemas: [
+ Int,
+ Date,
+ ],
+ variant_index: {
+ Int: 0,
+ Date: 1,
+ },
+ },
+ )
+ "#]]
+ .assert_debug_eq(&s);
+ // Note: Java Avro lib allows this (2 decimal with different "name")
+ let s = Schema::parse_str(
+ r#"[
+{"type":"fixed","name":"Decimal128","size":16,"logicalType":"decimal","precision":38,"scale":2},
+{"type":"fixed","name":"Decimal256","size":32,"logicalType":"decimal","precision":50,"scale":2}
+]"#,
+ );
+ expect![[r#"
+ Err(
+ Unions cannot contain duplicate types,
+ )
+ "#]]
+ .assert_debug_eq(&s);
+ }
+
+ #[test]
+ fn test_avro_lib_union_record_bug() {
+ // multiple named types (record)
+ let s = Schema::parse_str(
+ r#"
+ {
+ "type": "record",
+ "name": "Root",
+ "fields": [
+ {
+ "name": "unionTypeComplex",
+ "type": [
+ "null",
+ {"type": "record", "name": "Email","fields": [{"name":"inner","type":"string"}]},
+ {"type": "record", "name": "Fax","fields": [{"name":"inner","type":"int"}]},
+ {"type": "record", "name": "Sms","fields": [{"name":"inner","type":"int"}]}
+ ]
+ }
+ ]
+ }
+ "#,
+ )
+ .unwrap();
+
+ let bytes = hex::decode("060c").unwrap();
+ // Correct should be variant 3 (Sms)
+ let correct_value = from_avro_datum(&s, &mut bytes.as_slice(), None);
+ expect![[r#"
+ Ok(
+ Record(
+ [
+ (
+ "unionTypeComplex",
+ Union(
+ 3,
+ Record(
+ [
+ (
+ "inner",
+ Int(
+ 6,
+ ),
+ ),
+ ],
+ ),
+ ),
+ ),
+ ],
+ ),
+ )
+ "#]]
+ .assert_debug_eq(&correct_value);
+ // Bug: We got variant 2 (Fax) here, if we pass the reader schema.
+ let wrong_value = from_avro_datum(&s, &mut bytes.as_slice(), Some(&s));
+ expect![[r#"
+ Ok(
+ Record(
+ [
+ (
+ "unionTypeComplex",
+ Union(
+ 2,
+ Record(
+ [
+ (
+ "inner",
+ Int(
+ 6,
+ ),
+ ),
+ ],
+ ),
+ ),
+ ),
+ ],
+ ),
+ )
+ "#]]
+ .assert_debug_eq(&wrong_value);
+
+ // The bug below can explain what happened.
+ // The two records below are actually incompatible: https://avro.apache.org/docs/1.11.1/specification/_print/#schema-resolution
+ // > both schemas are records with the _same (unqualified) name_
+ // In from_avro_datum, it first reads the value with the writer schema, and then
+ // it just uses the reader schema to interpret the value.
+ // The value doesn't have record "name" information. So it wrongly passed the conversion.
+ // The correct way is that we need to use both the writer and reader schema in the second step to interpret the value.
+
+ let s = Schema::parse_str(
+ r#"
+ {
+ "type": "record",
+ "name": "Root",
+ "fields": [
+ {
+ "name": "a",
+ "type": "int"
+ }
+ ]
+ }
+ "#,
+ )
+ .unwrap();
+ let s2 = Schema::parse_str(
+ r#"
+{
+ "type": "record",
+ "name": "Root222",
+ "fields": [
+ {
+ "name": "a",
+ "type": "int"
+ }
+ ]
+}
+ "#,
+ )
+ .unwrap();
+
+ let bytes = hex::decode("0c").unwrap();
+ let value = from_avro_datum(&s, &mut bytes.as_slice(), Some(&s2));
+ expect![[r#"
+ Ok(
+ Record(
+ [
+ (
+ "a",
+ Int(
+ 6,
+ ),
+ ),
+ ],
+ ),
+ )
+ "#]]
+ .assert_debug_eq(&value);
+ }
+
#[test]
fn test_convert_decimal() {
// 280
diff --git a/src/connector/codec/src/decoder/avro/schema.rs b/src/connector/codec/src/decoder/avro/schema.rs
index fe96495d089ea..324b7fd426a56 100644
--- a/src/connector/codec/src/decoder/avro/schema.rs
+++ b/src/connector/codec/src/decoder/avro/schema.rs
@@ -14,14 +14,18 @@
use std::sync::{Arc, LazyLock};
+use anyhow::Context;
use apache_avro::schema::{DecimalSchema, RecordSchema, ResolvedSchema, Schema};
use apache_avro::AvroResult;
use itertools::Itertools;
-use risingwave_common::bail;
+use risingwave_common::error::NotImplemented;
use risingwave_common::log::LogSuppresser;
use risingwave_common::types::{DataType, Decimal};
+use risingwave_common::{bail, bail_not_implemented};
use risingwave_pb::plan_common::{AdditionalColumn, ColumnDesc, ColumnDescVersion};
+use super::get_nullable_union_inner;
+
/// Avro schema with `Ref` inlined. The newtype is used to indicate whether the schema is resolved.
///
/// TODO: Actually most of the place should use resolved schema, but currently they just happen to work (Some edge cases are not met yet).
@@ -198,20 +202,46 @@ fn avro_type_mapping(
DataType::List(Box::new(item_type))
}
Schema::Union(union_schema) => {
- // We only support using union to represent nullable fields, not general unions.
- let variants = union_schema.variants();
- if variants.len() != 2 || !variants.contains(&Schema::Null) {
- bail!(
- "unsupported Avro type, only unions like [null, T] is supported: {:?}",
- schema
- );
- }
- let nested_schema = variants
- .iter()
- .find_or_first(|s| !matches!(s, Schema::Null))
- .unwrap();
+ // Note: Unions may not immediately contain other unions. So a `null` must represent a top-level null.
+ // e.g., ["null", ["null", "string"]] is not allowed
+
+ // Note: Unions may not contain more than one schema with the same type, except for the named types record, fixed and enum.
+ // https://avro.apache.org/docs/1.11.1/specification/_print/#unions
+ debug_assert!(
+ union_schema
+ .variants()
+ .iter()
+ .map(Schema::canonical_form) // Schema doesn't implement Eq, but only PartialEq.
+ .duplicates()
+ .next()
+ .is_none(),
+ "Union contains duplicate types: {union_schema:?}",
+ );
+ match get_nullable_union_inner(union_schema) {
+ Some(inner) => avro_type_mapping(inner, map_handling)?,
+ None => {
+ // Convert the union to a struct, each field of the struct represents a variant of the union.
+ // Refer to https://github.com/risingwavelabs/risingwave/issues/16273#issuecomment-2179761345 to see why it's not perfect.
+ // Note: Avro union's variant tag is type name, not field name (unlike Rust enum, or Protobuf oneof).
+
+ // XXX: do we need to introduce union.handling.mode?
+ let (fields, field_names) = union_schema
+ .variants()
+ .iter()
+ // null will mean the whole struct is null
+ .filter(|variant| !matches!(variant, &&Schema::Null))
+ .map(|variant| {
+ avro_type_mapping(variant, map_handling).and_then(|t| {
+ let name = avro_schema_to_struct_field_name(variant)?;
+ Ok((t, name))
+ })
+ })
+ .process_results(|it| it.unzip::<_, _, Vec<_>, Vec<_>>())
+ .context("failed to convert Avro union to struct")?;
- avro_type_mapping(nested_schema, map_handling)?
+ DataType::new_struct(fields, field_names)
+ }
+ }
}
Schema::Ref { name } => {
if name.name == DBZ_VARIABLE_SCALE_DECIMAL_NAME
@@ -219,7 +249,7 @@ fn avro_type_mapping(
{
DataType::Decimal
} else {
- bail!("unsupported Avro type: {:?}", schema);
+ bail_not_implemented!("Avro type: {:?}", schema);
}
}
Schema::Map(value_schema) => {
@@ -229,20 +259,25 @@ fn avro_type_mapping(
if supported_avro_to_json_type(value_schema) {
DataType::Jsonb
} else {
- bail!(
- "unsupported Avro type, cannot convert map to jsonb: {:?}",
+ bail_not_implemented!(
+ issue = 16963,
+ "Avro map type to jsonb: {:?}",
schema
- )
+ );
}
}
None => {
+            // We require it to be specified, because we don't want to have a bad default behavior.
+            // But perhaps changing the default behavior won't be a breaking change,
+            // because it only affects what the resulting ColumnDesc will be at creation time, and the ColumnDesc will be persisted.
+            // This is unlike timestamp.handling.mode, which affects the parser's behavior at runtime.
bail!("`map.handling.mode` not specified in ENCODE AVRO (...). Currently supported modes: `jsonb`")
}
}
}
Schema::Uuid => DataType::Varchar,
Schema::Null | Schema::Fixed(_) => {
- bail!("unsupported Avro type: {:?}", schema)
+ bail_not_implemented!("Avro type: {:?}", schema);
}
};
@@ -280,3 +315,71 @@ fn supported_avro_to_json_type(schema: &Schema) -> bool {
| Schema::Union(_) => false,
}
}
+
+/// The field name when converting Avro union type to RisingWave struct type.
+pub(super) fn avro_schema_to_struct_field_name(schema: &Schema) -> Result<String, NotImplemented> {
+ Ok(match schema {
+ Schema::Null => unreachable!(),
+ Schema::Union(_) => unreachable!(),
+ // Primitive types
+ Schema::Boolean => "boolean".to_string(),
+ Schema::Int => "int".to_string(),
+ Schema::Long => "long".to_string(),
+ Schema::Float => "float".to_string(),
+ Schema::Double => "double".to_string(),
+ Schema::Bytes => "bytes".to_string(),
+ Schema::String => "string".to_string(),
+ // Unnamed Complex types
+ Schema::Array(_) => "array".to_string(),
+ Schema::Map(_) => "map".to_string(),
+ // Named Complex types
+ Schema::Enum(_) | Schema::Ref { name: _ } | Schema::Fixed(_) | Schema::Record(_) => {
+ // schema.name().unwrap().fullname(None)
+ // See test_avro_lib_union_record_bug
+ // https://github.com/risingwavelabs/risingwave/issues/17632
+            bail_not_implemented!(issue = 17632, "Avro named type used in Union type: {:?}", schema)
+        }
+
+ // Logical types are currently banned. See https://github.com/risingwavelabs/risingwave/issues/17616
+
+/*
+ Schema::Uuid => "uuid".to_string(),
+ // Decimal is the most tricky. https://avro.apache.org/docs/1.11.1/specification/_print/#decimal
+ // - A decimal logical type annotates Avro bytes _or_ fixed types.
+ // - It has attributes `precision` and `scale`.
+ // "For the purposes of schema resolution, two schemas that are decimal logical types match if their scales and precisions match."
+ // - When the physical type is fixed, it's a named type. And a schema containing 2 decimals is possible:
+ // [
+ // {"type":"fixed","name":"Decimal128","size":16,"logicalType":"decimal","precision":38,"scale":2},
+ // {"type":"fixed","name":"Decimal256","size":32,"logicalType":"decimal","precision":50,"scale":2}
+ // ]
+ // In this case (a logical type's physical type is a named type), perhaps we should use the physical type's `name`.
+ Schema::Decimal(_) => "decimal".to_string(),
+ Schema::Date => "date".to_string(),
+ // Note: in Avro, the name style is "time-millis", etc.
+ // But in RisingWave (Postgres), it will require users to use quotes, i.e.,
+ // select (struct)."time-millis", (struct).time_millies from t;
+ // The latter might be more user-friendly.
+ Schema::TimeMillis => "time_millis".to_string(),
+ Schema::TimeMicros => "time_micros".to_string(),
+ Schema::TimestampMillis => "timestamp_millis".to_string(),
+ Schema::TimestampMicros => "timestamp_micros".to_string(),
+ Schema::LocalTimestampMillis => "local_timestamp_millis".to_string(),
+ Schema::LocalTimestampMicros => "local_timestamp_micros".to_string(),
+ Schema::Duration => "duration".to_string(),
+*/
+ Schema::Uuid
+ | Schema::Decimal(_)
+ | Schema::Date
+ | Schema::TimeMillis
+ | Schema::TimeMicros
+ | Schema::TimestampMillis
+ | Schema::TimestampMicros
+ | Schema::LocalTimestampMillis
+ | Schema::LocalTimestampMicros
+ | Schema::Duration => {
+        bail_not_implemented!(issue = 17616, "Avro logicalType used in Union type: {:?}", schema)
+ }
+ })
+}
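
For a rough feel of the union-to-struct mapping implemented above: each non-null variant of a general Avro union becomes one struct field named after the variant's Avro type name, a decoded value fills exactly one of those fields, and a `null` variant makes the whole struct NULL. A self-contained toy sketch (all types below are invented for illustration and stand in for the real `DataType`/`Value` machinery):

```rust
// Toy illustration of mapping the Avro union ["null", "int", "string"] to a
// struct with one field per non-null variant, named after the variant's type name.
#[derive(Debug)]
struct UnionAsStruct {
    int: Option<i32>,       // Avro "int" variant
    string: Option<String>, // Avro "string" variant
}

// A decoded Avro union value (stand-in for apache_avro::types::Value::Union).
enum UnionValue {
    Null,
    Int(i32),
    Str(String),
}

fn to_struct(v: UnionValue) -> Option<UnionAsStruct> {
    match v {
        // A null variant means the whole struct is NULL.
        UnionValue::Null => None,
        // Exactly one field is set; the others stay NULL.
        UnionValue::Int(i) => Some(UnionAsStruct { int: Some(i), string: None }),
        UnionValue::Str(s) => Some(UnionAsStruct { int: None, string: Some(s) }),
    }
}

fn main() {
    println!("{:?}", to_struct(UnionValue::Int(114514)));
    println!("{:?}", to_struct(UnionValue::Str("oops".into())));
    println!("{:?}", to_struct(UnionValue::Null));
}
```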
diff --git a/src/connector/codec/src/decoder/mod.rs b/src/connector/codec/src/decoder/mod.rs
index c7e04ab210a6e..cd7fe14ab74ea 100644
--- a/src/connector/codec/src/decoder/mod.rs
+++ b/src/connector/codec/src/decoder/mod.rs
@@ -16,6 +16,7 @@ pub mod avro;
pub mod json;
pub mod utils;
+use risingwave_common::error::NotImplemented;
use risingwave_common::types::{DataType, Datum, DatumCow, ToOwnedDatum};
use thiserror::Error;
use thiserror_ext::Macro;
@@ -40,23 +41,41 @@ pub enum AccessError {
/// Errors that are not categorized into variants above.
#[error("{message}")]
Uncategorized { message: String },
+
+ #[error(transparent)]
+ NotImplemented(#[from] NotImplemented),
}
pub type AccessResult<T = Datum> = std::result::Result<T, AccessError>;
/// Access to a field in the data structure. Created by `AccessBuilder`.
+///
+/// It's the `ENCODE ...` part in `FORMAT ... ENCODE ...`
pub trait Access {
/// Accesses `path` in the data structure (*parsed* Avro/JSON/Protobuf data),
/// and then converts it to RisingWave `Datum`.
+ ///
/// `type_expected` might or might not be used during the conversion depending on the implementation.
///
/// # Path
///
- /// We usually expect the data is a record (struct), and `path` represents field path.
+ /// We usually expect the data (`Access` instance) is a record (struct), and `path` represents field path.
/// The data (or part of the data) represents the whole row (`Vec<Datum>`),
/// and we use different `path` to access one column at a time.
///
- /// e.g., for Avro, we access `["col_name"]`; for Debezium Avro, we access `["before", "col_name"]`.
+ /// TODO: the meaning of `path` is a little confusing and maybe over-abstracted.
+ /// `access` does not need to serve arbitrarily deep `path` access, but just "top-level" access.
+ /// The API creates an illusion that arbitrary access is supported, but it's not.
+    /// Perhaps we should separate out another trait like `ToDatum`,
+    /// which only does type mapping without caring about the path, and make `path` itself an `enum` instead of `&[&str]`.
+ ///
+ /// What `path` to access is decided by the CDC layer, i.e., the `FORMAT ...` part (`ChangeEvent`).
+ /// e.g.,
+ /// - `DebeziumChangeEvent` accesses `["before", "col_name"]` for value,
+ /// `["source", "db"]`, `["source", "table"]` etc. for additional columns' values,
+ /// `["op"]` for op type.
+ /// - `MaxwellChangeEvent` accesses `["data", "col_name"]` for value, `["type"]` for op type.
+ /// - In the simplest case, for `FORMAT PLAIN/UPSERT` (`KvEvent`), they just access `["col_name"]` for value, and op type is derived.
///
/// # Returns
///
diff --git a/src/connector/codec/tests/integration_tests/avro.rs b/src/connector/codec/tests/integration_tests/avro.rs
index fab143b2bf9e7..11421c151d7a5 100644
--- a/src/connector/codec/tests/integration_tests/avro.rs
+++ b/src/connector/codec/tests/integration_tests/avro.rs
@@ -12,12 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+use anyhow::Context;
use apache_avro::from_avro_datum;
use risingwave_connector_codec::decoder::avro::{
avro_schema_to_column_descs, AvroAccess, AvroParseOptions, MapHandling, ResolvedAvroSchema,
};
use risingwave_connector_codec::decoder::Access;
use risingwave_connector_codec::AvroSchema;
+use thiserror_ext::AsReport;
use crate::utils::*;
@@ -44,6 +46,24 @@ struct Config {
data_encoding: TestDataEncoding,
}
+fn avro_schema_str_to_risingwave_schema(
+ avro_schema: &str,
+ config: &Config,
+) -> anyhow::Result<(ResolvedAvroSchema, Vec<ColumnDesc>)> {
+ // manually implement some logic in AvroParserConfig::map_to_columns
+ let avro_schema = AvroSchema::parse_str(avro_schema).context("failed to parse Avro schema")?;
+ let resolved_schema =
+ ResolvedAvroSchema::create(avro_schema.into()).context("failed to resolve Avro schema")?;
+
+ let rw_schema =
+ avro_schema_to_column_descs(&resolved_schema.resolved_schema, config.map_handling)
+ .context("failed to convert Avro schema to RisingWave schema")?
+ .iter()
+ .map(ColumnDesc::from)
+ .collect_vec();
+ Ok((resolved_schema, rw_schema))
+}
+
/// Data driven testing for converting Avro Schema to RisingWave Schema, and then converting Avro data into RisingWave data.
///
/// The expected results can be automatically updated. To run and update the tests:
@@ -79,17 +99,15 @@ fn check(
expected_risingwave_schema: expect_test::Expect,
expected_risingwave_data: expect_test::Expect,
) {
- // manually implement some logic in AvroParserConfig::map_to_columns
- let avro_schema = AvroSchema::parse_str(avro_schema).expect("failed to parse Avro schema");
- let resolved_schema =
- ResolvedAvroSchema::create(avro_schema.into()).expect("failed to resolve Avro schema");
-
- let rw_schema =
- avro_schema_to_column_descs(&resolved_schema.resolved_schema, config.map_handling)
- .expect("failed to convert Avro schema to RisingWave schema")
- .iter()
- .map(ColumnDesc::from)
- .collect_vec();
+ let (resolved_schema, rw_schema) =
+ match avro_schema_str_to_risingwave_schema(avro_schema, &config) {
+ Ok(res) => res,
+ Err(e) => {
+ expected_risingwave_schema.assert_eq(&format!("{}", e.as_report()));
+ expected_risingwave_data.assert_eq("");
+ return;
+ }
+ };
expected_risingwave_schema.assert_eq(&format!(
"{:#?}",
rw_schema.iter().map(ColumnDescTestDisplay).collect_vec()
@@ -554,3 +572,316 @@ fn test_1() {
Owned(Float64(OrderedFloat(NaN)))"#]],
);
}
+
+#[test]
+fn test_union() {
+ // A basic test
+ check(
+ r#"
+{
+ "type": "record",
+ "name": "Root",
+ "fields": [
+ {
+ "name": "unionType",
+ "type": ["int", "string"]
+ },
+ {
+ "name": "unionTypeComplex",
+ "type": [
+ "null",
+ {"type": "record", "name": "Email","fields": [{"name":"inner","type":"string"}]},
+ {"type": "record", "name": "Fax","fields": [{"name":"inner","type":"int"}]},
+ {"type": "record", "name": "Sms","fields": [{"name":"inner","type":"int"}]}
+ ]
+ },
+ {
+ "name": "nullableString",
+ "type": ["null", "string"]
+ }
+ ]
+}
+ "#,
+ &[
+ // {
+ // "unionType": {"int": 114514},
+ // "unionTypeComplex": {"Sms": {"inner":6}},
+ // "nullableString": null
+ // }
+ "00a4fd0d060c00",
+ // {
+ // "unionType": {"int": 114514},
+ // "unionTypeComplex": {"Fax": {"inner":6}},
+ // "nullableString": null
+ // }
+ "00a4fd0d040c00",
+ // {
+ // "unionType": {"string": "oops"},
+ // "unionTypeComplex": null,
+ // "nullableString": {"string": "hello"}
+ // }
+ "02086f6f707300020a68656c6c6f",
+ // {
+ // "unionType": {"string": "oops"},
+ // "unionTypeComplex": {"Email": {"inner":"a@b.c"}},
+ // "nullableString": null
+ // }
+ "02086f6f7073020a6140622e6300",
+ ],
+ Config {
+ map_handling: None,
+ data_encoding: TestDataEncoding::HexBinary,
+ },
+ // FIXME: why the struct type doesn't have field_descs? https://github.com/risingwavelabs/risingwave/issues/17128
+ expect![[r#"
+ failed to convert Avro schema to RisingWave schema: failed to convert Avro union to struct: Feature is not yet implemented: Avro named type used in Union type: Record(RecordSchema { name: Name { name: "Email", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "inner", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 0, custom_attributes: {} }], lookup: {"inner": 0}, attributes: {} })
+ Tracking issue: https://github.com/risingwavelabs/risingwave/issues/17632"#]],
+ expect![""],
+ );
+
+ // logicalType is currently rejected
+ // https://github.com/risingwavelabs/risingwave/issues/17616
+ check(
+ r#"
+{
+"type": "record",
+"name": "Root",
+"fields": [
+ {
+ "name": "unionLogical",
+ "type": ["int", {"type":"int", "logicalType": "date"}]
+ }
+]
+}
+ "#,
+ &[],
+ Config {
+ map_handling: None,
+ data_encoding: TestDataEncoding::HexBinary,
+ },
+ expect![[r#"
+ failed to convert Avro schema to RisingWave schema: failed to convert Avro union to struct: Feature is not yet implemented: Avro logicalType used in Union type: Date
+ Tracking issue: https://github.com/risingwavelabs/risingwave/issues/17616"#]],
+ expect![""],
+ );
+
+ // test named type. Consider namespace.
+ // https://avro.apache.org/docs/1.11.1/specification/_print/#names
+ // List of things to take care:
+ // - Record fields and enum symbols DO NOT have namespace.
+ // - If the name specified contains a dot, then it is assumed to be a fullname, and any namespace also specified is IGNORED.
+ // - If a name doesn't have its own namespace, it will look for its most tightly enclosing named schema.
+ check(
+ r#"
+{
+ "type": "record",
+ "name": "Root",
+ "namespace": "RootNamespace",
+ "fields": [
+ {
+ "name": "littleFieldToMakeNestingLooksBetter",
+ "type": ["null","int"], "default": null
+ },
+ {
+ "name": "recordField",
+ "type": ["null", "int", {
+ "type": "record",
+ "name": "my.name.spaced.record",
+ "namespace": "when.name.contains.dot.namespace.is.ignored",
+ "fields": [
+ {"name": "hello", "type": {"type": "int", "default": 1}},
+ {"name": "world", "type": {"type": "double", "default": 1}}
+ ]
+ }],
+ "default": null
+ },
+ {
+ "name": "enumField",
+ "type": ["null", "int", {
+ "type": "enum",
+ "name": "myEnum",
+ "namespace": "my.namespace",
+ "symbols": ["A", "B", "C", "D"]
+ }],
+ "default": null
+ },
+ {
+ "name": "anotherEnumFieldUsingRootNamespace",
+ "type": ["null", "int", {
+ "type": "enum",
+ "name": "myEnum",
+ "symbols": ["A", "B", "C", "D"]
+ }],
+ "default": null
+ }
+ ]
+}
+"#,
+ &[
+ // {
+ // "enumField":{"my.namespace.myEnum":"A"},
+ // "anotherEnumFieldUsingRootNamespace":{"RootNamespace.myEnum": "D"}
+ // }
+ "000004000406",
+ ],
+ Config {
+ map_handling: None,
+ data_encoding: TestDataEncoding::HexBinary,
+ },
+ expect![[r#"
+ failed to convert Avro schema to RisingWave schema: failed to convert Avro union to struct: Feature is not yet implemented: Avro named type used in Union type: Record(RecordSchema { name: Name { name: "record", namespace: Some("my.name.spaced") }, aliases: None, doc: None, fields: [RecordField { name: "hello", doc: None, aliases: None, default: None, schema: Int, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "world", doc: None, aliases: None, default: None, schema: Double, order: Ascending, position: 1, custom_attributes: {} }], lookup: {"hello": 0, "world": 1}, attributes: {} })
+ Tracking issue: https://github.com/risingwavelabs/risingwave/issues/17632"#]],
+ expect![""],
+ );
+
+    // This schema was provided by a user in https://github.com/risingwavelabs/risingwave/issues/16273#issuecomment-2051480710
+ check(
+ r#"
+{
+ "namespace": "com.abc.efg.mqtt",
+ "name": "also.DataMessage",
+ "type": "record",
+ "fields": [
+ {
+ "name": "metrics",
+ "type": {
+ "type": "array",
+ "items": {
+ "name": "also_data_metric",
+ "type": "record",
+ "fields": [
+ {
+ "name": "id",
+ "type": "string"
+ },
+ {
+ "name": "name",
+ "type": "string"
+ },
+ {
+ "name": "norm_name",
+ "type": [
+ "null",
+ "string"
+ ],
+ "default": null
+ },
+ {
+ "name": "uom",
+ "type": [
+ "null",
+ "string"
+ ],
+ "default": null
+ },
+ {
+ "name": "data",
+ "type": {
+ "type": "array",
+ "items": {
+ "name": "dataItem",
+ "type": "record",
+ "fields": [
+ {
+ "name": "ts",
+ "type": "string",
+ "doc": "Timestamp of the metric."
+ },
+ {
+ "name": "value",
+ "type": [
+ "null",
+ "boolean",
+ "double",
+ "string"
+ ],
+ "doc": "Value of the metric."
+ }
+ ]
+ }
+ },
+ "doc": "The data message"
+ }
+ ],
+ "doc": "A metric object"
+ }
+ },
+ "doc": "A list of metrics."
+ }
+ ]
+}
+ "#,
+ &[
+ // {
+ // "metrics": [
+ // {"id":"foo", "name":"a", "data": [] }
+ // ]
+ // }
+ "0206666f6f026100000000",
+ // {
+ // "metrics": [
+ // {"id":"foo", "name":"a", "norm_name": null, "uom": {"string":"c"}, "data": [{"ts":"1", "value":null}, {"ts":"2", "value": {"boolean": true }}] }
+ // ]
+ // }
+ "0206666f6f02610002026304023100023202010000",
+ ],
+ Config {
+ map_handling: None,
+ data_encoding: TestDataEncoding::HexBinary,
+ },
+ expect![[r#"
+ [
+ metrics(#1): List(
+ Struct {
+ id: Varchar,
+ name: Varchar,
+ norm_name: Varchar,
+ uom: Varchar,
+ data: List(
+ Struct {
+ ts: Varchar,
+ value: Struct {
+ boolean: Boolean,
+ double: Float64,
+ string: Varchar,
+ },
+ },
+ ),
+ },
+ ),
+ ]"#]],
+ expect![[r#"
+ Owned([
+ StructValue(
+ Utf8("foo"),
+ Utf8("a"),
+ null,
+ null,
+ [],
+ ),
+ ])
+ ----
+ Owned([
+ StructValue(
+ Utf8("foo"),
+ Utf8("a"),
+ null,
+ Utf8("c"),
+ [
+ StructValue(
+ Utf8("1"),
+ null,
+ ),
+ StructValue(
+ Utf8("2"),
+ StructValue(
+ Bool(true),
+ null,
+ null,
+ ),
+ ),
+ ],
+ ),
+ ])"#]],
+ );
+}
diff --git a/src/connector/src/parser/debezium/avro_parser.rs b/src/connector/src/parser/debezium/avro_parser.rs
index 1a40b87c9d498..04f80ebba1ca1 100644
--- a/src/connector/src/parser/debezium/avro_parser.rs
+++ b/src/connector/src/parser/debezium/avro_parser.rs
@@ -19,8 +19,8 @@ use apache_avro::types::Value;
use apache_avro::{from_avro_datum, Schema};
use risingwave_common::try_match_expand;
use risingwave_connector_codec::decoder::avro::{
- avro_extract_field_schema, avro_schema_skip_union, avro_schema_to_column_descs, AvroAccess,
- AvroParseOptions, ResolvedAvroSchema,
+ avro_extract_field_schema, avro_schema_skip_nullable_union, avro_schema_to_column_descs,
+ AvroAccess, AvroParseOptions, ResolvedAvroSchema,
};
use risingwave_pb::catalog::PbSchemaRegistryNameStrategy;
use risingwave_pb::plan_common::ColumnDesc;
@@ -125,8 +125,40 @@ impl DebeziumAvroParserConfig {
}
    pub fn map_to_columns(&self) -> ConnectorResult<Vec<ColumnDesc>> {
+        // Refer to debezium_avro_msg_schema.avsc to see what the schema looks like:
+
+ // "fields": [
+ // {
+ // "name": "before",
+ // "type": [
+ // "null",
+ // {
+ // "type": "record",
+ // "name": "Value",
+ // "fields": [...],
+ // }
+ // ],
+ // "default": null
+ // },
+ // {
+ // "name": "after",
+ // "type": [
+ // "null",
+ // "Value"
+ // ],
+ // "default": null
+ // },
+ // ...]
+
+ // Other fields are:
+ // - source: describes the source metadata for the event
+ // - op
+ // - ts_ms
+ // - transaction
+        // See the Debezium documentation for details about these fields.
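+
+        // Roughly, the extraction below does the following (a sketch only; error handling and
+        // the remaining arguments are elided):
+        //   let before = avro_extract_field_schema(root, Some("before"));  // ["null", Value]
+        //   let value = avro_schema_skip_nullable_union(before);           // the "Value" record
+        //   avro_schema_to_column_descs(value, ..)                         // one ColumnDesc per field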
+
avro_schema_to_column_descs(
- avro_schema_skip_union(avro_extract_field_schema(
+ avro_schema_skip_nullable_union(avro_extract_field_schema(
// FIXME: use resolved schema here.
// Currently it works because "after" refers to a subtree in "before",
// but in theory, inside "before" there could also be a reference.
@@ -227,7 +259,7 @@ mod tests {
let outer_schema = get_outer_schema();
let expected_inner_schema = Schema::parse_str(inner_shema_str).unwrap();
- let extracted_inner_schema = avro_schema_skip_union(
+ let extracted_inner_schema = avro_schema_skip_nullable_union(
avro_extract_field_schema(&outer_schema, Some("before")).unwrap(),
)
.unwrap();
@@ -318,7 +350,7 @@ mod tests {
fn test_map_to_columns() {
let outer_schema = get_outer_schema();
let columns = avro_schema_to_column_descs(
- avro_schema_skip_union(
+ avro_schema_skip_nullable_union(
avro_extract_field_schema(&outer_schema, Some("before")).unwrap(),
)
.unwrap(),
diff --git a/src/connector/src/parser/unified/json.rs b/src/connector/src/parser/unified/json.rs
index e4a229bb61b98..ca709e2eebc73 100644
--- a/src/connector/src/parser/unified/json.rs
+++ b/src/connector/src/parser/unified/json.rs
@@ -534,7 +534,7 @@ impl JsonParseOptions {
(DataType::Struct(struct_type_info), ValueType::Object) => {
            // Collecting into a Result<Vec<_>> doesn't reserve the capacity in advance, so we `Vec::with_capacity` instead.
// https://github.com/rust-lang/rust/issues/48994
- let mut fields = Vec::with_capacity(struct_type_info.types().len());
+ let mut fields = Vec::with_capacity(struct_type_info.len());
for (field_name, field_type) in struct_type_info
.names()
.zip_eq_fast(struct_type_info.types())
diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs
index 872e7bbfeaf7c..a1a993803568b 100644
--- a/src/connector/src/sink/mod.rs
+++ b/src/connector/src/sink/mod.rs
@@ -38,7 +38,6 @@ pub mod pulsar;
pub mod redis;
pub mod remote;
pub mod snowflake;
-pub mod snowflake_connector;
pub mod sqlserver;
pub mod starrocks;
pub mod test_sink;
diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs
index 0f512933cd0f7..1c9d67352247c 100644
--- a/src/connector/src/sink/snowflake.rs
+++ b/src/connector/src/sink/snowflake.rs
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-use std::collections::{BTreeMap, HashMap};
+use std::collections::BTreeMap;
use std::fmt::Write;
use std::sync::Arc;
@@ -22,7 +22,11 @@ use bytes::{Bytes, BytesMut};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::Schema;
-use risingwave_object_store::object::{ObjectStore, OpendalStreamingUploader, StreamingUploader};
+use risingwave_common::config::ObjectStoreConfig;
+use risingwave_object_store::object::object_metrics::GLOBAL_OBJECT_STORE_METRICS;
+use risingwave_object_store::object::{
+ ObjectStore, OpendalObjectStore, OpendalStreamingUploader, StreamingUploader,
+};
use serde::Deserialize;
use serde_json::Value;
use serde_with::serde_as;
@@ -32,70 +36,43 @@ use with_options::WithOptions;
use super::encoder::{
JsonEncoder, RowEncoder, TimeHandlingMode, TimestampHandlingMode, TimestamptzHandlingMode,
};
-use super::snowflake_connector::{generate_s3_file_name, SnowflakeHttpClient, SnowflakeS3Client};
use super::writer::LogSinkerOf;
use super::{SinkError, SinkParam};
use crate::sink::writer::SinkWriterExt;
use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriter, SinkWriterParam};
pub const SNOWFLAKE_SINK: &str = "snowflake";
+const S3_INTERMEDIATE_FILE_NAME: &str = "RW_SNOWFLAKE_S3_SINK_FILE";
-#[derive(Deserialize, Debug, Clone, WithOptions)]
+#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct SnowflakeCommon {
- /// The snowflake database used for sinking
- #[serde(rename = "snowflake.database")]
- pub database: String,
-
- /// The corresponding schema where sink table exists
- #[serde(rename = "snowflake.schema")]
- pub schema: String,
-
- /// The created pipe object, will be used as `insertFiles` target
- #[serde(rename = "snowflake.pipe")]
- pub pipe: String,
-
- /// The unique, snowflake provided `account_identifier`
- /// NOTE: please use the form `-`
- /// For detailed guidance, reference:
- #[serde(rename = "snowflake.account_identifier")]
- pub account_identifier: String,
-
- /// The user that owns the table to be sinked
- /// NOTE: the user should've been granted corresponding *role*
- /// reference:
- #[serde(rename = "snowflake.user")]
- pub user: String,
-
- /// The public key fingerprint used when generating custom `jwt_token`
- /// reference:
- #[serde(rename = "snowflake.rsa_public_key_fp")]
- pub rsa_public_key_fp: String,
-
- /// The rsa pem key *without* encryption
- #[serde(rename = "snowflake.private_key")]
- pub private_key: String,
-
/// The s3 bucket where intermediate sink files will be stored
- #[serde(rename = "snowflake.s3_bucket")]
+ #[serde(rename = "snowflake.s3_bucket", alias = "s3.bucket_name")]
pub s3_bucket: String,
/// The optional s3 path to be specified
    /// the actual file location would be `s3://<s3_bucket>/<s3_path>/<file_name>`
    /// if this field is specified by user(s)
    /// otherwise it would be `s3://<s3_bucket>/<file_name>`
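+    /// e.g., with `s3_bucket = "my-bucket"` and `s3_path = "snowflake/tmp"` (illustrative values
+    /// only), the intermediate files would be written under `s3://my-bucket/snowflake/tmp/`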
- #[serde(rename = "snowflake.s3_path")]
+ #[serde(rename = "snowflake.s3_path", alias = "s3.path")]
    pub s3_path: Option<String>,
/// s3 credentials
- #[serde(rename = "snowflake.aws_access_key_id")]
+ #[serde(
+ rename = "snowflake.aws_access_key_id",
+ alias = "s3.credentials.access"
+ )]
pub aws_access_key_id: String,
/// s3 credentials
- #[serde(rename = "snowflake.aws_secret_access_key")]
+ #[serde(
+ rename = "snowflake.aws_secret_access_key",
+ alias = "s3.credentials.secret"
+ )]
pub aws_secret_access_key: String,
/// The s3 region, e.g., us-east-2
- #[serde(rename = "snowflake.aws_region")]
+ #[serde(rename = "snowflake.aws_region", alias = "s3.region_name")]
pub aws_region: String,
}
@@ -173,8 +150,6 @@ pub struct SnowflakeSinkWriter {
    pk_indices: Vec<usize>,
#[expect(dead_code)]
is_append_only: bool,
- /// the client used to send `insertFiles` post request
- http_client: SnowflakeHttpClient,
/// the client to insert file to external storage (i.e., s3)
s3_client: SnowflakeS3Client,
row_encoder: JsonEncoder,
@@ -187,7 +162,7 @@ pub struct SnowflakeSinkWriter {
/// note: the option here *implicitly* indicates whether we have at
/// least call `streaming_upload` once during this epoch,
/// which is mainly used to prevent uploading empty data.
- streaming_uploader: Option<(OpendalStreamingUploader, String)>,
+    streaming_uploader: Option<OpendalStreamingUploader>,
}
impl SnowflakeSinkWriter {
@@ -197,18 +172,6 @@ impl SnowflakeSinkWriter {
        pk_indices: Vec<usize>,
is_append_only: bool,
    ) -> Result<Self> {
- let http_client = SnowflakeHttpClient::new(
- config.common.account_identifier.clone(),
- config.common.user.clone(),
- config.common.database.clone(),
- config.common.schema.clone(),
- config.common.pipe.clone(),
- config.common.rsa_public_key_fp.clone(),
- config.common.private_key.clone(),
- HashMap::new(),
- config.common.s3_path.clone(),
- );
-
let s3_client = SnowflakeS3Client::new(
config.common.s3_bucket.clone(),
config.common.s3_path.clone(),
@@ -222,7 +185,6 @@ impl SnowflakeSinkWriter {
schema: schema.clone(),
pk_indices,
is_append_only,
- http_client,
s3_client,
row_encoder: JsonEncoder::new(
schema,
@@ -245,7 +207,7 @@ impl SnowflakeSinkWriter {
/// and `streaming_upload` being called the first time.
/// i.e., lazily initialization of the internal `streaming_uploader`.
/// plus, this function is *pure*, the `&mut self` here is to make rustc (and tokio) happy.
- async fn new_streaming_uploader(&mut self) -> Result<(OpendalStreamingUploader, String)> {
+    async fn new_streaming_uploader(&mut self) -> Result<OpendalStreamingUploader> {
let file_suffix = self.file_suffix();
let path = generate_s3_file_name(self.s3_client.s3_path(), &file_suffix);
let uploader = self
@@ -260,12 +222,12 @@ impl SnowflakeSinkWriter {
)
})
.map_err(SinkError::Snowflake)?;
- Ok((uploader, file_suffix))
+ Ok(uploader)
}
/// write data to the current streaming uploader for this epoch.
async fn streaming_upload(&mut self, data: Bytes) -> Result<()> {
- let (uploader, _) = match self.streaming_uploader.as_mut() {
+ let uploader = match self.streaming_uploader.as_mut() {
Some(s) => s,
None => {
assert!(
@@ -286,18 +248,18 @@ impl SnowflakeSinkWriter {
/// finalize streaming upload for this epoch.
/// ensure all the data has been properly uploaded to intermediate s3.
- async fn finish_streaming_upload(&mut self) -> Result