diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index ab8ba3d9d7eb9..cb54c1606356e 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -7,6 +7,10 @@ repos: hooks: - id: end-of-file-fixer - id: trailing-whitespace +- repo: https://github.com/crate-ci/typos + rev: v1.23.1 + hooks: + - id: typos - repo: local hooks: - id: rustfmt @@ -14,10 +18,6 @@ repos: entry: rustfmt --edition 2021 language: system types: [rust] - - id: typos - name: typos - entry: typos -w - language: system - id: cargo sort name: cargo sort entry: cargo sort -g -w diff --git a/docs/README.md b/docs/README.md index e905cea7849ea..f371d9bda8f47 100644 --- a/docs/README.md +++ b/docs/README.md @@ -2,25 +2,5 @@ This directory contains RisingWave design documents that are intended to be used by contributors to understand our development process, and how we design and implement RisingWave. To learn about how to use RisingWave, check out the [RisingWave user documentation](https://www.risingwave.dev). -## Developer guide - -After you learn about the basics of RisingWave, take a look at our [developer guide](https://risingwavelabs.github.io/risingwave/) to get up to speed with the development process. - -## Table of Contents - -* [Architecture Design](./architecture-design.md) -* [An Overview of RisingWave Streaming Engine](./streaming-overview.md) -* [An Overview of RisingWave State Store](./state-store-overview.md) -* [Meta Service](./meta-service.md) -* [Create MView on Top of MView](./mv-on-mv.md) -* [Checkpoint](./checkpoint.md) -* [Design of Data Source](./data-source.md) -* [Data Model and Encoding](./data-model-and-encoding.md) -* [Design of Batch Local Execution Mode](./batch-local-execution-mode.md) -* [Consistent Hash](./consistent-hash.md) -* [Build RisingWave with Multiple Object Storage Backends](./multi-object-store.md) -* [Backfill](./backfill.md) - -## Images - -We recommend that you use [draw.io](https://app.diagrams.net/) to draw illustrations and export as SVG images, with "include a copy of my diagram" selected for further editing. +- `/dev` contains the source code for the [RisingWave Developer Guide](https://risingwavelabs.github.io/risingwave/) +- `/rustdoc` contains source code for the [crate level documentation](https://risingwavelabs.github.io/risingwave/rustdoc) diff --git a/docs/dev/README.md b/docs/dev/README.md index e19f10c08e3c3..7e47920d49400 100644 --- a/docs/dev/README.md +++ b/docs/dev/README.md @@ -28,3 +28,7 @@ including the `` marker at the place where you want the TOC. We use `mdbook-linkcheck` to validate URLs included in our documentation. `linkcheck` will be run automatically when you build with the instructions in the section above. + +## Images + +We recommend that you use [draw.io](https://app.diagrams.net/) to draw illustrations and export as SVG images, with "include a copy of my diagram" selected for further editing. diff --git a/docs/dev/src/SUMMARY.md b/docs/dev/src/SUMMARY.md index 76cf57c007c23..382cc0f80fec8 100644 --- a/docs/dev/src/SUMMARY.md +++ b/docs/dev/src/SUMMARY.md @@ -11,10 +11,11 @@ - [Testing](./tests/intro.md) - [Debugging](./debugging.md) - [Observability](./observability.md) + - [Metrics](./metrics.md) --- -# Benchmarking and Profiling +# Benchmarking and profiling - [CPU Profiling](./benchmark-and-profile/cpu-profiling.md) - [Memory (Heap) Profiling](./benchmark-and-profile/memory-profiling.md) @@ -27,6 +28,28 @@ - [Develop Connectors](./connector/intro.md) - [Continuous Integration](./ci.md) +--- + +# Design docs + + + +- [Architecture Design](./design/architecture-design.md) +- [An Overview of RisingWave Streaming Engine](./design/streaming-overview.md) +- [An Overview of RisingWave State Store](./design/state-store-overview.md) +- [Meta Service](./design/meta-service.md) +- [Create MView on Top of MView](./design/mv-on-mv.md) +- [Checkpoint](./design/checkpoint.md) +- [Design of Data Source](./design/data-source.md) +- [Data Model and Encoding](./design/data-model-and-encoding.md) +- [Design of Batch Local Execution Mode](./design/batch-local-execution-mode.md) +- [Consistent Hash](./design/consistent-hash.md) +- [Build RisingWave with Multiple Object Storage Backends](./design/multi-object-store.md) +- [Backfill](./design/backfill.md) +- [Aggregation](./design/aggregation.md) +- [Shared Buffer](./design/shared-buffer.md) +- [Relational Table](./design/relational-table.md) +- [Keys](./design/keys.md) + ## Row-based Encoding @@ -23,8 +15,6 @@ We implement a relational table layer as the bridge between executors and KV sta | join | table_id \| join_key \| pk | materialized value | | agg | table_id \| group_key | agg_value | -For the detailed schema, please check [doc](relational-table-schema.md) - ## Relational Table Layer [source code](https://github.com/risingwavelabs/risingwave/blob/4e66ca3d41435c64af26b5e0003258c4f7116822/src/storage/src/table/state_table.rs) @@ -36,13 +26,13 @@ Relational table layer consists of State Table, Mem Table and Storage Table. The State Table provides the table operations by these APIs: `get_row`, `scan`, `insert_row`, `delete_row` and `update_row`, which are the read and write interfaces for streaming executors. The Mem Table is an in-memory buffer for caching table operations during one epoch. The Storage Table is read only, and will output the partial columns upper level needs. -![Overview of Relational Table](../images/relational-table-layer/relational-table-01.svg) +![Overview of Relational Table](../images/relational-table/relational-table-01.svg) ### Write Path To write into KV state store, executors first perform operations on State Table, and these operations will be cached in Mem Table. Once a barrier flows through one executor, executor will flush the cached operations into state store. At this moment, State Table will covert these operations into kv pairs and write to state store with specific epoch. For example, an executor performs `insert(a, b, c)` and `delete(d, e, f)` through the State Table APIs, Mem Table first caches these two operations in memory. After receiving new barrier, State Table converts these two operations into KV operations by row-based format, and writes these KV operations into state store (Hummock). -![write example](../images/relational-table-layer/relational-table-03.svg) +![write example](../images/relational-table/relational-table-03.svg) ### Read Path In streaming mode, executors should be able to read the latest written data, which means uncommitted data is visible. The data in Mem Table (memory) is fresher than that in shared storage (state store). State Table provides both point-get and scan to read from state store by merging data from Mem Table and Storage Table. #### Get @@ -68,4 +58,36 @@ Get(pk = 3): [3, 3333, 3333] #### Scan Scan on relational table is implemented by `StateTableIter`, which is a merge iterator of `MemTableIter` and `StorageIter`. If a pk exists in both KV state store (shared storage) and memory (MemTable), result of `MemTableIter` is returned. For example, in the following figure, `StateTableIter` will generate `1->4->5->6` in order. -![Scan example](../images/relational-table-layer/relational-table-02.svg) +![Scan example](../images/relational-table/relational-table-02.svg) + + +## Example: HashAgg + +In this doc, we will take HashAgg with extreme state (`max`, `min`) or value state (`sum`, `count`) for example, and introduce a more detailed design for the internal table schema. + +[Code](https://github.com/risingwavelabs/risingwave/blob/7f9ad2240712aa0cfe3edffb4535d43b42f32cc5/src/frontend/src/optimizer/plan_node/logical_agg.rs#L144) + +### Table id +`table_id` is a globally unique id allocated in meta for each relational table object. Meta is responsible for traversing the Plan Tree and calculating the total number of Relational Tables needed. For example, the Hash Join Operator needs 2, one for the left table and one for the right table. The number of tables needed for Agg depends on the number of agg calls. + +### Value State (Sum, Count) +Query example: +```sql +select sum(v2), count(v3) from t group by v1 +``` + +This query will need to initiate 2 Relational Tables. The schema is `table_id/group_key`. + +### Extreme State (Max, Min) +Query example: +```sql +select max(v2), min(v3) from t group by v1 +``` + +This query will need to initiate 2 Relational Tables. If the upstream is not append-only, the schema becomes `table_id/group_key/sort_key/upstream_pk`. + +The order of `sort_key` depends on the agg call kind. For example, if it's `max()`, `sort_key` will order with `Ascending`. if it's `min()`, `sort_key` will order with `Descending`. +The `upstream_pk` is also appended to ensure the uniqueness of the key. +This design allows the streaming executor not to read all the data from the storage when the cache fails, but only a part of it. The streaming executor will try to write all streaming data to storage, because there may be `update` or `delete` operations in the stream, it's impossible to always guarantee correct results without storing all data. + +If `t` is created with append-only flag, the schema becomes `table_id/group_key`, which is the same for Value State. This is because in the append-only mode, there is no `update` or `delete` operation, so the cache will never miss. Therefore, we only need to write one value to the storage. diff --git a/docs/shared-buffer.md b/docs/dev/src/design/shared-buffer.md similarity index 99% rename from docs/shared-buffer.md rename to docs/dev/src/design/shared-buffer.md index 2b63a040b4c9a..7c7dac8f06e2d 100644 --- a/docs/shared-buffer.md +++ b/docs/dev/src/design/shared-buffer.md @@ -137,4 +137,4 @@ For all data a, b of the same type, we must ensure that: ``` in-memory representation of a < in-memory representation of b, iff memcomparable(a) < memcomparable(b) -``` \ No newline at end of file +``` diff --git a/docs/state-store-overview.md b/docs/dev/src/design/state-store-overview.md similarity index 96% rename from docs/state-store-overview.md rename to docs/dev/src/design/state-store-overview.md index 0fc64516ac52f..be8f3491550fc 100644 --- a/docs/state-store-overview.md +++ b/docs/dev/src/design/state-store-overview.md @@ -22,7 +22,7 @@ In RisingWave, all streaming executors store their data into a state store. This Reading this document requires prior knowledge of LSM-Tree-based KV storage engines, like RocksDB, LevelDB, etc. -![Overview of Architecture](images/state-store-overview/state-store-overview-01.svg) +![Overview of Architecture](../images/state-store-overview/state-store-overview-01.svg) Hummock consists of a manager service on the meta node, clients on worker nodes (including compute nodes, frontend nodes, and compactor nodes), and a shared storage to store files (SSTs). Every time a new write batch is produced, the Hummock client will upload those files to shared storage, and notify the Hummock manager of the new data. With compaction going on, new files will be added and unused files will be vacuumed. The Hummock manager will take care of the lifecycle of a file — is a file being used? can we delete a file? etc. @@ -104,7 +104,7 @@ The Hummock client will batch writes and generate SSTs to sync to the underlying After the SST is uploaded to an S3-compatible service, the Hummock client will let the Hummock manager know there's a new table. The list of all SSTs along with some metadata forms a ***version***. When the Hummock client adds new SSTs to the Hummock manager, a new version will be generated with the new set of SST files. -![Write Path](images/state-store-overview/state-store-overview-02.svg) +![Write Path](../images/state-store-overview/state-store-overview-02.svg) ### Read Path @@ -114,7 +114,7 @@ For every read operation (`scan`, `get`), we will first select SSTs that might c For `scan`, we simply select by overlapping key range. For point get, we will filter SSTs further by Bloom filter. After that, we will compose a single `MergeIterator` over all SSTs. The `MergeIterator` will return all keys in range along with their epochs. Then, we will create `UserIterator` over `MergeIterator`, and for all user keys, the user iterator will pick the first full key whose epoch <= read epoch. Therefore, users can perform a snapshot read from Hummock based on the given epoch. The snapshot should be acquired beforehand and released afterwards. -![Read Path](images/state-store-overview/state-store-overview-03.svg) +![Read Path](../images/state-store-overview/state-store-overview-03.svg) Hummock implements the following iterators: - `BlockIterator`: iterates a block of an SSTable. @@ -148,7 +148,7 @@ As mentioned in [Read Path](#read-path), reads are performed on a ***version*** The SQL frontend will get the latest epoch from the meta service. Then, it will embed the epoch number into SQL plans, so that all compute nodes will read from that epoch. In theory, both SQL frontend and compute nodes will ***pin the snapshot***, to handle the case that frontend goes down and the compute nodes are still reading from Hummock (#622). However, to simplify the process, currently we ***only pin on the frontend side***. -![Hummock Service](images/state-store-overview/state-store-overview-04.svg) +![Hummock Service](../images/state-store-overview/state-store-overview-04.svg) Hummock only guarantees that writes on one node can be immediately read from the same node. However, the worker nodes running batch queries might have a slightly outdated version when a batch query plan is received (due to the local version caching). Therefore, we have a `wait_epoch` interface to wait until the local cached version contains full data of one epoch. @@ -164,7 +164,7 @@ From the perspective of the streaming executors, when they receive a barrier, th Here we have two cases: Agg executors always persist and produce new write batches when receiving a barrier; Join executors (in the future when async flush gets implemented) will produce write batches within an epoch. -![Checkpoint in Streaming](images/state-store-overview/state-store-overview-05.svg) +![Checkpoint in Streaming](../images/state-store-overview/state-store-overview-05.svg) Streaming executors cannot control when data will be persisted — they can only write to Hummock's `shared buffer`. When a barrier flows across the system and is collected by the meta service, we can ensure that all executors have written their states of ***the previous epoch*** to the shared buffer, so we can initiate checkpoint process on all worker nodes, and upload SSTs to persistent remote storage. diff --git a/docs/streaming-overview.md b/docs/dev/src/design/streaming-overview.md similarity index 97% rename from docs/streaming-overview.md rename to docs/dev/src/design/streaming-overview.md index 2379fe2db13d3..b24eeaba51cb9 100644 --- a/docs/streaming-overview.md +++ b/docs/dev/src/design/streaming-overview.md @@ -26,7 +26,7 @@ In this document we give an overview of the RisingWave streaming engine. ## Architecture -![streaming-architecture](./images/streaming-overview/streaming-architecture.svg) +![streaming-architecture](../images/streaming-overview/streaming-architecture.svg) The overall architecture of RisingWave is depicted in the figure above. In brief, RisingWave streaming engine consists of three sets of nodes: frontend, compute nodes, and meta service. The frontend node consists of the serving layer, handling users' SQL requests concurrently. Underlying is the processing layer. Each compute node hosts a collection of long-running actors for stream processing. All actors access a shared persistence layer of storage (currently AWS S3) as its state storage. The meta service maintains all meta-information and coordinates the whole cluster. @@ -38,7 +38,7 @@ When receiving a create materialized view statement at the frontend, a materiali 4. Initializing the job at the backend. The meta service notifies all compute nodes to start serving streaming pipelines. ## Actors, executors, and states -![streaming-executor](./images/streaming-overview/streaming-executor-and-compute-node.svg) +![streaming-executor](../images/streaming-overview/streaming-executor-and-compute-node.svg) ### Actors @@ -75,4 +75,3 @@ See more detailed descriptions on [Checkpoint](./checkpoint.md). ### Fault tolerance When the streaming engine crashes down, the system must globally rollback to a previous consistent snapshot. To achieve this, whenever the meta detects the failover of some certain compute node or any undergoing checkpoint procedure, it triggers a recovery process. After rebuilding the streaming pipeline, each executor will reset its local state from a consistent snapshot on the storage and recover its computation. - diff --git a/docs/images/aggregation/agg-components.png b/docs/dev/src/images/aggregation/agg-components.png similarity index 100% rename from docs/images/aggregation/agg-components.png rename to docs/dev/src/images/aggregation/agg-components.png diff --git a/docs/images/aggregation/init-agg-group.png b/docs/dev/src/images/aggregation/init-agg-group.png similarity index 100% rename from docs/images/aggregation/init-agg-group.png rename to docs/dev/src/images/aggregation/init-agg-group.png diff --git a/docs/images/architecture-design/architecture.svg b/docs/dev/src/images/architecture-design/architecture.svg similarity index 100% rename from docs/images/architecture-design/architecture.svg rename to docs/dev/src/images/architecture-design/architecture.svg diff --git a/docs/images/architecture-design/batch-query.svg b/docs/dev/src/images/architecture-design/batch-query.svg similarity index 100% rename from docs/images/architecture-design/batch-query.svg rename to docs/dev/src/images/architecture-design/batch-query.svg diff --git a/docs/images/architecture-design/plan-fragments.svg b/docs/dev/src/images/architecture-design/plan-fragments.svg similarity index 100% rename from docs/images/architecture-design/plan-fragments.svg rename to docs/dev/src/images/architecture-design/plan-fragments.svg diff --git a/docs/images/architecture-design/stream-pipeline.png b/docs/dev/src/images/architecture-design/stream-pipeline.png similarity index 100% rename from docs/images/architecture-design/stream-pipeline.png rename to docs/dev/src/images/architecture-design/stream-pipeline.png diff --git a/docs/images/backfill/backfill-sides.png b/docs/dev/src/images/backfill/backfill-sides.png similarity index 100% rename from docs/images/backfill/backfill-sides.png rename to docs/dev/src/images/backfill/backfill-sides.png diff --git a/docs/images/backfill/handle-poll.png b/docs/dev/src/images/backfill/handle-poll.png similarity index 100% rename from docs/images/backfill/handle-poll.png rename to docs/dev/src/images/backfill/handle-poll.png diff --git a/docs/images/backfill/polling.png b/docs/dev/src/images/backfill/polling.png similarity index 100% rename from docs/images/backfill/polling.png rename to docs/dev/src/images/backfill/polling.png diff --git a/docs/images/backfill/replication-example.png b/docs/dev/src/images/backfill/replication-example.png similarity index 100% rename from docs/images/backfill/replication-example.png rename to docs/dev/src/images/backfill/replication-example.png diff --git a/docs/images/backfill/replication-replicated.png b/docs/dev/src/images/backfill/replication-replicated.png similarity index 100% rename from docs/images/backfill/replication-replicated.png rename to docs/dev/src/images/backfill/replication-replicated.png diff --git a/docs/images/backfill/replication-simple.png b/docs/dev/src/images/backfill/replication-simple.png similarity index 100% rename from docs/images/backfill/replication-simple.png rename to docs/dev/src/images/backfill/replication-simple.png diff --git a/docs/images/backfill/schema.png b/docs/dev/src/images/backfill/schema.png similarity index 100% rename from docs/images/backfill/schema.png rename to docs/dev/src/images/backfill/schema.png diff --git a/docs/images/batch-local-execution-mode/example1.svg b/docs/dev/src/images/batch-local-execution-mode/example1.svg similarity index 100% rename from docs/images/batch-local-execution-mode/example1.svg rename to docs/dev/src/images/batch-local-execution-mode/example1.svg diff --git a/docs/images/batch-local-execution-mode/example2.svg b/docs/dev/src/images/batch-local-execution-mode/example2.svg similarity index 100% rename from docs/images/batch-local-execution-mode/example2.svg rename to docs/dev/src/images/batch-local-execution-mode/example2.svg diff --git a/docs/images/batch-local-execution-mode/frontend-flow.svg b/docs/dev/src/images/batch-local-execution-mode/frontend-flow.svg similarity index 100% rename from docs/images/batch-local-execution-mode/frontend-flow.svg rename to docs/dev/src/images/batch-local-execution-mode/frontend-flow.svg diff --git a/docs/images/checkpoint/checkpoint.svg b/docs/dev/src/images/checkpoint/checkpoint.svg similarity index 100% rename from docs/images/checkpoint/checkpoint.svg rename to docs/dev/src/images/checkpoint/checkpoint.svg diff --git a/docs/images/checkpoint/shared-buffer.svg b/docs/dev/src/images/checkpoint/shared-buffer.svg similarity index 100% rename from docs/images/checkpoint/shared-buffer.svg rename to docs/dev/src/images/checkpoint/shared-buffer.svg diff --git a/docs/images/consistent-hash/actor-data.svg b/docs/dev/src/images/consistent-hash/actor-data.svg similarity index 100% rename from docs/images/consistent-hash/actor-data.svg rename to docs/dev/src/images/consistent-hash/actor-data.svg diff --git a/docs/images/consistent-hash/actor-state-table.svg b/docs/dev/src/images/consistent-hash/actor-state-table.svg similarity index 100% rename from docs/images/consistent-hash/actor-state-table.svg rename to docs/dev/src/images/consistent-hash/actor-state-table.svg diff --git a/docs/images/consistent-hash/data-distribution.svg b/docs/dev/src/images/consistent-hash/data-distribution.svg similarity index 100% rename from docs/images/consistent-hash/data-distribution.svg rename to docs/dev/src/images/consistent-hash/data-distribution.svg diff --git a/docs/images/consistent-hash/data-redistribution-1.svg b/docs/dev/src/images/consistent-hash/data-redistribution-1.svg similarity index 100% rename from docs/images/consistent-hash/data-redistribution-1.svg rename to docs/dev/src/images/consistent-hash/data-redistribution-1.svg diff --git a/docs/images/consistent-hash/data-redistribution-2.svg b/docs/dev/src/images/consistent-hash/data-redistribution-2.svg similarity index 100% rename from docs/images/consistent-hash/data-redistribution-2.svg rename to docs/dev/src/images/consistent-hash/data-redistribution-2.svg diff --git a/docs/images/consistent-hash/storage-data-layout.svg b/docs/dev/src/images/consistent-hash/storage-data-layout.svg similarity index 100% rename from docs/images/consistent-hash/storage-data-layout.svg rename to docs/dev/src/images/consistent-hash/storage-data-layout.svg diff --git a/docs/images/data-model-and-encoding/chunk.svg b/docs/dev/src/images/data-model-and-encoding/chunk.svg similarity index 100% rename from docs/images/data-model-and-encoding/chunk.svg rename to docs/dev/src/images/data-model-and-encoding/chunk.svg diff --git a/docs/images/data-model-and-encoding/row-format.svg b/docs/dev/src/images/data-model-and-encoding/row-format.svg similarity index 100% rename from docs/images/data-model-and-encoding/row-format.svg rename to docs/dev/src/images/data-model-and-encoding/row-format.svg diff --git a/docs/images/data-source/data-source-arch.svg b/docs/dev/src/images/data-source/data-source-arch.svg similarity index 100% rename from docs/images/data-source/data-source-arch.svg rename to docs/dev/src/images/data-source/data-source-arch.svg diff --git a/docs/images/logo-title.svg b/docs/dev/src/images/logo-title.svg similarity index 100% rename from docs/images/logo-title.svg rename to docs/dev/src/images/logo-title.svg diff --git a/docs/images/logo.svg b/docs/dev/src/images/logo.svg similarity index 100% rename from docs/images/logo.svg rename to docs/dev/src/images/logo.svg diff --git a/docs/images/meta-service/cluster-deployment.svg b/docs/dev/src/images/meta-service/cluster-deployment.svg similarity index 100% rename from docs/images/meta-service/cluster-deployment.svg rename to docs/dev/src/images/meta-service/cluster-deployment.svg diff --git a/docs/images/meta-service/notification.svg b/docs/dev/src/images/meta-service/notification.svg similarity index 100% rename from docs/images/meta-service/notification.svg rename to docs/dev/src/images/meta-service/notification.svg diff --git a/docs/images/mv-on-mv/mv-on-mv-01.svg b/docs/dev/src/images/mv-on-mv/mv-on-mv-01.svg similarity index 100% rename from docs/images/mv-on-mv/mv-on-mv-01.svg rename to docs/dev/src/images/mv-on-mv/mv-on-mv-01.svg diff --git a/docs/images/mv-on-mv/mv-on-mv-02.svg b/docs/dev/src/images/mv-on-mv/mv-on-mv-02.svg similarity index 100% rename from docs/images/mv-on-mv/mv-on-mv-02.svg rename to docs/dev/src/images/mv-on-mv/mv-on-mv-02.svg diff --git a/docs/images/relational-table-layer/relational-table-01.svg b/docs/dev/src/images/relational-table/relational-table-01.svg similarity index 100% rename from docs/images/relational-table-layer/relational-table-01.svg rename to docs/dev/src/images/relational-table/relational-table-01.svg diff --git a/docs/images/relational-table-layer/relational-table-02.svg b/docs/dev/src/images/relational-table/relational-table-02.svg similarity index 100% rename from docs/images/relational-table-layer/relational-table-02.svg rename to docs/dev/src/images/relational-table/relational-table-02.svg diff --git a/docs/images/relational-table-layer/relational-table-03.svg b/docs/dev/src/images/relational-table/relational-table-03.svg similarity index 100% rename from docs/images/relational-table-layer/relational-table-03.svg rename to docs/dev/src/images/relational-table/relational-table-03.svg diff --git a/docs/images/state-store-overview/state-store-overview-01.svg b/docs/dev/src/images/state-store-overview/state-store-overview-01.svg similarity index 100% rename from docs/images/state-store-overview/state-store-overview-01.svg rename to docs/dev/src/images/state-store-overview/state-store-overview-01.svg diff --git a/docs/images/state-store-overview/state-store-overview-02.svg b/docs/dev/src/images/state-store-overview/state-store-overview-02.svg similarity index 100% rename from docs/images/state-store-overview/state-store-overview-02.svg rename to docs/dev/src/images/state-store-overview/state-store-overview-02.svg diff --git a/docs/images/state-store-overview/state-store-overview-03.svg b/docs/dev/src/images/state-store-overview/state-store-overview-03.svg similarity index 100% rename from docs/images/state-store-overview/state-store-overview-03.svg rename to docs/dev/src/images/state-store-overview/state-store-overview-03.svg diff --git a/docs/images/state-store-overview/state-store-overview-04.svg b/docs/dev/src/images/state-store-overview/state-store-overview-04.svg similarity index 100% rename from docs/images/state-store-overview/state-store-overview-04.svg rename to docs/dev/src/images/state-store-overview/state-store-overview-04.svg diff --git a/docs/images/state-store-overview/state-store-overview-05.svg b/docs/dev/src/images/state-store-overview/state-store-overview-05.svg similarity index 100% rename from docs/images/state-store-overview/state-store-overview-05.svg rename to docs/dev/src/images/state-store-overview/state-store-overview-05.svg diff --git a/docs/images/streaming-overview/streaming-architecture.svg b/docs/dev/src/images/streaming-overview/streaming-architecture.svg similarity index 100% rename from docs/images/streaming-overview/streaming-architecture.svg rename to docs/dev/src/images/streaming-overview/streaming-architecture.svg diff --git a/docs/images/streaming-overview/streaming-executor-and-compute-node.svg b/docs/dev/src/images/streaming-overview/streaming-executor-and-compute-node.svg similarity index 100% rename from docs/images/streaming-overview/streaming-executor-and-compute-node.svg rename to docs/dev/src/images/streaming-overview/streaming-executor-and-compute-node.svg diff --git a/docs/metrics.md b/docs/dev/src/metrics.md similarity index 98% rename from docs/metrics.md rename to docs/dev/src/metrics.md index b0216c07fc83e..14d98c7a365ea 100644 --- a/docs/metrics.md +++ b/docs/dev/src/metrics.md @@ -5,7 +5,7 @@ It covers what each metric measures, and what information we may derive from it. ## Barrier Latency -Prerequisite: [Checkpoint](./checkpoint.md) +Prerequisite: [Checkpoint](./design/checkpoint.md) This metric measures the duration from which a barrier is injected into **all** sources in the stream graph, to the barrier flown through all executors in the graph. diff --git a/docs/relational_table/relational-table-schema.md b/docs/relational_table/relational-table-schema.md deleted file mode 100644 index 64cd615feda25..0000000000000 --- a/docs/relational_table/relational-table-schema.md +++ /dev/null @@ -1,35 +0,0 @@ -# Relational Table Schema - -We introduce the rough row-based encoding format in [relational states](storing-state-using-relational-table.md#row-based-encoding) - -In this doc, we will take HashAgg with extreme state (`max`, `min`) or value state (`sum`, `count`) for example, and introduce a more detailed design for the internal table schema. - -[Code](https://github.com/risingwavelabs/risingwave/blob/7f9ad2240712aa0cfe3edffb4535d43b42f32cc5/src/frontend/src/optimizer/plan_node/logical_agg.rs#L144) - -## Table id -`table_id` is a globally unique id allocated in meta for each relational table object. Meta is responsible for traversing the Plan Tree and calculating the total number of Relational Tables needed. For example, the Hash Join Operator needs 2, one for the left table and one for the right table. The number of tables needed for Agg depends on the number of agg calls. - -## Value State (Sum, Count) -Query example: -```sql -select sum(v2), count(v3) from t group by v1 -``` - -This query will need to initiate 2 Relational Tables. The schema is `table_id/group_key`. - -## Extreme State (Max, Min) -Query example: -```sql -select max(v2), min(v3) from t group by v1 -``` - -This query will need to initiate 2 Relational Tables. If the upstream is not append-only, the schema becomes `table_id/group_key/sort_key/upstream_pk`. - -The order of `sort_key` depends on the agg call kind. For example, if it's `max()`, `sort_key` will order with `Ascending`. if it's `min()`, `sort_key` will order with `Descending`. -The `upstream_pk` is also appended to ensure the uniqueness of the key. -This design allows the streaming executor not to read all the data from the storage when the cache fails, but only a part of it. The streaming executor will try to write all streaming data to storage, because there may be `update` or `delete` operations in the stream, it's impossible to always guarantee correct results without storing all data. - -If `t` is created with append-only flag, the schema becomes `table_id/group_key`, which is the same for Value State. This is because in the append-only mode, there is no `update` or `delete` operation, so the cache will never miss. Therefore, we only need to write one value to the storage. - - - diff --git a/docs/rustdoc/README.md b/docs/rustdoc/README.md index 1b3e70e1113c2..0adf956748290 100644 --- a/docs/rustdoc/README.md +++ b/docs/rustdoc/README.md @@ -1,6 +1,6 @@ This folder contains files for generating a nice rustdoc index page. -Online version (for latest main): +Online version (for latest main): To build and open locally, run the following command in the project root: diff --git a/docs/rustdoc/index.md b/docs/rustdoc/index.md index cfb74b8055b8a..a76edb23cb2b4 100644 --- a/docs/rustdoc/index.md +++ b/docs/rustdoc/index.md @@ -4,9 +4,7 @@ Welcome to an overview of the developer documentations of RisingWave! ## Developer Docs -To learn how to develop RisingWave, see the [RisingWave Developer Guide](https://risingwavelabs.github.io/risingwave/). - -The [design docs](https://github.com/risingwavelabs/risingwave/blob/main/docs/README.md) covers some high-level ideas of how we built RisingWave. +To learn how to develop RisingWave, and access high-level design docs, see the [RisingWave Developer Guide](https://risingwavelabs.github.io/risingwave/). ## Crate Docs diff --git a/docs/rustdoc/rust.css b/docs/rustdoc/rust.css index 71cf5e3df0004..9c76bb08c3898 100644 --- a/docs/rustdoc/rust.css +++ b/docs/rustdoc/rust.css @@ -1,18 +1,21 @@ /* This file is copied from the Rust Project, which is dual-licensed under -Apache 2.0 and MIT terms. */ +Apache 2.0 and MIT terms. https: //github.com/rust-lang/rust/blob/7d640b670e521a0491ea1e49082d1cb5632e2562/src/doc/rust.css +*/ /* General structure */ body { + font-family: serif; margin: 0 auto; padding: 0 15px; font-size: 18px; - color: #333; + color: #000; line-height: 1.428571429; -webkit-box-sizing: unset; -moz-box-sizing: unset; box-sizing: unset; + background: #fff; } @media (min-width: 768px) { body { @@ -20,6 +23,14 @@ body { } } +h1, +h2, +h3, +h4, +h5, +h6 { + font-family: sans-serif; +} h2, h3, h4, h5, h6 { font-weight: 400; line-height: 1.1; @@ -37,8 +48,8 @@ h4, h5, h6 { margin-bottom: 10px; padding: 5px 10px; } -h5, h6 { - color: black; +h5, +h6 { text-decoration: underline; } @@ -135,6 +146,31 @@ h1 a:link, h1 a:visited, h2 a:link, h2 a:visited, h3 a:link, h3 a:visited, h4 a:link, h4 a:visited, h5 a:link, h5 a:visited {color: black;} +h1, +h2, +h3, +h4, +h5 { + /* This is needed to be able to position the doc-anchor. Ideally there + would be a
around the whole document, but we don't have that. */ + position: relative; +} + +a.doc-anchor { + color: black; + display: none; + position: absolute; + left: -20px; + /* We add this padding so that when the cursor moves from the heading's text to the anchor, + the anchor doesn't disappear. */ + padding-right: 5px; + /* And this padding is used to make the anchor larger and easier to click on. */ + padding-left: 3px; +} + +*:hover>.doc-anchor { + display: block; +} /* Code */ pre, code { diff --git a/src/cmd/src/lib.rs b/src/cmd/src/lib.rs index 7d21fc44b4e57..9017f58606f7c 100644 --- a/src/cmd/src/lib.rs +++ b/src/cmd/src/lib.rs @@ -44,8 +44,7 @@ pub fn compute(opts: ComputeNodeOpts) -> ! { pub fn meta(opts: MetaNodeOpts) -> ! { init_risingwave_logger(LoggerSettings::from_opts(&opts)); - // TODO(shutdown): pass the shutdown token - main_okk(|_| risingwave_meta_node::start(opts)); + main_okk(|shutdown| risingwave_meta_node::start(opts, shutdown)); } pub fn frontend(opts: FrontendOpts) -> ! { diff --git a/src/cmd_all/src/standalone.rs b/src/cmd_all/src/standalone.rs index 4f8c208c89aa3..325f2f8ff395b 100644 --- a/src/cmd_all/src/standalone.rs +++ b/src/cmd_all/src/standalone.rs @@ -194,9 +194,10 @@ pub async fn standalone( is_in_memory = matches!(opts.backend, Some(MetaBackend::Mem)); tracing::info!("starting meta-node thread with cli args: {:?}", opts); + let shutdown = shutdown.clone(); let _meta_handle = tokio::spawn(async move { let dangerous_max_idle_secs = opts.dangerous_max_idle_secs; - risingwave_meta_node::start(opts).await; + risingwave_meta_node::start(opts, shutdown).await; tracing::warn!("meta is stopped, shutdown all nodes"); if let Some(idle_exit_secs) = dangerous_max_idle_secs { eprintln!("{}", diff --git a/src/frontend/planner_test/tests/testdata/input/generated_columns.yaml b/src/frontend/planner_test/tests/testdata/input/generated_columns.yaml index bd6a8f453ab71..7078f6e797342 100644 --- a/src/frontend/planner_test/tests/testdata/input/generated_columns.yaml +++ b/src/frontend/planner_test/tests/testdata/input/generated_columns.yaml @@ -14,6 +14,11 @@ select proctime(); expected_outputs: - binder_error +- name: proctime cast to with timezone + sql: | + explain create table t1 (proc_time TIMESTAMPTZ AS proctime()); + expected_outputs: + - explain_output - name: proctime cast to without timezone sql: | explain create table t1 (proc_time TIMESTAMP AS proctime()); diff --git a/src/frontend/planner_test/tests/testdata/output/generated_columns.yaml b/src/frontend/planner_test/tests/testdata/output/generated_columns.yaml index 3ed95c1ac1463..92f6aaa47e175 100644 --- a/src/frontend/planner_test/tests/testdata/output/generated_columns.yaml +++ b/src/frontend/planner_test/tests/testdata/output/generated_columns.yaml @@ -26,6 +26,17 @@ Caused by: Invalid input syntax: Function `PROCTIME()` is only allowed in CREATE TABLE/SOURCE. Is `NOW()` what you want? +- name: proctime cast to with timezone + sql: | + explain create table t1 (proc_time TIMESTAMPTZ AS proctime()); + explain_output: | + StreamMaterialize { columns: [proc_time, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: Overwrite, watermark_columns: [proc_time] } + └─StreamRowIdGen { row_id_index: 1 } + └─StreamUnion { all: true, output_watermarks: [$expr1] } + └─StreamExchange { dist: HashShard(_row_id) } + └─StreamProject { exprs: [Proctime as $expr1, _row_id], output_watermarks: [$expr1] } + └─StreamDml { columns: [_row_id] } + └─StreamSource - name: proctime cast to without timezone sql: | explain create table t1 (proc_time TIMESTAMP AS proctime()); diff --git a/src/frontend/src/expr/mod.rs b/src/frontend/src/expr/mod.rs index 89142d0e9b237..19ffbaed92e28 100644 --- a/src/frontend/src/expr/mod.rs +++ b/src/frontend/src/expr/mod.rs @@ -94,6 +94,14 @@ macro_rules! impl_expr_impl { $($t(Box<$t>),)* } + impl ExprImpl { + pub fn variant_name(&self) -> &'static str { + match self { + $(ExprImpl::$t(_) => stringify!($t),)* + } + } + } + $( impl From<$t> for ExprImpl { fn from(o: $t) -> ExprImpl { diff --git a/src/frontend/src/expr/utils.rs b/src/frontend/src/expr/utils.rs index cc49f3c215378..b5f68fed4dfbe 100644 --- a/src/frontend/src/expr/utils.rs +++ b/src/frontend/src/expr/utils.rs @@ -508,191 +508,6 @@ pub fn rewrite_now_to_proctime(expr: ExprImpl) -> ExprImpl { r.rewrite_expr(expr) } -/// analyze if the expression can derive a watermark from some input watermark. If it can -/// derive, return the input watermark column index -pub fn try_derive_watermark(expr: &ExprImpl) -> WatermarkDerivation { - let a = WatermarkAnalyzer {}; - a.visit_expr(expr) -} - -#[derive(Clone, Copy, Debug, PartialEq)] -pub enum WatermarkDerivation { - /// The expression will return a constant and not depends on its input. - Constant, - /// Can derive a watermark if an input column has watermark, the usize field is the input - /// column index. - Watermark(usize), - /// For nondecreasing functions, we can always produce watermarks from where they are called. - Nondecreasing, - /// Can not derive a watermark in any cases. - None, -} - -#[derive(Clone, Default)] -struct WatermarkAnalyzer {} - -impl WatermarkAnalyzer { - fn visit_expr(&self, expr: &ExprImpl) -> WatermarkDerivation { - match expr { - ExprImpl::InputRef(inner) => WatermarkDerivation::Watermark(inner.index()), - ExprImpl::Literal(_) => WatermarkDerivation::Constant, - ExprImpl::FunctionCall(inner) => self.visit_function_call(inner), - ExprImpl::FunctionCallWithLambda(inner) => self.visit_function_call(inner.base()), - ExprImpl::TableFunction(_) => WatermarkDerivation::None, - ExprImpl::Subquery(_) - | ExprImpl::AggCall(_) - | ExprImpl::CorrelatedInputRef(_) - | ExprImpl::WindowFunction(_) - | ExprImpl::Parameter(_) - | ExprImpl::Now(_) => unreachable!(), - ExprImpl::UserDefinedFunction(_) => WatermarkDerivation::None, - } - } - - fn visit_unary_op(&self, inputs: &[ExprImpl]) -> WatermarkDerivation { - assert_eq!(inputs.len(), 1); - self.visit_expr(&inputs[0]) - } - - fn visit_binary_op(&self, inputs: &[ExprImpl]) -> (WatermarkDerivation, WatermarkDerivation) { - assert_eq!(inputs.len(), 2); - (self.visit_expr(&inputs[0]), self.visit_expr(&inputs[1])) - } - - fn visit_ternary_op( - &self, - inputs: &[ExprImpl], - ) -> ( - WatermarkDerivation, - WatermarkDerivation, - WatermarkDerivation, - ) { - assert_eq!(inputs.len(), 3); - ( - self.visit_expr(&inputs[0]), - self.visit_expr(&inputs[1]), - self.visit_expr(&inputs[2]), - ) - } - - fn visit_function_call(&self, func_call: &FunctionCall) -> WatermarkDerivation { - use WatermarkDerivation::{Constant, Nondecreasing, Watermark}; - match func_call.func_type() { - ExprType::Unspecified => unreachable!(), - ExprType::Add => match self.visit_binary_op(func_call.inputs()) { - (Constant, Constant) => Constant, - (Constant, Watermark(idx)) | (Watermark(idx), Constant) => Watermark(idx), - (Constant, Nondecreasing) | (Nondecreasing, Constant) => Nondecreasing, - _ => WatermarkDerivation::None, - }, - ExprType::Subtract | ExprType::TumbleStart => { - if func_call.inputs().len() == 3 { - // With `offset` specified - // e.g., select * from tumble(t1, start, interval, offset); - assert_eq!(ExprType::TumbleStart, func_call.func_type()); - match self.visit_ternary_op(func_call.inputs()) { - (Constant, Constant, Constant) => Constant, - (Watermark(idx), Constant, Constant) => Watermark(idx), - (Nondecreasing, Constant, Constant) => Nondecreasing, - _ => WatermarkDerivation::None, - } - } else { - match self.visit_binary_op(func_call.inputs()) { - (Constant, Constant) => Constant, - (Watermark(idx), Constant) => Watermark(idx), - (Nondecreasing, Constant) => Nondecreasing, - _ => WatermarkDerivation::None, - } - } - } - ExprType::Multiply | ExprType::Divide | ExprType::Modulus => { - match self.visit_binary_op(func_call.inputs()) { - (Constant, Constant) => Constant, - // not meaningful to derive watermark for other situations - _ => WatermarkDerivation::None, - } - } - ExprType::AtTimeZone => match self.visit_binary_op(func_call.inputs()) { - (Constant, Constant) => Constant, - (derivation @ (Watermark(_) | Nondecreasing), Constant) => { - if !(func_call.return_type() == DataType::Timestamptz - && func_call.inputs()[0].return_type() == DataType::Timestamp) - && func_call.inputs()[1] - .as_literal() - .and_then(|literal| literal.get_data().as_ref()) - .map_or(true, |time_zone| { - !time_zone.as_utf8().eq_ignore_ascii_case("UTC") - }) - { - WatermarkDerivation::None - } else { - derivation - } - } - _ => WatermarkDerivation::None, - }, - ExprType::AddWithTimeZone | ExprType::SubtractWithTimeZone => { - // Requires time zone and interval to be literal, at least for now. - let time_zone = match &func_call.inputs()[2] { - ExprImpl::Literal(lit) => lit.get_data().as_ref().map(|s| s.as_utf8()), - _ => return WatermarkDerivation::None, - }; - let interval = match &func_call.inputs()[1] { - ExprImpl::Literal(lit) => lit.get_data().as_ref().map(|s| s.as_interval()), - _ => return WatermarkDerivation::None, - }; - // null zone or null interval is treated same as const `interval '1' second`, to be - // consistent with other match arms. - let zone_without_dst = time_zone.map_or(true, |s| s.eq_ignore_ascii_case("UTC")); - let quantitative_only = interval.map_or(true, |v| { - v.months() == 0 && (v.days() == 0 || zone_without_dst) - }); - match (self.visit_expr(&func_call.inputs()[0]), quantitative_only) { - (Constant, _) => Constant, - (Watermark(idx), true) => Watermark(idx), - (Nondecreasing, true) => Nondecreasing, - (Watermark(_) | Nondecreasing, false) => WatermarkDerivation::None, - (WatermarkDerivation::None, _) => WatermarkDerivation::None, - } - } - ExprType::DateTrunc => match func_call.inputs().len() { - 2 => match self.visit_binary_op(func_call.inputs()) { - (Constant, any_derivation) => any_derivation, - _ => WatermarkDerivation::None, - }, - 3 => match self.visit_ternary_op(func_call.inputs()) { - (Constant, Constant, Constant) => Constant, - (Constant, derivation @ (Watermark(_) | Nondecreasing), Constant) => { - let zone_without_dst = func_call.inputs()[2] - .as_literal() - .and_then(|literal| literal.get_data().as_ref()) - .map_or(false, |s| s.as_utf8().eq_ignore_ascii_case("UTC")); - if zone_without_dst { - derivation - } else { - WatermarkDerivation::None - } - } - _ => WatermarkDerivation::None, - }, - _ => unreachable!(), - }, - ExprType::SecToTimestamptz => self.visit_unary_op(func_call.inputs()), - ExprType::CharToTimestamptz => WatermarkDerivation::None, - ExprType::Cast => { - // TODO: need more derivation - WatermarkDerivation::None - } - ExprType::Case => { - // TODO: do we need derive watermark when every case can derive a common watermark? - WatermarkDerivation::None - } - ExprType::Proctime => Nondecreasing, - _ => WatermarkDerivation::None, - } - } -} - #[cfg(test)] mod tests { use risingwave_common::types::{DataType, ScalarImpl}; diff --git a/src/frontend/src/optimizer/plan_node/stream_project.rs b/src/frontend/src/optimizer/plan_node/stream_project.rs index e8ff1df6e82db..eae1bd5a34d5e 100644 --- a/src/frontend/src/optimizer/plan_node/stream_project.rs +++ b/src/frontend/src/optimizer/plan_node/stream_project.rs @@ -20,10 +20,9 @@ use risingwave_pb::stream_plan::ProjectNode; use super::stream::prelude::*; use super::utils::{childless_record, watermark_pretty, Distill}; use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; -use crate::expr::{ - try_derive_watermark, Expr, ExprImpl, ExprRewriter, ExprVisitor, WatermarkDerivation, -}; +use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; +use crate::optimizer::property::{analyze_monotonicity, monotonicity_variants}; use crate::stream_fragmenter::BuildFragmentGraphState; use crate::utils::ColIndexMappingRewriteExt; @@ -84,21 +83,22 @@ impl StreamProject { let mut nondecreasing_exprs = vec![]; let mut watermark_columns = FixedBitSet::with_capacity(core.exprs.len()); for (expr_idx, expr) in core.exprs.iter().enumerate() { - match try_derive_watermark(expr) { - WatermarkDerivation::Watermark(input_idx) => { + use monotonicity_variants::*; + match analyze_monotonicity(expr) { + FollowingInput(input_idx) => { if input.watermark_columns().contains(input_idx) { watermark_derivations.push((input_idx, expr_idx)); watermark_columns.insert(expr_idx); } } - WatermarkDerivation::Nondecreasing => { + Inherent(NonDecreasing) => { nondecreasing_exprs.push(expr_idx); watermark_columns.insert(expr_idx); } - WatermarkDerivation::Constant => { + Inherent(Constant) => { // XXX(rc): we can produce one watermark on each recovery for this case. } - WatermarkDerivation::None => {} + Inherent(_) | _FollowingInputInversely(_) => {} } } // Project executor won't change the append-only behavior of the stream, so it depends on diff --git a/src/frontend/src/optimizer/plan_node/stream_project_set.rs b/src/frontend/src/optimizer/plan_node/stream_project_set.rs index 2af5d54234363..b65d4e8da0b53 100644 --- a/src/frontend/src/optimizer/plan_node/stream_project_set.rs +++ b/src/frontend/src/optimizer/plan_node/stream_project_set.rs @@ -20,8 +20,9 @@ use risingwave_pb::stream_plan::ProjectSetNode; use super::stream::prelude::*; use super::utils::impl_distill_by_unit; use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; -use crate::expr::{try_derive_watermark, ExprRewriter, ExprVisitor, WatermarkDerivation}; +use crate::expr::{ExprRewriter, ExprVisitor}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; +use crate::optimizer::property::{analyze_monotonicity, monotonicity_variants}; use crate::stream_fragmenter::BuildFragmentGraphState; use crate::utils::ColIndexMappingRewriteExt; @@ -48,21 +49,22 @@ impl StreamProjectSet { let mut nondecreasing_exprs = vec![]; let mut watermark_columns = FixedBitSet::with_capacity(core.output_len()); for (expr_idx, expr) in core.select_list.iter().enumerate() { - match try_derive_watermark(expr) { - WatermarkDerivation::Watermark(input_idx) => { + use monotonicity_variants::*; + match analyze_monotonicity(expr) { + FollowingInput(input_idx) => { if input.watermark_columns().contains(input_idx) { watermark_derivations.push((input_idx, expr_idx)); watermark_columns.insert(expr_idx + 1); } } - WatermarkDerivation::Nondecreasing => { + Inherent(NonDecreasing) => { nondecreasing_exprs.push(expr_idx); watermark_columns.insert(expr_idx + 1); } - WatermarkDerivation::Constant => { + Inherent(Constant) => { // XXX(rc): we can produce one watermark on each recovery for this case. } - WatermarkDerivation::None => {} + Inherent(_) | _FollowingInputInversely(_) => {} } } diff --git a/src/frontend/src/optimizer/property/mod.rs b/src/frontend/src/optimizer/property/mod.rs index 69871d0003596..ae6ebe7e8288a 100644 --- a/src/frontend/src/optimizer/property/mod.rs +++ b/src/frontend/src/optimizer/property/mod.rs @@ -32,3 +32,5 @@ mod func_dep; pub use func_dep::*; mod cardinality; pub use cardinality::*; +mod monotonicity; +pub use monotonicity::*; diff --git a/src/frontend/src/optimizer/property/monotonicity.rs b/src/frontend/src/optimizer/property/monotonicity.rs new file mode 100644 index 0000000000000..87f74c25b83f4 --- /dev/null +++ b/src/frontend/src/optimizer/property/monotonicity.rs @@ -0,0 +1,273 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use enum_as_inner::EnumAsInner; +use risingwave_common::types::DataType; +use risingwave_pb::expr::expr_node::Type as ExprType; + +use crate::expr::{Expr, ExprImpl, FunctionCall, TableFunction}; + +/// Represents the derivation of the monotonicity of a column. +/// This enum aims to unify the "non-decreasing analysis" and watermark derivation. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumAsInner)] +pub enum MonotonicityDerivation { + /// The monotonicity of the column is inherent, meaning that it is derived from the column itself. + Inherent(Monotonicity), + /// The monotonicity of the column follows the monotonicity of the specified column in the input. + FollowingInput(usize), + /// The monotonicity of the column INVERSELY follows the monotonicity of the specified column in the input. + /// This is not used currently. + _FollowingInputInversely(usize), +} + +impl MonotonicityDerivation { + pub fn inverse(self) -> Self { + use MonotonicityDerivation::*; + match self { + Inherent(monotonicity) => Inherent(monotonicity.inverse()), + FollowingInput(idx) => _FollowingInputInversely(idx), + _FollowingInputInversely(idx) => FollowingInput(idx), + } + } +} + +/// Represents the monotonicity of a column. `NULL`s are considered largest when analyzing monotonicity. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumAsInner)] +pub enum Monotonicity { + Constant, + NonDecreasing, + NonIncreasing, + Unknown, +} + +impl Monotonicity { + pub fn inverse(self) -> Self { + use Monotonicity::*; + match self { + Constant => Constant, + NonDecreasing => NonIncreasing, + NonIncreasing => NonDecreasing, + Unknown => Unknown, + } + } +} + +pub mod monotonicity_variants { + pub use super::Monotonicity::*; + pub use super::MonotonicityDerivation::*; +} + +/// Analyze the monotonicity of an expression. +pub fn analyze_monotonicity(expr: &ExprImpl) -> MonotonicityDerivation { + let analyzer = MonotonicityAnalyzer {}; + analyzer.visit_expr(expr) +} + +struct MonotonicityAnalyzer {} + +impl MonotonicityAnalyzer { + fn visit_expr(&self, expr: &ExprImpl) -> MonotonicityDerivation { + use monotonicity_variants::*; + match expr { + // recursion base + ExprImpl::InputRef(inner) => FollowingInput(inner.index()), + ExprImpl::Literal(_) => Inherent(Constant), + ExprImpl::Now(_) => Inherent(NonDecreasing), + ExprImpl::UserDefinedFunction(_) => Inherent(Unknown), + + // recursively visit children + ExprImpl::FunctionCall(inner) => self.visit_function_call(inner), + ExprImpl::FunctionCallWithLambda(inner) => self.visit_function_call(inner.base()), + ExprImpl::TableFunction(inner) => self.visit_table_function(inner), + + // the analyzer is not expected to be used when the following expression types are present + ExprImpl::Subquery(_) + | ExprImpl::AggCall(_) + | ExprImpl::CorrelatedInputRef(_) + | ExprImpl::WindowFunction(_) + | ExprImpl::Parameter(_) => panic!( + "Expression `{}` is not expected in the monotonicity analyzer", + expr.variant_name() + ), + } + } + + fn visit_unary_op(&self, inputs: &[ExprImpl]) -> MonotonicityDerivation { + assert_eq!(inputs.len(), 1); + self.visit_expr(&inputs[0]) + } + + fn visit_binary_op( + &self, + inputs: &[ExprImpl], + ) -> (MonotonicityDerivation, MonotonicityDerivation) { + assert_eq!(inputs.len(), 2); + (self.visit_expr(&inputs[0]), self.visit_expr(&inputs[1])) + } + + fn visit_ternary_op( + &self, + inputs: &[ExprImpl], + ) -> ( + MonotonicityDerivation, + MonotonicityDerivation, + MonotonicityDerivation, + ) { + assert_eq!(inputs.len(), 3); + ( + self.visit_expr(&inputs[0]), + self.visit_expr(&inputs[1]), + self.visit_expr(&inputs[2]), + ) + } + + fn visit_function_call(&self, func_call: &FunctionCall) -> MonotonicityDerivation { + use monotonicity_variants::*; + + fn time_zone_is_without_dst(time_zone: Option<&str>) -> bool { + #[allow(clippy::let_and_return)] // to make it more readable + let tz_is_utc = time_zone.map_or( + false, // conservative + |time_zone| time_zone.eq_ignore_ascii_case("UTC"), + ); + tz_is_utc // conservative + } + + match func_call.func_type() { + ExprType::Unspecified => unreachable!(), + ExprType::Add => match self.visit_binary_op(func_call.inputs()) { + (Inherent(Constant), any) | (any, Inherent(Constant)) => any, + (Inherent(NonDecreasing), Inherent(NonDecreasing)) => Inherent(NonDecreasing), + (Inherent(NonIncreasing), Inherent(NonIncreasing)) => Inherent(NonIncreasing), + _ => Inherent(Unknown), + }, + ExprType::Subtract => match self.visit_binary_op(func_call.inputs()) { + (any, Inherent(Constant)) => any, + (Inherent(Constant), any) => any.inverse(), + _ => Inherent(Unknown), + }, + ExprType::Multiply | ExprType::Divide | ExprType::Modulus => { + match self.visit_binary_op(func_call.inputs()) { + (Inherent(Constant), Inherent(Constant)) => Inherent(Constant), + _ => Inherent(Unknown), // let's be lazy here + } + } + ExprType::TumbleStart => { + if func_call.inputs().len() == 2 { + // without `offset`, args: `(start, interval)` + match self.visit_binary_op(func_call.inputs()) { + (any, Inherent(Constant)) => any, + _ => Inherent(Unknown), + } + } else { + // with `offset`, args: `(start, interval, offset)` + assert_eq!(ExprType::TumbleStart, func_call.func_type()); + match self.visit_ternary_op(func_call.inputs()) { + (any, Inherent(Constant), Inherent(Constant)) => any, + _ => Inherent(Unknown), + } + } + } + ExprType::AtTimeZone => match self.visit_binary_op(func_call.inputs()) { + (Inherent(Constant), Inherent(Constant)) => Inherent(Constant), + (any, Inherent(Constant)) => { + let time_zone = func_call.inputs()[1] + .as_literal() + .and_then(|literal| literal.get_data().as_ref()) + .map(|tz| tz.as_utf8().as_ref()); + // 1. For at_time_zone(timestamp, const timezone) -> timestamptz, when timestamp has some monotonicity, + // the result should have the same monotonicity. + // 2. For at_time_zone(timestamptz, const timezone) -> timestamp, when timestamptz has some monotonicity, + // the result only have the same monotonicity when the timezone is without DST (Daylight Saving Time). + if (func_call.inputs()[0].return_type() == DataType::Timestamp + && func_call.return_type() == DataType::Timestamptz) + || time_zone_is_without_dst(time_zone) + { + any + } else { + Inherent(Unknown) + } + } + _ => Inherent(Unknown), + }, + ExprType::DateTrunc => match func_call.inputs().len() { + 2 => match self.visit_binary_op(func_call.inputs()) { + (Inherent(Constant), any) => any, + _ => Inherent(Unknown), + }, + 3 => match self.visit_ternary_op(func_call.inputs()) { + (Inherent(Constant), Inherent(Constant), Inherent(Constant)) => { + Inherent(Constant) + } + (Inherent(Constant), any, Inherent(Constant)) => { + let time_zone = func_call.inputs()[2] + .as_literal() + .and_then(|literal| literal.get_data().as_ref()) + .map(|tz| tz.as_utf8().as_ref()); + if time_zone_is_without_dst(time_zone) { + any + } else { + Inherent(Unknown) + } + } + _ => Inherent(Unknown), + }, + _ => unreachable!(), + }, + ExprType::AddWithTimeZone | ExprType::SubtractWithTimeZone => { + // Requires time zone and interval to be literal, at least for now. + let time_zone = match &func_call.inputs()[2] { + ExprImpl::Literal(lit) => { + lit.get_data().as_ref().map(|tz| tz.as_utf8().as_ref()) + } + _ => return Inherent(Unknown), + }; + let interval = match &func_call.inputs()[1] { + ExprImpl::Literal(lit) => lit + .get_data() + .as_ref() + .map(|interval| interval.as_interval()), + _ => return Inherent(Unknown), + }; + let quantitative_only = interval.map_or( + true, // null interval is treated as `interval '1' second` + |v| v.months() == 0 && (v.days() == 0 || time_zone_is_without_dst(time_zone)), + ); + match (self.visit_expr(&func_call.inputs()[0]), quantitative_only) { + (Inherent(Constant), _) => Inherent(Constant), + (any, true) => any, + _ => Inherent(Unknown), + } + } + ExprType::SecToTimestamptz => self.visit_unary_op(func_call.inputs()), + ExprType::CharToTimestamptz => Inherent(Unknown), + ExprType::Cast => { + // TODO: need more derivation + Inherent(Unknown) + } + ExprType::Case => { + // TODO: do we need derive watermark when every case can derive a common watermark? + Inherent(Unknown) + } + ExprType::Proctime => Inherent(NonDecreasing), + _ => Inherent(Unknown), + } + } + + fn visit_table_function(&self, _table_func: &TableFunction) -> MonotonicityDerivation { + // TODO: derive monotonicity for table funcs like `generate_series` + use monotonicity_variants::*; + Inherent(Unknown) + } +} diff --git a/src/frontend/src/optimizer/rule/stream/filter_with_now_to_join_rule.rs b/src/frontend/src/optimizer/rule/stream/filter_with_now_to_join_rule.rs index cbdb65b4528a5..39dc88319816d 100644 --- a/src/frontend/src/optimizer/rule/stream/filter_with_now_to_join_rule.rs +++ b/src/frontend/src/optimizer/rule/stream/filter_with_now_to_join_rule.rs @@ -15,11 +15,10 @@ use risingwave_common::types::DataType; use risingwave_pb::plan_common::JoinType; -use crate::expr::{ - try_derive_watermark, ExprRewriter, FunctionCall, InputRef, WatermarkDerivation, -}; +use crate::expr::{ExprRewriter, FunctionCall, InputRef}; use crate::optimizer::plan_node::generic::{self, GenericPlanRef}; use crate::optimizer::plan_node::{LogicalFilter, LogicalJoin, LogicalNow}; +use crate::optimizer::property::{analyze_monotonicity, monotonicity_variants}; use crate::optimizer::rule::{BoxedRule, Rule}; use crate::optimizer::PlanRef; use crate::utils::Condition; @@ -36,18 +35,19 @@ impl Rule for FilterWithNowToJoinRule { let mut now_filters = vec![]; let mut remainder = vec![]; - let mut rewriter = NowAsInputRef::new(lhs_len); - // If the `now` is not a valid dynamic filter expression, we will not push it down. filter.predicate().conjunctions.iter().for_each(|expr| { if let Some((input_expr, cmp, now_expr)) = expr.as_now_comparison_cond() { - let now_expr = rewriter.rewrite_expr(now_expr); - - // ensure that this expression will derive a watermark - if try_derive_watermark(&now_expr) != WatermarkDerivation::Watermark(lhs_len) { - remainder.push(expr.clone()); + // ensure that this expression is increasing + use monotonicity_variants::*; + if matches!(analyze_monotonicity(&now_expr), Inherent(NonDecreasing)) { + now_filters.push( + FunctionCall::new(cmp, vec![input_expr, now_expr]) + .unwrap() + .into(), + ); } else { - now_filters.push(FunctionCall::new(cmp, vec![input_expr, now_expr]).unwrap()); + remainder.push(expr.clone()); } } else { remainder.push(expr.clone()); @@ -60,13 +60,15 @@ impl Rule for FilterWithNowToJoinRule { } let mut new_plan = plan.inputs()[0].clone(); + let mut rewriter = NowAsInputRef::new(lhs_len); for now_filter in now_filters { + let now_filter = rewriter.rewrite_expr(now_filter); new_plan = LogicalJoin::new( new_plan, LogicalNow::new(generic::Now::update_current(plan.ctx())).into(), JoinType::LeftSemi, Condition { - conjunctions: vec![now_filter.into()], + conjunctions: vec![now_filter], }, ) .into() diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs index a7600ba930a15..5fd658c8a6581 100644 --- a/src/meta/node/src/lib.rs +++ b/src/meta/node/src/lib.rs @@ -26,6 +26,7 @@ use redact::Secret; use risingwave_common::config::OverrideConfig; use risingwave_common::util::meta_addr::MetaAddressStrategy; use risingwave_common::util::resource_util; +use risingwave_common::util::tokio_util::sync::CancellationToken; use risingwave_common::{GIT_SHA, RW_VERSION}; use risingwave_common_heap_profiling::HeapProfiler; use risingwave_meta::*; @@ -204,7 +205,10 @@ use risingwave_common::config::{load_config, MetaBackend, RwConfig}; use tracing::info; /// Start meta node -pub fn start(opts: MetaNodeOpts) -> Pin + Send>> { +pub fn start( + opts: MetaNodeOpts, + shutdown: CancellationToken, +) -> Pin + Send>> { // WARNING: don't change the function signature. Making it `async fn` will cause // slow compile in release mode. Box::pin(async move { @@ -324,7 +328,7 @@ pub fn start(opts: MetaNodeOpts) -> Pin + Send>> { max_timeout_ms / 1000 } + MIN_TIMEOUT_INTERVAL_SEC; - let (mut join_handle, leader_lost_handle, shutdown_send) = rpc_serve( + rpc_serve( add_info, backend, max_heartbeat_interval, @@ -428,42 +432,10 @@ pub fn start(opts: MetaNodeOpts) -> Pin + Send>> { }, config.system.into_init_system_params(), Default::default(), + shutdown, ) .await .unwrap(); - - tracing::info!("Meta server listening at {}", listen_addr); - - match leader_lost_handle { - None => { - tokio::select! { - _ = tokio::signal::ctrl_c() => { - tracing::info!("receive ctrl+c"); - shutdown_send.send(()).unwrap(); - join_handle.await.unwrap() - } - res = &mut join_handle => res.unwrap(), - }; - } - Some(mut handle) => { - tokio::select! { - _ = &mut handle => { - tracing::info!("receive leader lost signal"); - // When we lose leadership, we will exit as soon as possible. - } - _ = tokio::signal::ctrl_c() => { - tracing::info!("receive ctrl+c"); - shutdown_send.send(()).unwrap(); - join_handle.await.unwrap(); - handle.abort(); - } - res = &mut join_handle => { - res.unwrap(); - handle.abort(); - }, - }; - } - }; }) } diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index ed3ce2d986f69..74310c75374e5 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -16,9 +16,7 @@ use std::sync::Arc; use std::time::Duration; use anyhow::Context; -use either::Either; use etcd_client::ConnectOptions; -use futures::future::join_all; use otlp_embedded::TraceServiceServer; use regex::Regex; use risingwave_common::monitor::{RouterExt, TcpConfig}; @@ -26,11 +24,13 @@ use risingwave_common::session_config::SessionConfig; use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_common::telemetry::manager::TelemetryManager; use risingwave_common::telemetry::{report_scarf_enabled, report_to_scarf, telemetry_env_enabled}; +use risingwave_common::util::tokio_util::sync::CancellationToken; use risingwave_common_service::{MetricsManager, TracingExtractLayer}; use risingwave_meta::barrier::StreamRpcManager; use risingwave_meta::controller::catalog::CatalogController; use risingwave_meta::controller::cluster::ClusterController; use risingwave_meta::manager::{MetaStoreImpl, MetadataManager, SystemParamsManagerImpl}; +use risingwave_meta::rpc::election::dummy::DummyElectionClient; use risingwave_meta::rpc::intercept::MetricsMiddlewareLayer; use risingwave_meta::rpc::ElectionClientRef; use risingwave_meta::stream::ScaleController; @@ -76,10 +76,7 @@ use risingwave_pb::user::user_service_server::UserServiceServer; use risingwave_rpc_client::ComputeClientPool; use sea_orm::{ConnectionTrait, DbBackend}; use thiserror_ext::AsReport; -use tokio::sync::oneshot::{channel as OneChannel, Receiver as OneReceiver}; use tokio::sync::watch; -use tokio::sync::watch::{Receiver as WatchReceiver, Sender as WatchSender}; -use tokio::task::JoinHandle; use crate::backup_restore::BackupManager; use crate::barrier::{BarrierScheduler, GlobalBarrierManager}; @@ -124,6 +121,9 @@ pub mod started { } } +/// A wrapper around [`rpc_serve_with_store`] that dispatches different store implementations. +/// +/// For the timing of returning, see [`rpc_serve_with_store`]. pub async fn rpc_serve( address_info: AddressInfo, meta_store_backend: MetaStoreBackend, @@ -132,7 +132,8 @@ pub async fn rpc_serve( opts: MetaOpts, init_system_params: SystemParams, init_session_config: SessionConfig, -) -> MetaResult<(JoinHandle<()>, Option>, WatchSender<()>)> { + shutdown: CancellationToken, +) -> MetaResult<()> { match meta_store_backend { MetaStoreBackend::Etcd { endpoints, @@ -168,27 +169,34 @@ pub async fn rpc_serve( rpc_serve_with_store( MetaStoreImpl::Kv(meta_store), - Some(election_client), + election_client, address_info, max_cluster_heartbeat_interval, lease_interval_secs, opts, init_system_params, init_session_config, + shutdown, ) + .await } MetaStoreBackend::Mem => { let meta_store = MemStore::new().into_ref(); + let dummy_election_client = Arc::new(DummyElectionClient::new( + address_info.advertise_addr.clone(), + )); rpc_serve_with_store( MetaStoreImpl::Kv(meta_store), - None, + dummy_election_client, address_info, max_cluster_heartbeat_interval, lease_interval_secs, opts, init_system_params, init_session_config, + shutdown, ) + .await } MetaStoreBackend::Sql { endpoint } => { let max_connection = if DbBackend::Sqlite.is_prefix_of(&endpoint) { @@ -225,130 +233,120 @@ pub async fn rpc_serve( rpc_serve_with_store( MetaStoreImpl::Sql(meta_store_sql), - Some(election_client), + election_client, address_info, max_cluster_heartbeat_interval, lease_interval_secs, opts, init_system_params, init_session_config, + shutdown, ) + .await } } } -#[expect(clippy::type_complexity)] -pub fn rpc_serve_with_store( +/// Bootstraps the follower or leader service based on the election status. +/// +/// Returns when the `shutdown` token is triggered, or when leader status is lost, or if the leader +/// service fails to start. +pub async fn rpc_serve_with_store( meta_store_impl: MetaStoreImpl, - election_client: Option, + election_client: ElectionClientRef, address_info: AddressInfo, max_cluster_heartbeat_interval: Duration, lease_interval_secs: u64, opts: MetaOpts, init_system_params: SystemParams, init_session_config: SessionConfig, -) -> MetaResult<(JoinHandle<()>, Option>, WatchSender<()>)> { - let (svc_shutdown_tx, svc_shutdown_rx) = watch::channel(()); + shutdown: CancellationToken, +) -> MetaResult<()> { + // TODO(shutdown): directly use cancellation token + let (election_shutdown_tx, election_shutdown_rx) = watch::channel(()); - let leader_lost_handle = if let Some(election_client) = election_client.clone() { - let stop_rx = svc_shutdown_tx.subscribe(); + let election_handle = tokio::spawn({ + let shutdown = shutdown.clone(); + let election_client = election_client.clone(); - let handle = tokio::spawn(async move { + async move { while let Err(e) = election_client - .run_once(lease_interval_secs as i64, stop_rx.clone()) + .run_once(lease_interval_secs as i64, election_shutdown_rx.clone()) .await { tracing::error!(error = %e.as_report(), "election error happened"); } - }); + // Leader lost, shutdown the service. + shutdown.cancel(); + } + }); - Some(handle) - } else { - None - }; + // Spawn and run the follower service if not the leader. + // Watch the leader status and switch to the leader service when elected. + // TODO: the branch seems to be always hit since the default value of `is_leader` is false until + // the election is done (unless using `DummyElectionClient`). + if !election_client.is_leader() { + // The follower service can be shutdown separately if we're going to be the leader. + let follower_shutdown = shutdown.child_token(); + + let follower_handle = tokio::spawn(start_service_as_election_follower( + follower_shutdown.clone(), + address_info.clone(), + election_client.clone(), + )); - let join_handle = tokio::spawn(async move { - if let Some(election_client) = election_client.clone() { - let mut is_leader_watcher = election_client.subscribe(); - let mut svc_shutdown_rx_clone = svc_shutdown_rx.clone(); - let (follower_shutdown_tx, follower_shutdown_rx) = OneChannel::<()>(); + // Watch and wait until we become the leader. + let mut is_leader_watcher = election_client.subscribe(); + while !*is_leader_watcher.borrow_and_update() { tokio::select! { - _ = svc_shutdown_rx_clone.changed() => return, + // External shutdown signal. Directly return without switching to leader. + _ = shutdown.cancelled() => return Ok(()), + res = is_leader_watcher.changed() => { if res.is_err() { tracing::error!("leader watcher recv failed"); } } } - let svc_shutdown_rx_clone = svc_shutdown_rx.clone(); - - // If not the leader, spawn a follower. - let follower_handle: Option> = if !*is_leader_watcher.borrow() { - let address_info_clone = address_info.clone(); - - let election_client_ = election_client.clone(); - Some(tokio::spawn(async move { - start_service_as_election_follower( - svc_shutdown_rx_clone, - follower_shutdown_rx, - address_info_clone, - Some(election_client_), - ) - .await; - })) - } else { - None - }; + } - let mut svc_shutdown_rx_clone = svc_shutdown_rx.clone(); - while !*is_leader_watcher.borrow_and_update() { - tokio::select! { - _ = svc_shutdown_rx_clone.changed() => { - return; - } - res = is_leader_watcher.changed() => { - if res.is_err() { - tracing::error!("leader watcher recv failed"); - } - } - } - } + tracing::info!("elected as leader, shutting down follower services"); + follower_shutdown.cancel(); + let _ = follower_handle.await; + } - if let Some(handle) = follower_handle { - let _res = follower_shutdown_tx.send(()); - let _ = handle.await; - } - }; + // Run the leader service. + let result = start_service_as_election_leader( + meta_store_impl, + address_info, + max_cluster_heartbeat_interval, + opts, + init_system_params, + init_session_config, + election_client, + shutdown, + ) + .await; - start_service_as_election_leader( - meta_store_impl, - address_info, - max_cluster_heartbeat_interval, - opts, - init_system_params, - init_session_config, - election_client, - svc_shutdown_rx, - ) - .await - .expect("Unable to start leader services"); - }); + // Leader service has stopped, shutdown the election service to gracefully resign. + election_shutdown_tx.send(()).ok(); + let _ = election_handle.await; - Ok((join_handle, leader_lost_handle, svc_shutdown_tx)) + result } -/// Starts all services needed for the meta follower node +/// Starts all services needed for the meta follower node. +/// +/// Returns when the `shutdown` token is triggered. pub async fn start_service_as_election_follower( - mut svc_shutdown_rx: WatchReceiver<()>, - follower_shutdown_rx: OneReceiver<()>, + shutdown: CancellationToken, address_info: AddressInfo, - election_client: Option, + election_client: ElectionClientRef, ) { - let meta_member_srv = MetaMemberServiceImpl::new(match election_client { - None => Either::Right(address_info.clone()), - Some(election_client) => Either::Left(election_client), - }); + tracing::info!("starting follower services"); + + let meta_member_srv = MetaMemberServiceImpl::new(election_client); let health_srv = HealthServiceImpl::new(); @@ -366,35 +364,21 @@ pub async fn start_service_as_election_follower( tcp_nodelay: true, keepalive_duration: None, }, - async move { - tokio::select! { - // shutdown service if all services should be shut down - res = svc_shutdown_rx.changed() => { - match res { - Ok(_) => tracing::info!("Shutting down services"), - Err(_) => tracing::error!("Service shutdown sender dropped") - } - }, - // shutdown service if follower becomes leader - res = follower_shutdown_rx => { - match res { - Ok(_) => tracing::info!("Shutting down follower services"), - Err(_) => tracing::error!("Follower service shutdown sender dropped") - } - }, - } - }, + shutdown.clone().cancelled_owned(), ); + let server_handle = tokio::spawn(server); started::set(); - server.await; + + // Wait for the shutdown signal. + shutdown.cancelled().await; + // Wait for the server to shutdown. This is necessary because we may be transitioning from follower + // to leader, and conflicts on the services must be avoided. + let _ = server_handle.await; } -/// Starts all services needed for the meta leader node -/// Only call this function once, since initializing the services multiple times will result in an -/// inconsistent state +/// Starts all services needed for the meta leader node. /// -/// ## Returns -/// Returns an error if the service initialization failed +/// Returns when the `shutdown` token is triggered, or if the service initialization fails. pub async fn start_service_as_election_leader( meta_store_impl: MetaStoreImpl, address_info: AddressInfo, @@ -402,10 +386,11 @@ pub async fn start_service_as_election_leader( opts: MetaOpts, init_system_params: SystemParams, init_session_config: SessionConfig, - election_client: Option, - mut svc_shutdown_rx: WatchReceiver<()>, + election_client: ElectionClientRef, + shutdown: CancellationToken, ) -> MetaResult<()> { - tracing::info!("Defining leader services"); + tracing::info!("starting leader services"); + let env = MetaSrvEnv::new( opts.clone(), init_system_params, @@ -479,10 +464,7 @@ pub async fn start_service_as_election_leader( .unwrap(); let object_store_media_type = hummock_manager.object_store_media_type(); - let meta_member_srv = MetaMemberServiceImpl::new(match election_client.clone() { - None => Either::Right(address_info.clone()), - Some(election_client) => Either::Left(election_client), - }); + let meta_member_srv = MetaMemberServiceImpl::new(election_client.clone()); let prometheus_client = opts.prometheus_endpoint.as_ref().map(|x| { use std::str::FromStr; @@ -504,7 +486,7 @@ pub async fn start_service_as_election_leader( let trace_srv = otlp_embedded::TraceServiceImpl::new(trace_state.clone()); #[cfg(not(madsim))] - let dashboard_task = if let Some(ref dashboard_addr) = address_info.dashboard_addr { + let _dashboard_task = if let Some(ref dashboard_addr) = address_info.dashboard_addr { let dashboard_service = crate::dashboard::DashboardService { dashboard_addr: *dashboard_addr, prometheus_client, @@ -537,6 +519,7 @@ pub async fn start_service_as_election_leader( ); let (sink_manager, shutdown_handle) = SinkCoordinatorManager::start_worker(); + // TODO(shutdown): remove this as there's no need to gracefully shutdown some of these sub-tasks. let mut sub_tasks = vec![shutdown_handle]; let stream_rpc_manager = StreamRpcManager::new(env.clone()); @@ -722,17 +705,17 @@ pub async fn start_service_as_election_leader( sub_tasks.push(stream_manager.start_auto_parallelism_monitor()); } } - let (idle_send, idle_recv) = tokio::sync::oneshot::channel(); - sub_tasks.push(IdleManager::start_idle_checker( + + let _idle_checker_handle = IdleManager::start_idle_checker( env.idle_manager_ref(), Duration::from_secs(30), - idle_send, - )); + shutdown.clone(), + ); let (abort_sender, abort_recv) = tokio::sync::oneshot::channel(); let notification_mgr = env.notification_manager_ref(); let stream_abort_handler = tokio::spawn(async move { - abort_recv.await.unwrap(); + let _ = abort_recv.await; notification_mgr.abort_all().await; compactor_manager.abort_all_compactors(); }); @@ -763,33 +746,6 @@ pub async fn start_service_as_election_leader( sub_tasks.push(pair); } - let shutdown_all = async move { - let mut handles = Vec::with_capacity(sub_tasks.len()); - - for (join_handle, shutdown_sender) in sub_tasks { - if let Err(_err) = shutdown_sender.send(()) { - continue; - } - - handles.push(join_handle); - } - - // The barrier manager can't be shutdown gracefully if it's under recovering, try to - // abort it using timeout. - match tokio::time::timeout(Duration::from_secs(1), join_all(handles)).await { - Ok(results) => { - for result in results { - if result.is_err() { - tracing::warn!("Failed to join shutdown"); - } - } - } - Err(_e) => { - tracing::warn!("Join shutdown timeout"); - } - } - }; - tracing::info!("Assigned cluster id {:?}", *env.cluster_id()); tracing::info!("Starting meta services"); @@ -833,28 +789,15 @@ pub async fn start_service_as_election_leader( tcp_nodelay: true, keepalive_duration: None, }, - async move { - tokio::select! { - res = svc_shutdown_rx.changed() => { - match res { - Ok(_) => tracing::info!("Shutting down services"), - Err(_) => tracing::error!("Service shutdown receiver dropped") - } - shutdown_all.await; - }, - _ = idle_recv => { - shutdown_all.await; - }, - } - }, + shutdown.clone().cancelled_owned(), ); started::set(); - server.await; + let _server_handle = tokio::spawn(server); - #[cfg(not(madsim))] - if let Some(dashboard_task) = dashboard_task { - dashboard_task.abort(); - } + // Wait for the shutdown signal. + shutdown.cancelled().await; + // TODO(shutdown): may warn user if there's any other node still running in the cluster. + // TODO(shutdown): do we have any other shutdown tasks? Ok(()) } diff --git a/src/meta/service/src/meta_member_service.rs b/src/meta/service/src/meta_member_service.rs index b8f5d9ebf92c4..946337d248485 100644 --- a/src/meta/service/src/meta_member_service.rs +++ b/src/meta/service/src/meta_member_service.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use either::Either; use risingwave_common::util::addr::HostAddr; use risingwave_meta::rpc::ElectionClientRef; use risingwave_pb::common::HostAddress; @@ -20,17 +19,14 @@ use risingwave_pb::meta::meta_member_service_server::MetaMemberService; use risingwave_pb::meta::{MembersRequest, MembersResponse, MetaMember}; use tonic::{Request, Response, Status}; -use crate::AddressInfo; #[derive(Clone)] pub struct MetaMemberServiceImpl { - election_client_or_self: Either, + election_client: ElectionClientRef, } impl MetaMemberServiceImpl { - pub fn new(election_client_or_self: Either) -> Self { - MetaMemberServiceImpl { - election_client_or_self, - } + pub fn new(election_client: ElectionClientRef) -> Self { + MetaMemberServiceImpl { election_client } } } @@ -41,39 +37,20 @@ impl MetaMemberService for MetaMemberServiceImpl { &self, _request: Request, ) -> Result, Status> { - let members = match &self.election_client_or_self { - Either::Left(election_client) => { - let mut members = vec![]; - for member in election_client.get_members().await? { - let host_addr = member - .id - .parse::() - .map_err(|err| Status::from_error(err.into()))?; - members.push(MetaMember { - address: Some(HostAddress { - host: host_addr.host, - port: host_addr.port.into(), - }), - is_leader: member.is_leader, - }) - } - - members - } - Either::Right(self_as_leader) => { - let host_addr = self_as_leader - .advertise_addr - .parse::() - .map_err(|err| Status::from_error(err.into()))?; - vec![MetaMember { - address: Some(HostAddress { - host: host_addr.host, - port: host_addr.port.into(), - }), - is_leader: true, - }] - } - }; + let mut members = vec![]; + for member in self.election_client.get_members().await? { + let host_addr = member + .id + .parse::() + .map_err(|err| Status::from_error(err.into()))?; + members.push(MetaMember { + address: Some(HostAddress { + host: host_addr.host, + port: host_addr.port.into(), + }), + is_leader: member.is_leader, + }) + } Ok(Response::new(MembersResponse { members })) } diff --git a/src/meta/src/manager/idle.rs b/src/meta/src/manager/idle.rs index afa9fbf860932..431575e665b88 100644 --- a/src/meta/src/manager/idle.rs +++ b/src/meta/src/manager/idle.rs @@ -16,7 +16,7 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; -use tokio::sync::oneshot::Sender; +use risingwave_common::util::tokio_util::sync::CancellationToken; use tokio::task::JoinHandle; /// `IdleManager` keeps track of latest activity and report whether the meta service has been @@ -77,24 +77,17 @@ impl IdleManager { pub fn start_idle_checker( idle_manager: IdleManagerRef, check_interval: Duration, - idle_send: tokio::sync::oneshot::Sender<()>, - ) -> (JoinHandle<()>, Sender<()>) { + shutdown: CancellationToken, + ) -> JoinHandle<()> { let dur = idle_manager.get_config_max_idle(); if !dur.is_zero() { tracing::warn!("--dangerous-max-idle-secs is set. The meta server will be automatically stopped after idle for {:?}.", dur) } - let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel(); - let join_handle = tokio::spawn(async move { + tokio::spawn(async move { let mut min_interval = tokio::time::interval(check_interval); loop { - tokio::select! { - _ = min_interval.tick() => {}, - _ = &mut shutdown_rx => { - tracing::info!("Idle checker is stopped"); - return; - } - } + min_interval.tick().await; if idle_manager.is_exceeding_max_idle() { break; } @@ -104,9 +97,9 @@ impl IdleManager { idle_manager.get_config_max_idle() ); tracing::warn!("Idle checker is shutting down the server"); - let _ = idle_send.send(()); - }); - (join_handle, shutdown_tx) + + shutdown.cancel(); + }) } } diff --git a/src/meta/src/rpc/election/dummy.rs b/src/meta/src/rpc/election/dummy.rs new file mode 100644 index 0000000000000..567958dd08600 --- /dev/null +++ b/src/meta/src/rpc/election/dummy.rs @@ -0,0 +1,73 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use tokio::sync::watch::{self, Receiver, Sender}; + +use crate::{ElectionClient, ElectionMember, MetaResult}; + +/// A dummy implementation of [`ElectionClient`] for scenarios where only one meta node is running, +/// typically for testing purposes such as an in-memory meta store. +/// +/// This can be used to unify the code paths no matter there's HA or not. +pub struct DummyElectionClient { + id: String, + + /// A dummy watcher that never changes, indicating we are always the leader. + dummy_watcher: Sender, +} + +impl DummyElectionClient { + pub fn new(id: String) -> Self { + Self { + id, + dummy_watcher: watch::channel(true).0, + } + } + + fn self_member(&self) -> ElectionMember { + ElectionMember { + id: self.id.clone(), + is_leader: true, + } + } +} + +#[async_trait::async_trait] +impl ElectionClient for DummyElectionClient { + fn id(&self) -> MetaResult { + Ok(self.id.clone()) + } + + async fn run_once(&self, _ttl: i64, mut stop: Receiver<()>) -> MetaResult<()> { + // Only exit when the stop signal is received. + let _ = stop.changed().await; + Ok(()) + } + + fn subscribe(&self) -> Receiver { + self.dummy_watcher.subscribe() + } + + async fn leader(&self) -> MetaResult> { + Ok(Some(self.self_member())) + } + + async fn get_members(&self) -> MetaResult> { + Ok(vec![self.self_member()]) + } + + fn is_leader(&self) -> bool { + true + } +} diff --git a/src/meta/src/rpc/election/etcd.rs b/src/meta/src/rpc/election/etcd.rs index 96b16f537356e..6834591764360 100644 --- a/src/meta/src/rpc/election/etcd.rs +++ b/src/meta/src/rpc/election/etcd.rs @@ -36,7 +36,7 @@ pub struct EtcdElectionClient { #[async_trait::async_trait] impl ElectionClient for EtcdElectionClient { - async fn is_leader(&self) -> bool { + fn is_leader(&self) -> bool { *self.is_leader_sender.borrow() } @@ -404,7 +404,7 @@ mod tests { let leader = new_followers.into_iter().next().unwrap(); - assert!(leader.1.is_leader().await); + assert!(leader.1.is_leader()); } #[tokio::test] @@ -434,7 +434,7 @@ mod tests { let mut leaders = vec![]; let mut followers = vec![]; for (sender, client) in clients { - if client.is_leader().await { + if client.is_leader() { leaders.push((sender, client)); } else { followers.push((sender, client)); @@ -476,7 +476,7 @@ mod tests { } for client in &clients { - assert!(!client.1.is_leader().await); + assert!(!client.1.is_leader()); } for (stop_sender, client) in &clients { diff --git a/src/meta/src/rpc/election/mod.rs b/src/meta/src/rpc/election/mod.rs index 0c65d497b677e..9b34d19ce2244 100644 --- a/src/meta/src/rpc/election/mod.rs +++ b/src/meta/src/rpc/election/mod.rs @@ -11,6 +11,8 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + +pub mod dummy; pub mod etcd; pub mod sql; @@ -34,9 +36,12 @@ pub trait ElectionClient: Send + Sync + 'static { } fn id(&self) -> MetaResult; + /// Run the long-running election process. + /// + /// Returns when the leader status is lost, or the stop signal is received. async fn run_once(&self, ttl: i64, stop: Receiver<()>) -> MetaResult<()>; fn subscribe(&self) -> Receiver; async fn leader(&self) -> MetaResult>; async fn get_members(&self) -> MetaResult>; - async fn is_leader(&self) -> bool; + fn is_leader(&self) -> bool; } diff --git a/src/meta/src/rpc/election/sql.rs b/src/meta/src/rpc/election/sql.rs index 9ec5bd199cf76..62694aaa3ded3 100644 --- a/src/meta/src/rpc/election/sql.rs +++ b/src/meta/src/rpc/election/sql.rs @@ -781,7 +781,7 @@ where .collect()) } - async fn is_leader(&self) -> bool { + fn is_leader(&self) -> bool { *self.is_leader_sender.borrow() } } @@ -842,7 +842,7 @@ mod tests { loop { receiver.changed().await.unwrap(); if *receiver.borrow() { - assert!(sql_election_client.is_leader().await); + assert!(sql_election_client.is_leader()); break; } } @@ -874,7 +874,7 @@ mod tests { let mut is_leaders = vec![]; for client in clients { - is_leaders.push(client.is_leader().await); + is_leaders.push(client.is_leader()); } assert!(is_leaders.iter().filter(|&x| *x).count() <= 1); diff --git a/src/meta/src/rpc/metrics.rs b/src/meta/src/rpc/metrics.rs index 28520720e98fe..618ac9c26436c 100644 --- a/src/meta/src/rpc/metrics.rs +++ b/src/meta/src/rpc/metrics.rs @@ -717,7 +717,7 @@ impl Default for MetaMetrics { pub fn start_worker_info_monitor( metadata_manager: MetadataManager, - election_client: Option, + election_client: ElectionClientRef, interval: Duration, meta_metrics: Arc, ) -> (JoinHandle<()>, Sender<()>) { @@ -754,9 +754,7 @@ pub fn start_worker_info_monitor( .with_label_values(&[(worker_type.as_str_name())]) .set(worker_num as i64); } - if let Some(client) = &election_client - && let Ok(meta_members) = client.get_members().await - { + if let Ok(meta_members) = election_client.get_members().await { meta_metrics .worker_num .with_label_values(&[WorkerType::Meta.as_str_name()]) diff --git a/src/stream/src/executor/exchange/input.rs b/src/stream/src/executor/exchange/input.rs index 4c5648ab1dd49..bd348e65defca 100644 --- a/src/stream/src/executor/exchange/input.rs +++ b/src/stream/src/executor/exchange/input.rs @@ -15,7 +15,7 @@ use std::pin::Pin; use std::task::{Context, Poll}; -use anyhow::Context as _; +use anyhow::{anyhow, Context as _}; use futures::pin_mut; use futures_async_stream::try_stream; use pin_project::pin_project; @@ -147,6 +147,7 @@ impl RemoteInput { metrics: Arc, batched_permits_limit: usize, ) { + let self_actor_id = up_down_ids.1; let client = client_pool.get_by_addr(upstream_addr).await?; let (stream, permits_tx) = client .get_stream(up_down_ids.0, up_down_ids.1, up_down_frag.0, up_down_frag.1) @@ -162,6 +163,7 @@ impl RemoteInput { let span: await_tree::Span = format!("RemoteInput (actor {up_actor_id})").into(); let mut batched_permits_accumulated = 0; + let mut mutation_subscriber = None; pin_mut!(stream); while let Some(data_res) = stream.next().verbose_instrument_await(span.clone()).await { @@ -203,10 +205,22 @@ impl RemoteInput { barrier.mutation.is_none(), "Mutation should be erased in remote side" ); - let mutation = local_barrier_manager - .read_barrier_mutation(barrier) + let mutation_subscriber = + mutation_subscriber.get_or_insert_with(|| { + local_barrier_manager + .subscribe_barrier_mutation(self_actor_id, barrier) + }); + + let mutation = mutation_subscriber + .recv() .await - .context("Read barrier mutation error")?; + .ok_or_else(|| { + anyhow!("failed to receive mutation of barrier {:?}", barrier) + }) + .map(|(prev_epoch, mutation)| { + assert_eq!(prev_epoch, barrier.epoch.prev); + mutation + })?; barrier.mutation = mutation; } } diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index 1098f505004c4..d20759f017a2b 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -31,7 +31,7 @@ use rw_futures_util::{pending_on_none, AttachedFuture}; use thiserror_ext::AsReport; use tokio::select; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; -use tokio::sync::oneshot; +use tokio::sync::{mpsc, oneshot}; use tokio::task::JoinHandle; use tonic::{Code, Status}; @@ -48,6 +48,7 @@ mod tests; pub use progress::CreateMviewProgress; use risingwave_common::catalog::TableId; +use risingwave_common::util::epoch::EpochPair; use risingwave_common::util::runtime::BackgroundShutdownRuntime; use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map; use risingwave_hummock_sdk::{LocalSstableInfo, SyncResult}; @@ -187,6 +188,8 @@ impl CreateActorContext { } } +pub(super) type SubscribeMutationItem = (u64, Option>); + pub(super) enum LocalBarrierEvent { ReportActorCollected { actor_id: ActorId, @@ -197,9 +200,10 @@ pub(super) enum LocalBarrierEvent { actor: ActorId, state: BackfillState, }, - ReadBarrierMutation { - barrier: Barrier, - mutation_sender: oneshot::Sender>>, + SubscribeBarrierMutation { + actor_id: ActorId, + epoch: EpochPair, + mutation_sender: mpsc::UnboundedSender, }, #[cfg(test)] Flush(oneshot::Sender<()>), @@ -515,11 +519,13 @@ impl LocalBarrierWorker { } => { self.update_create_mview_progress(current_epoch, actor, state); } - LocalBarrierEvent::ReadBarrierMutation { - barrier, + LocalBarrierEvent::SubscribeBarrierMutation { + actor_id, + epoch, mutation_sender, } => { - self.read_barrier_mutation(barrier, mutation_sender); + self.state + .subscribe_actor_mutation(actor_id, epoch.prev, mutation_sender); } #[cfg(test)] LocalBarrierEvent::Flush(sender) => sender.send(()).unwrap(), @@ -642,15 +648,6 @@ impl LocalBarrierWorker { Ok(()) } - /// Read mutation from barrier state. - fn read_barrier_mutation( - &mut self, - barrier: Barrier, - sender: oneshot::Sender>>, - ) { - self.state.read_barrier_mutation(&barrier, sender); - } - /// Register sender for source actors, used to send barriers. fn register_sender(&mut self, actor_id: ActorId, senders: Vec>) { tracing::debug!( @@ -907,17 +904,18 @@ impl LocalBarrierManager { } /// When a `RemoteInput` get a barrier, it should wait and read the barrier mutation from the barrier manager. - pub async fn read_barrier_mutation( + pub fn subscribe_barrier_mutation( &self, - barrier: &Barrier, - ) -> StreamResult>> { - let (tx, rx) = oneshot::channel(); - self.send_event(LocalBarrierEvent::ReadBarrierMutation { - barrier: barrier.clone(), + actor_id: ActorId, + first_barrier: &Barrier, + ) -> mpsc::UnboundedReceiver { + let (tx, rx) = mpsc::unbounded_channel(); + self.send_event(LocalBarrierEvent::SubscribeBarrierMutation { + actor_id, + epoch: first_barrier.epoch, mutation_sender: tx, }); - rx.await - .map_err(|_| anyhow!("barrier manager maybe reset").into()) + rx } } diff --git a/src/stream/src/task/barrier_manager/managed_state.rs b/src/stream/src/task/barrier_manager/managed_state.rs index 6c70525aa4b3a..40b47ee26e8f7 100644 --- a/src/stream/src/task/barrier_manager/managed_state.rs +++ b/src/stream/src/task/barrier_manager/managed_state.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::assert_matches::assert_matches; -use std::collections::btree_map::Entry; use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use std::fmt::{Debug, Display, Formatter}; use std::future::Future; @@ -34,10 +33,10 @@ use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgres use risingwave_storage::{dispatch_state_store, StateStore, StateStoreImpl}; use rw_futures_util::pending_on_none; use thiserror_ext::AsReport; -use tokio::sync::oneshot; +use tokio::sync::mpsc; use super::progress::BackfillState; -use super::BarrierCompleteResult; +use super::{BarrierCompleteResult, SubscribeMutationItem}; use crate::error::StreamResult; use crate::executor::monitor::StreamingMetrics; use crate::executor::{Barrier, Mutation}; @@ -69,13 +68,6 @@ impl Debug for IssuedState { /// The state machine of local barrier manager. #[derive(Debug)] enum ManagedBarrierStateInner { - /// Received barrier from actors in other compute nodes in remote input, however no `send_barrier` - /// request from the meta service is issued. - Stashed { - /// Senders registered by the remote input. - mutation_senders: Vec>>>, - }, - /// Meta service has issued a `send_barrier` request. We're collecting barriers now. Issued(IssuedState), @@ -133,9 +125,6 @@ impl Display for ManagedBarrierStateDebugInfo<'_> { for (epoch, barrier_state) in self.epoch_barrier_state_map { write!(f, "> Epoch {}: ", epoch)?; match &barrier_state.inner { - ManagedBarrierStateInner::Stashed { .. } => { - write!(f, "Stashed")?; - } ManagedBarrierStateInner::Issued(state) => { write!(f, "Issued [{:?}]. Remaining actors: [", state.kind)?; let mut is_prev_epoch_issued = false; @@ -193,12 +182,26 @@ impl Display for ManagedBarrierStateDebugInfo<'_> { } } +#[derive(Default)] +struct ActorMutationSubscribers { + pending_subscribers: BTreeMap>>, + started_subscribers: Vec>, +} + +impl ActorMutationSubscribers { + fn is_empty(&self) -> bool { + self.pending_subscribers.is_empty() && self.started_subscribers.is_empty() + } +} + pub(super) struct ManagedBarrierState { /// Record barrier state for each epoch of concurrent checkpoints. /// /// The key is `prev_epoch`, and the first value is `curr_epoch` epoch_barrier_state_map: BTreeMap, + mutation_subscribers: HashMap, + /// Record the progress updates of creating mviews for each epoch of concurrent checkpoints. pub(super) create_mview_progress: HashMap>, @@ -231,6 +234,7 @@ impl ManagedBarrierState { ) -> Self { Self { epoch_barrier_state_map: BTreeMap::default(), + mutation_subscribers: Default::default(), create_mview_progress: Default::default(), state_store, streaming_metrics, @@ -246,39 +250,69 @@ impl ManagedBarrierState { } } - pub fn read_barrier_mutation( + pub(super) fn subscribe_actor_mutation( &mut self, - barrier: &Barrier, - sender: oneshot::Sender>>, + actor_id: ActorId, + start_prev_epoch: u64, + tx: mpsc::UnboundedSender, ) { - match self.epoch_barrier_state_map.entry(barrier.epoch.prev) { - Entry::Vacant(v) => { - v.insert(BarrierState { - curr_epoch: barrier.epoch.curr, - inner: ManagedBarrierStateInner::Stashed { - mutation_senders: vec![sender], - }, - }); - } - Entry::Occupied(mut o) => { - let state = o.get_mut(); - match &mut state.inner { - ManagedBarrierStateInner::Stashed { - ref mut mutation_senders, - } => { - mutation_senders.push(sender); - } - ManagedBarrierStateInner::Issued(IssuedState { mutation, .. }) => { - let _ = sender.send(mutation.clone()); - } - _ => { - panic!( - "cannot read barrier mutation {:?} at current state: {:?}", - barrier.epoch, state.inner - ) + let subscribers = self.mutation_subscribers.entry(actor_id).or_default(); + if let Some(state) = self.epoch_barrier_state_map.get(&start_prev_epoch) { + match &state.inner { + ManagedBarrierStateInner::Issued(issued_state) => { + assert!(issued_state.remaining_actors.contains(&actor_id)); + for (prev_epoch, state) in + self.epoch_barrier_state_map.range(start_prev_epoch..) + { + match &state.inner { + ManagedBarrierStateInner::Issued(issued_state) => { + if issued_state.remaining_actors.contains(&actor_id) { + if tx + .send((*prev_epoch, issued_state.mutation.clone())) + .is_err() + { + // No more subscribe on the mutation. Simply return. + return; + } + } else { + // The barrier no more collect from such actor. End subscribe on mutation. + return; + } + } + state @ ManagedBarrierStateInner::AllCollected + | state @ ManagedBarrierStateInner::Completed(_) => { + unreachable!( + "should be Issued when having new subscriber, but current state: {:?}", + state + ) + } + } } + subscribers.started_subscribers.push(tx); + } + state @ ManagedBarrierStateInner::AllCollected + | state @ ManagedBarrierStateInner::Completed(_) => { + unreachable!( + "should be Issued when having new subscriber, but current state: {:?}", + state + ) } } + } else { + // Barrier has not issued yet. Store the pending tx + if let Some((last_epoch, _)) = self.epoch_barrier_state_map.last_key_value() { + assert!( + *last_epoch < start_prev_epoch, + "later barrier {} has been issued, but skip the start epoch {:?}", + last_epoch, + start_prev_epoch + ); + } + subscribers + .pending_subscribers + .entry(start_prev_epoch) + .or_default() + .push(tx); } } @@ -300,7 +334,7 @@ impl ManagedBarrierState { ManagedBarrierStateInner::AllCollected | ManagedBarrierStateInner::Completed(_) => { continue; } - ManagedBarrierStateInner::Stashed { .. } | ManagedBarrierStateInner::Issued(_) => { + ManagedBarrierStateInner::Issued(_) => { break; } } @@ -445,16 +479,12 @@ impl ManagedBarrierState { ); match self.epoch_barrier_state_map.get_mut(&barrier.epoch.prev) { - Some(&mut BarrierState { - inner: ManagedBarrierStateInner::Stashed { .. }, - .. - }) - | None => { + None => { // If the barrier's state is stashed, this occurs exclusively in scenarios where the barrier has not been // injected by the barrier manager, or the barrier message is blocked at the `RemoteInput` side waiting for injection. // Given these conditions, it's inconceivable for an actor to attempt collect at this point. panic!( - "cannot collect new actor barrier {:?} at current state: Stashed or None", + "cannot collect new actor barrier {:?} at current state: None", barrier.epoch, ) } @@ -497,26 +527,56 @@ impl ManagedBarrierState { .streaming_metrics .barrier_inflight_latency .start_timer(); - match self.epoch_barrier_state_map.get_mut(&barrier.epoch.prev) { - Some(&mut BarrierState { - inner: - ManagedBarrierStateInner::Stashed { - ref mut mutation_senders, - }, - .. - }) => { - for sender in mutation_senders.drain(..) { - let _ = sender.send(barrier.mutation.clone()); - } - } - Some(BarrierState { ref inner, .. }) => { + if let Some(BarrierState { ref inner, .. }) = + self.epoch_barrier_state_map.get_mut(&barrier.epoch.prev) + { + { panic!( "barrier epochs{:?} state has already been `Issued`. Current state: {:?}", barrier.epoch, inner ); } - None => {} }; + + for (actor_id, subscribers) in &mut self.mutation_subscribers { + if actor_ids_to_collect.contains(actor_id) { + if let Some((first_epoch, _)) = subscribers.pending_subscribers.first_key_value() { + assert!( + *first_epoch >= barrier.epoch.prev, + "barrier epoch {:?} skip subscribed epoch {}", + barrier.epoch, + first_epoch + ); + if *first_epoch == barrier.epoch.prev { + subscribers.started_subscribers.extend( + subscribers + .pending_subscribers + .pop_first() + .expect("should exist") + .1, + ); + } + } + subscribers.started_subscribers.retain(|tx| { + tx.send((barrier.epoch.prev, barrier.mutation.clone())) + .is_ok() + }); + } else { + subscribers.started_subscribers.clear(); + if let Some((first_epoch, _)) = subscribers.pending_subscribers.first_key_value() { + assert!( + *first_epoch > barrier.epoch.prev, + "barrier epoch {:?} skip subscribed epoch {}", + barrier.epoch, + first_epoch + ); + } + } + } + + self.mutation_subscribers + .retain(|_, subscribers| !subscribers.is_empty()); + self.epoch_barrier_state_map.insert( barrier.epoch.prev, BarrierState { diff --git a/src/stream/src/task/barrier_manager/tests.rs b/src/stream/src/task/barrier_manager/tests.rs index 82a08e1d66117..60b3867a1a0c0 100644 --- a/src/stream/src/task/barrier_manager/tests.rs +++ b/src/stream/src/task/barrier_manager/tests.rs @@ -180,10 +180,11 @@ async fn test_managed_barrier_collection_separately() -> StreamResult<()> { // Prepare the barrier let curr_epoch = test_epoch(2); let barrier = Barrier::new_test_barrier(curr_epoch); - let epoch = barrier.epoch.prev; + + let mut mutation_subscriber = manager.subscribe_barrier_mutation(extra_actor_id, &barrier); // Read the mutation after receiving the barrier from remote input. - let mut mutation_reader = pin!(manager.read_barrier_mutation(&barrier)); + let mut mutation_reader = pin!(mutation_subscriber.recv()); assert!(poll_fn(|cx| Poll::Ready(mutation_reader.as_mut().poll(cx).is_pending())).await); request_tx @@ -200,8 +201,8 @@ async fn test_managed_barrier_collection_separately() -> StreamResult<()> { })) .unwrap(); - let mutation = mutation_reader.await.unwrap(); - assert_eq!(mutation, barrier.mutation); + let (epoch, mutation) = mutation_reader.await.unwrap(); + assert_eq!((epoch, &mutation), (barrier.epoch.prev, &barrier.mutation)); // Collect a barrier before sending manager.collect(extra_actor_id, &barrier); diff --git a/src/tests/compaction_test/src/compaction_test_runner.rs b/src/tests/compaction_test/src/compaction_test_runner.rs index 195715e130877..de5e747624098 100644 --- a/src/tests/compaction_test/src/compaction_test_runner.rs +++ b/src/tests/compaction_test/src/compaction_test_runner.rs @@ -154,7 +154,7 @@ pub async fn start_meta_node(listen_addr: String, state_store: String, config_pa "enable_compaction_deterministic should be set" ); - risingwave_meta_node::start(meta_opts).await + risingwave_meta_node::start(meta_opts, CancellationToken::new() /* dummy */).await } async fn start_compactor_node( diff --git a/src/tests/simulation/src/cluster.rs b/src/tests/simulation/src/cluster.rs index c733288757ba2..8e2ffece2fed9 100644 --- a/src/tests/simulation/src/cluster.rs +++ b/src/tests/simulation/src/cluster.rs @@ -455,7 +455,12 @@ impl Cluster { .create_node() .name(format!("meta-{i}")) .ip([192, 168, 1, i as u8].into()) - .init(move || risingwave_meta_node::start(opts.clone())) + .init(move || { + risingwave_meta_node::start( + opts.clone(), + CancellationToken::new(), // dummy + ) + }) .build(); }