From 2064b5b145cda3c0b04061a3e819bf68b32d0333 Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Fri, 22 Nov 2024 17:28:10 +0800 Subject: [PATCH 1/7] fix: memory allocation of standalone mode (#19477) --- docker/README.md | 100 +++++++++--------- docker/docker-compose-with-azblob.yml | 13 ++- docker/docker-compose-with-gcs.yml | 13 ++- docker/docker-compose-with-local-fs.yml | 12 ++- docker/docker-compose-with-obs.yml | 13 ++- docker/docker-compose-with-oss.yml | 13 ++- docker/docker-compose-with-s3.yml | 13 ++- docker/docker-compose-with-sqlite.yml | 13 ++- docker/docker-compose.yml | 13 ++- src/cmd_all/src/standalone.rs | 135 +++++++----------------- src/compute/src/lib.rs | 7 ++ src/compute/src/memory/controller.rs | 6 +- src/compute/src/memory/manager.rs | 4 +- src/compute/src/server.rs | 18 ++-- src/frontend/src/lib.rs | 2 +- src/frontend/src/session.rs | 12 ++- 16 files changed, 181 insertions(+), 206 deletions(-) diff --git a/docker/README.md b/docker/README.md index 865f7c64c5183..13782a0447bb8 100644 --- a/docker/README.md +++ b/docker/README.md @@ -1,43 +1,21 @@ -# Docker Images +# RisingWave Docker Images -## Published images +## Run RisingWave with Docker Compose -- `latest` on GHCR (latest nightly build): `ghcr.io/risingwavelabs/risingwave:latest` -- `latest` on Docker Hub (latest release): `risingwavelabs/risingwave:latest` -- Other tags available on both GHCR and Docker Hub: - - `nightly-yyyyMMdd`, e.g., `nightly-20230108` - - `vX.Y.Z`, e.g., `v0.1.15` +Docker Compose allows you to easily launch a RisingWave instance on a single node. If you are using more than one server, please refer to [Deploy RisingWave on Kubernetes](https://docs.risingwave.com/deploy/risingwave-kubernetes). -## Build the images -The docker images for x86_64 are built with AVX2 SIMD extensions, while the images for aarch64 are built with NEON SIMD extensions. These must be available on your machine. If your machine does not support these extensions, you must build the docker image with the build-arg `simd_disabled=true`. - -To build the images, simply run: - -``` -docker build . -f docker/Dockerfile -``` - -from the project root. - -To build the images without SIMD vector extensions, run - -``` -docker build . -f docker/Dockerfile --build-arg simd_disabled=true -``` - -from the project root and run any subsequent docker commands on the resultant image. - -## Use the images - -To ensure you are using the latest version of RisingWave image, +Ensure you are using the latest version of RisingWave image: ``` # Ensure risingwave image is of latest version docker pull ghcr.io/risingwavelabs/risingwave:latest ``` -### playground +### Playground + +Playground mode does not persist any data. **Never** use it for production purpose. + To start a RisingWave playground, run ``` @@ -45,25 +23,27 @@ To start a RisingWave playground, run docker run -it --pull=always -p 4566:4566 -p 5691:5691 ghcr.io/risingwavelabs/risingwave:latest playground ``` -### standalone minio -To start a RisingWave standalone mode with minio backend, run +### Standalone (MinIO backend) + +To start a RisingWave standalone instance with MinIO backend, run ``` # Start all components docker-compose up ``` -### distributed cluster minio -To start a RisingWave cluster with minio backend, run +**⚠️ Important Notes: Memory is crucial for RisingWave!** Inappropriate memory configuration may lead to OOM (out-of-memory) errors or poor performance. Before deploying Docker Compose, ensure that the correct memory settings are configured in the `docker-compose.yaml` file. Here are examples of some typical settings. + +| Memory for RW container (`resource.limits.memory`) | 8 GiB | 14 GiB | 28 GiB | 58 GiB | +|----------------------------------------------------|---|---|---|---| +| `compute-opts.total-memory-bytes` | 6 GiB | 10 GiB | 20 GiB | 46 GiB | +| `frontend-opts.frontend-total-memory-bytes` | 1 GiB | 2 GiB | 4 GiB | 6 GiB | +| `compactor-opts.compactor-total-memory-bytes` | 1 GiB | 2 GiB | 4 GiB | 6 GiB | +| `compute-opts.memory-manager-target-bytes` | 5.6 GiB | 9.8 GiB | 20.8 GiB | 44.8 GiB | -``` -# Start all components -docker-compose -f docker-compose-distributed.yml up -``` -It will start a minio, a meta node, a compute node, a frontend, a compactor, a prometheus and a redpanda instance. +### Standalone (S3 backend) -### s3 and other s3-compatible storage backend To start a RisingWave cluster with s3 backend, configure the aws credit in [aws.env](https://github.com/risingwavelabs/risingwave/blob/main/docker/aws.env). If you want to use some s3 compatible storage like Tencent Cloud COS, just configure one more [endpoint](https://github.com/risingwavelabs/risingwave/blob/a2684461e379ce73f8d730982147439e2379de16/docker/aws.env#L7). After configuring the environment and fill in your [bucket name](https://github.com/risingwavelabs/risingwave/blob/a2684461e379ce73f8d730982147439e2379de16/docker/docker-compose-with-s3.yml#L196), run @@ -75,7 +55,8 @@ docker-compose -f docker-compose-with-s3.yml up It will run with s3 (compatible) object storage with a meta node, a compute node, a frontend, a compactor, a prometheus and a redpanda instance. -### Start with other storage products of public cloud vendors +### Standalone (other backend) + To start a RisingWave cluster with other storage backend, like Google Cloud Storage, Alicloud OSS or Azure Blob Storage, configure the authentication information in [multiple_object_storage.env](https://github.com/risingwavelabs/risingwave/blob/main/docker/multiple_object_storage.env), fill in your [bucket name](https://github.com/risingwavelabs/risingwave/blob/a2684461e379ce73f8d730982147439e2379de16/docker/docker-compose-with-gcs.yml#L196). and run @@ -86,27 +67,39 @@ docker-compose -f docker-compose-with-xxx.yml up It will run RisingWave with corresponding (object) storage products. -### Start with HDFS backend -To start a RisingWave cluster with HDFS, mount your `HADDOP_HOME` in [compactor node volumes](https://github.com/risingwavelabs/risingwave/blob/a2684461e379ce73f8d730982147439e2379de16/docker/docker-compose-with-hdfs.yml#L28), [compute node volumes](https://github.com/risingwavelabs/risingwave/blob/a2684461e379ce73f8d730982147439e2379de16/docker/docker-compose-with-hdfs.yml#L112) [compute node volumes](https://github.com/risingwavelabs/risingwave/blob/a2684461e379ce73f8d730982147439e2379de16/docker/docker-compose-with-hdfs.yml#L218), fill in the [cluster_name/namenode](https://github.com/risingwavelabs/risingwave/blob/a2684461e379ce73f8d730982147439e2379de16/docker/docker-compose-with-hdfs.yml#L202), -and run +> [!NOTE] +> +> For RisingWave kernel hackers, we always recommend using [risedev](../src/risedevtool/README.md) to start the full cluster, instead of using docker images. +> See [CONTRIBUTING](../CONTRIBUTING.md) for more information. + + +## Published images + +- `latest` on GHCR (latest nightly build): `ghcr.io/risingwavelabs/risingwave:latest` +- `latest` on Docker Hub (latest release): `risingwavelabs/risingwave:latest` +- Other tags available on both GHCR and Docker Hub: + - `nightly-yyyyMMdd`, e.g., `nightly-20230108` + - `vX.Y.Z`, e.g., `v0.1.15` + +## Build the images + +The docker images for x86_64 are built with AVX2 SIMD extensions, while the images for aarch64 are built with NEON SIMD extensions. These must be available on your machine. If your machine does not support these extensions, you must build the docker image with the build-arg `simd_disabled=true`. + +To build the images, simply run: ``` -# Start all components -docker-compose -f docker-compose-with-hdfs.yml up +docker build . -f docker/Dockerfile ``` -It will run RisingWave with HDFS. +from the project root. -To clean all data, run: +To build the images without SIMD vector extensions, run ``` -docker-compose down -v +docker build . -f docker/Dockerfile --build-arg simd_disabled=true ``` -> [!NOTE] -> -> For RisingWave kernel hackers, we always recommend using [risedev](../src/risedevtool/README.md) to start the full cluster, instead of using docker images. -> See [CONTRIBUTING](../CONTRIBUTING.md) for more information. +from the project root and run any subsequent docker commands on the resultant image. ## Generate docker-compose.yml @@ -122,4 +115,5 @@ Error { code: "XMinioStorageFull", message: "Storage backend has reached its min ``` Solution: + This usually happens on MacOS with Docker Desktop. The Docker Deskup runs in the macOS Hypervisor. All the data, including logs, images, volumes, and so on, is stored in this hypervisor and the hypervisor has a default disk capacity limit. So when this message emerges, simply cleaning up the unused container or image can help mitigate. You can also increase capacity limit by clicking the Docker Desktop icon in the menu bar, then clicking Preferences > Resources > `Increase Disk image size`. diff --git a/docker/docker-compose-with-azblob.yml b/docker/docker-compose-with-azblob.yml index 99889d846dcb1..d479f9464b5fe 100644 --- a/docker/docker-compose-with-azblob.yml +++ b/docker/docker-compose-with-azblob.yml @@ -21,22 +21,25 @@ services: --prometheus-listener-addr 0.0.0.0:1250 \ --advertise-addr 0.0.0.0:5688 \ --async-stack-trace verbose \ - #--parallelism 4 \ - #--total-memory-bytes 8589934592 \ + --parallelism 8 \ + --total-memory-bytes 21474836480 \ --role both \ - --meta-address http://0.0.0.0:5690\" \ + --meta-address http://0.0.0.0:5690 \ + --memory-manager-target-bytes 22333829939 \" \ --frontend-opts=\" \ --config-path /risingwave.toml \ --listen-addr 0.0.0.0:4566 \ --advertise-addr 0.0.0.0:4566 \ --prometheus-listener-addr 0.0.0.0:1250 \ --health-check-listener-addr 0.0.0.0:6786 \ - --meta-addr http://0.0.0.0:5690\" \ + --meta-addr http://0.0.0.0:5690 \ + --frontend-total-memory-bytes=4294967296\" \ --compactor-opts=\" \ --listen-addr 0.0.0.0:6660 \ --prometheus-listener-addr 0.0.0.0:1250 \ --advertise-addr 0.0.0.0:6660 \ - --meta-address http://0.0.0.0:5690\"" + --meta-address http://0.0.0.0:5690 \ + --compactor-total-memory-bytes=4294967296\"" expose: - "6660" - "4566" diff --git a/docker/docker-compose-with-gcs.yml b/docker/docker-compose-with-gcs.yml index 80466c7cccaba..6380b91a01a95 100644 --- a/docker/docker-compose-with-gcs.yml +++ b/docker/docker-compose-with-gcs.yml @@ -21,22 +21,25 @@ services: --prometheus-listener-addr 0.0.0.0:1250 \ --advertise-addr 0.0.0.0:5688 \ --async-stack-trace verbose \ - #--parallelism 4 \ - #--total-memory-bytes 8589934592 \ + --parallelism 8 \ + --total-memory-bytes 21474836480 \ --role both \ - --meta-address http://0.0.0.0:5690\" \ + --meta-address http://0.0.0.0:5690 \ + --memory-manager-target-bytes 22333829939 \" \ --frontend-opts=\" \ --config-path /risingwave.toml \ --listen-addr 0.0.0.0:4566 \ --advertise-addr 0.0.0.0:4566 \ --prometheus-listener-addr 0.0.0.0:1250 \ --health-check-listener-addr 0.0.0.0:6786 \ - --meta-addr http://0.0.0.0:5690\" \ + --meta-addr http://0.0.0.0:5690 \ + --frontend-total-memory-bytes=4294967296\" \ --compactor-opts=\" \ --listen-addr 0.0.0.0:6660 \ --prometheus-listener-addr 0.0.0.0:1250 \ --advertise-addr 0.0.0.0:6660 \ - --meta-address http://0.0.0.0:5690\"" + --meta-address http://0.0.0.0:5690 \ + --compactor-total-memory-bytes=4294967296\"" expose: - "6660" - "4566" diff --git a/docker/docker-compose-with-local-fs.yml b/docker/docker-compose-with-local-fs.yml index 68483796ac800..abfea87b5a68e 100644 --- a/docker/docker-compose-with-local-fs.yml +++ b/docker/docker-compose-with-local-fs.yml @@ -21,21 +21,25 @@ services: --prometheus-listener-addr 0.0.0.0:1250 \ --advertise-addr 0.0.0.0:5688 \ --async-stack-trace verbose \ - # --parallelism 4 \ + --parallelism 8 \ + --total-memory-bytes 21474836480 \ --role both \ - --meta-address http://0.0.0.0:5690\" \ + --meta-address http://0.0.0.0:5690 \ + --memory-manager-target-bytes 22333829939 \" \ --frontend-opts=\" \ --config-path /risingwave.toml \ --listen-addr 0.0.0.0:4566 \ --advertise-addr 0.0.0.0:4566 \ --prometheus-listener-addr 0.0.0.0:1250 \ --health-check-listener-addr 0.0.0.0:6786 \ - --meta-addr http://0.0.0.0:5690\" \ + --meta-addr http://0.0.0.0:5690 \ + --frontend-total-memory-bytes=4294967296\" \ --compactor-opts=\" \ --listen-addr 0.0.0.0:6660 \ --prometheus-listener-addr 0.0.0.0:1250 \ --advertise-addr 0.0.0.0:6660 \ - --meta-address http://0.0.0.0:5690\"" + --meta-address http://0.0.0.0:5690 \ + --compactor-total-memory-bytes=4294967296\"" expose: - "6660" - "4566" diff --git a/docker/docker-compose-with-obs.yml b/docker/docker-compose-with-obs.yml index f4bf8dc0e74c0..690da92d8b966 100644 --- a/docker/docker-compose-with-obs.yml +++ b/docker/docker-compose-with-obs.yml @@ -21,22 +21,25 @@ services: --prometheus-listener-addr 0.0.0.0:1250 \ --advertise-addr 0.0.0.0:5688 \ --async-stack-trace verbose \ - #--parallelism 4 \ - #--total-memory-bytes 8589934592 \ + --parallelism 8 \ + --total-memory-bytes 21474836480 \ --role both \ - --meta-address http://0.0.0.0:5690\" \ + --meta-address http://0.0.0.0:5690 \ + --memory-manager-target-bytes 22333829939 \" \ --frontend-opts=\" \ --config-path /risingwave.toml \ --listen-addr 0.0.0.0:4566 \ --advertise-addr 0.0.0.0:4566 \ --prometheus-listener-addr 0.0.0.0:1250 \ --health-check-listener-addr 0.0.0.0:6786 \ - --meta-addr http://0.0.0.0:5690\" \ + --meta-addr http://0.0.0.0:5690 \ + --frontend-total-memory-bytes=4294967296\" \ --compactor-opts=\" \ --listen-addr 0.0.0.0:6660 \ --prometheus-listener-addr 0.0.0.0:1250 \ --advertise-addr 0.0.0.0:6660 \ - --meta-address http://0.0.0.0:5690\"" + --meta-address http://0.0.0.0:5690 \ + --compactor-total-memory-bytes=4294967296\"" expose: - "6660" - "4566" diff --git a/docker/docker-compose-with-oss.yml b/docker/docker-compose-with-oss.yml index 7d9563473182a..dab8af1592993 100644 --- a/docker/docker-compose-with-oss.yml +++ b/docker/docker-compose-with-oss.yml @@ -21,22 +21,25 @@ services: --prometheus-listener-addr 0.0.0.0:1250 \ --advertise-addr 0.0.0.0:5688 \ --async-stack-trace verbose \ - #--parallelism 4 \ - #--total-memory-bytes 8589934592 \ + --parallelism 8 \ + --total-memory-bytes 21474836480 \ --role both \ - --meta-address http://0.0.0.0:5690\" \ + --meta-address http://0.0.0.0:5690 \ + --memory-manager-target-bytes 22333829939 \" \ --frontend-opts=\" \ --config-path /risingwave.toml \ --listen-addr 0.0.0.0:4566 \ --advertise-addr 0.0.0.0:4566 \ --prometheus-listener-addr 0.0.0.0:1250 \ --health-check-listener-addr 0.0.0.0:6786 \ - --meta-addr http://0.0.0.0:5690\" \ + --meta-addr http://0.0.0.0:5690 \ + --frontend-total-memory-bytes=4294967296\" \ --compactor-opts=\" \ --listen-addr 0.0.0.0:6660 \ --prometheus-listener-addr 0.0.0.0:1250 \ --advertise-addr 0.0.0.0:6660 \ - --meta-address http://0.0.0.0:5690\"" + --meta-address http://0.0.0.0:5690 \ + --compactor-total-memory-bytes=4294967296\"" expose: - "6660" - "4566" diff --git a/docker/docker-compose-with-s3.yml b/docker/docker-compose-with-s3.yml index d7dc75aa556a6..c9d839220c943 100644 --- a/docker/docker-compose-with-s3.yml +++ b/docker/docker-compose-with-s3.yml @@ -21,22 +21,25 @@ services: --prometheus-listener-addr 0.0.0.0:1250 \ --advertise-addr 0.0.0.0:5688 \ --async-stack-trace verbose \ - #--parallelism 4 \ - #--total-memory-bytes 8589934592 \ + --parallelism 8 \ + --total-memory-bytes 21474836480 \ --role both \ - --meta-address http://0.0.0.0:5690\" \ + --meta-address http://0.0.0.0:5690 \ + --memory-manager-target-bytes 22333829939 \" \ --frontend-opts=\" \ --config-path /risingwave.toml \ --listen-addr 0.0.0.0:4566 \ --advertise-addr 0.0.0.0:4566 \ --prometheus-listener-addr 0.0.0.0:1250 \ --health-check-listener-addr 0.0.0.0:6786 \ - --meta-addr http://0.0.0.0:5690\" \ + --meta-addr http://0.0.0.0:5690 \ + --frontend-total-memory-bytes=4294967296\" \ --compactor-opts=\" \ --listen-addr 0.0.0.0:6660 \ --prometheus-listener-addr 0.0.0.0:1250 \ --advertise-addr 0.0.0.0:6660 \ - --meta-address http://0.0.0.0:5690\"" + --meta-address http://0.0.0.0:5690 \ + --compactor-total-memory-bytes=4294967296\"" expose: - "6660" - "4566" diff --git a/docker/docker-compose-with-sqlite.yml b/docker/docker-compose-with-sqlite.yml index d4081b592c2ab..98d88a415d496 100644 --- a/docker/docker-compose-with-sqlite.yml +++ b/docker/docker-compose-with-sqlite.yml @@ -21,22 +21,25 @@ services: --prometheus-listener-addr 0.0.0.0:1250 \ --advertise-addr 0.0.0.0:5688 \ --async-stack-trace verbose \ - #--parallelism 4 \ - #--total-memory-bytes 8589934592 \ + --parallelism 8 \ + --total-memory-bytes 21474836480 \ --role both \ - --meta-address http://0.0.0.0:5690\" \ + --meta-address http://0.0.0.0:5690 \ + --memory-manager-target-bytes 22333829939 \" \ --frontend-opts=\" \ --config-path /risingwave.toml \ --listen-addr 0.0.0.0:4566 \ --advertise-addr 0.0.0.0:4566 \ --prometheus-listener-addr 0.0.0.0:1250 \ --health-check-listener-addr 0.0.0.0:6786 \ - --meta-addr http://0.0.0.0:5690\" \ + --meta-addr http://0.0.0.0:5690 \ + --frontend-total-memory-bytes=4294967296\" \ --compactor-opts=\" \ --listen-addr 0.0.0.0:6660 \ --prometheus-listener-addr 0.0.0.0:1250 \ --advertise-addr 0.0.0.0:6660 \ - --meta-address http://0.0.0.0:5690\"" + --meta-address http://0.0.0.0:5690 \ + --compactor-total-memory-bytes=4294967296\"" expose: - "6660" - "4566" diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index e315878c98b77..b7b29313547b7 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -21,22 +21,25 @@ services: --prometheus-listener-addr 0.0.0.0:1250 \ --advertise-addr 0.0.0.0:5688 \ --async-stack-trace verbose \ - #--parallelism 4 \ - #--total-memory-bytes 8589934592 \ + --parallelism 8 \ + --total-memory-bytes 21474836480 \ --role both \ - --meta-address http://0.0.0.0:5690\" \ + --meta-address http://0.0.0.0:5690 \ + --memory-manager-target-bytes 22333829939 \" \ --frontend-opts=\" \ --config-path /risingwave.toml \ --listen-addr 0.0.0.0:4566 \ --advertise-addr 0.0.0.0:4566 \ --prometheus-listener-addr 0.0.0.0:1250 \ --health-check-listener-addr 0.0.0.0:6786 \ - --meta-addr http://0.0.0.0:5690\" \ + --meta-addr http://0.0.0.0:5690 \ + --frontend-total-memory-bytes=4294967296\" \ --compactor-opts=\" \ --listen-addr 0.0.0.0:6660 \ --prometheus-listener-addr 0.0.0.0:1250 \ --advertise-addr 0.0.0.0:6660 \ - --meta-address http://0.0.0.0:5690\"" + --meta-address http://0.0.0.0:5690 \ + --compactor-total-memory-bytes=4294967296\"" expose: - "6660" - "4566" diff --git a/src/cmd_all/src/standalone.rs b/src/cmd_all/src/standalone.rs index 6eb62492999a0..b5730d8d7845a 100644 --- a/src/cmd_all/src/standalone.rs +++ b/src/cmd_all/src/standalone.rs @@ -391,17 +391,9 @@ It SHOULD NEVER be used in benchmarks and production environment!!!" #[cfg(test)] mod test { - use std::fmt::Debug; - - use expect_test::{expect, Expect}; - use super::*; - fn check(actual: impl Debug, expect: Expect) { - let actual = format!("{:#?}", actual); - expect.assert_eq(&actual); - } - + #[allow(clippy::assertions_on_constants)] #[test] fn test_parse_opt_args() { // Test parsing into standalone-level opts. @@ -426,93 +418,42 @@ mod test { // Test parsing into node-level opts. let actual = parse_standalone_opt_args(&opts); - check( - actual, - expect![[r#" - ParsedStandaloneOpts { - meta_opts: Some( - MetaNodeOpts { - listen_addr: "127.0.0.1:8001", - advertise_addr: "127.0.0.1:9999", - dashboard_host: None, - prometheus_listener_addr: Some( - "127.0.0.1:1234", - ), - sql_endpoint: None, - sql_username: "", - sql_password: [REDACTED alloc::string::String], - sql_database: "", - prometheus_endpoint: None, - prometheus_selector: None, - privatelink_endpoint_default_tags: None, - vpc_id: None, - security_group_id: None, - config_path: "src/config/test.toml", - backend: None, - barrier_interval_ms: None, - sstable_size_mb: None, - block_size_kb: None, - bloom_false_positive: None, - state_store: None, - data_directory: Some( - "some path with spaces", - ), - do_not_config_object_storage_lifecycle: None, - backup_storage_url: None, - backup_storage_directory: None, - heap_profiling_dir: None, - dangerous_max_idle_secs: None, - connector_rpc_endpoint: None, - license_key: None, - license_key_path: None, - temp_secret_file_dir: "./meta/secrets/", - }, - ), - compute_opts: Some( - ComputeNodeOpts { - listen_addr: "127.0.0.1:8000", - advertise_addr: None, - prometheus_listener_addr: "127.0.0.1:1234", - meta_address: List( - [ - http://127.0.0.1:5690/, - ], - ), - config_path: "src/config/test.toml", - total_memory_bytes: 34359738368, - reserved_memory_bytes: None, - parallelism: 10, - role: Both, - metrics_level: None, - data_file_cache_dir: None, - meta_file_cache_dir: None, - async_stack_trace: None, - heap_profiling_dir: None, - connector_rpc_endpoint: None, - temp_secret_file_dir: "./compute/secrets/", - }, - ), - frontend_opts: Some( - FrontendOpts { - listen_addr: "0.0.0.0:4566", - tcp_keepalive_idle_secs: 300, - advertise_addr: None, - meta_addr: List( - [ - http://127.0.0.1:5690/, - ], - ), - prometheus_listener_addr: "127.0.0.1:1234", - frontend_rpc_listener_addr: "127.0.0.1:6786", - config_path: "src/config/test.toml", - metrics_level: None, - enable_barrier_read: None, - temp_secret_file_dir: "./frontend/secrets/", - frontend_total_memory_bytes: 34359738368, - }, - ), - compactor_opts: None, - }"#]], - ); + if let Some(compute_opts) = &actual.compute_opts { + assert_eq!(compute_opts.listen_addr, "127.0.0.1:8000"); + assert_eq!(compute_opts.total_memory_bytes, 34359738368); + assert_eq!(compute_opts.parallelism, 10); + assert_eq!(compute_opts.temp_secret_file_dir, "./compute/secrets/"); + assert_eq!(compute_opts.prometheus_listener_addr, "127.0.0.1:1234"); + assert_eq!(compute_opts.config_path, "src/config/test.toml"); + } else { + assert!(false); + } + if let Some(meta_opts) = &actual.meta_opts { + assert_eq!(meta_opts.listen_addr, "127.0.0.1:8001"); + assert_eq!(meta_opts.advertise_addr, "127.0.0.1:9999"); + assert_eq!( + meta_opts.data_directory, + Some("some path with spaces".to_string()) + ); + assert_eq!(meta_opts.temp_secret_file_dir, "./meta/secrets/"); + assert_eq!( + meta_opts.prometheus_listener_addr, + Some("127.0.0.1:1234".to_string()) + ); + assert_eq!(meta_opts.config_path, "src/config/test.toml"); + } else { + assert!(false); + } + + if let Some(frontend_opts) = &actual.frontend_opts { + assert_eq!(frontend_opts.config_path, "src/config/test.toml"); + assert_eq!(frontend_opts.temp_secret_file_dir, "./frontend/secrets/"); + assert_eq!(frontend_opts.frontend_total_memory_bytes, 34359738368); + assert_eq!(frontend_opts.prometheus_listener_addr, "127.0.0.1:1234"); + } else { + assert!(false); + } + + assert!(actual.compactor_opts.is_none()); } } diff --git a/src/compute/src/lib.rs b/src/compute/src/lib.rs index 1336a84980cea..ef4b5c5e32d3b 100644 --- a/src/compute/src/lib.rs +++ b/src/compute/src/lib.rs @@ -92,6 +92,13 @@ pub struct ComputeNodeOpts { #[clap(long, env = "RW_RESERVED_MEMORY_BYTES")] pub reserved_memory_bytes: Option, + /// Target memory usage for Memory Manager. + /// If not set, the default value is `total_memory_bytes` - `reserved_memory_bytes` + /// + /// It's strongly recommended to set it for standalone deployment. + #[clap(long, env = "RW_MEMORY_MANAGER_TARGET_BYTES")] + pub memory_manager_target_bytes: Option, + /// The parallelism that the compute node will register to the scheduler of the meta service. #[clap(long, env = "RW_PARALLELISM", default_value_t = default_parallelism())] #[override_opts(if_absent, path = streaming.actor_runtime_worker_threads_num)] diff --git a/src/compute/src/memory/controller.rs b/src/compute/src/memory/controller.rs index 2857b0e6b0d66..878471a7bedb2 100644 --- a/src/compute/src/memory/controller.rs +++ b/src/compute/src/memory/controller.rs @@ -85,10 +85,10 @@ pub struct LruWatermarkController { impl LruWatermarkController { pub fn new(config: &MemoryManagerConfig) -> Self { - let threshold_stable = (config.total_memory as f64 * config.threshold_stable) as usize; - let threshold_graceful = (config.total_memory as f64 * config.threshold_graceful) as usize; + let threshold_stable = (config.target_memory as f64 * config.threshold_stable) as usize; + let threshold_graceful = (config.target_memory as f64 * config.threshold_graceful) as usize; let threshold_aggressive = - (config.total_memory as f64 * config.threshold_aggressive) as usize; + (config.target_memory as f64 * config.threshold_aggressive) as usize; Self { metrics: config.metrics.clone(), diff --git a/src/compute/src/memory/manager.rs b/src/compute/src/memory/manager.rs index 235ab5802fbfd..5384838eec378 100644 --- a/src/compute/src/memory/manager.rs +++ b/src/compute/src/memory/manager.rs @@ -22,7 +22,9 @@ use risingwave_stream::executor::monitor::StreamingMetrics; use super::controller::LruWatermarkController; pub struct MemoryManagerConfig { - pub total_memory: usize, + /// [`MemoryManager`] will try to control the jemalloc-reported memory usage + /// to be lower than this + pub target_memory: usize, pub threshold_aggressive: f64, pub threshold_graceful: f64, diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index eda0ee9c159da..d86a516771802 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -288,15 +288,14 @@ pub async fn compute_node_serve( batch_mem_limit(compute_memory_bytes, opts.role.for_serving()), )); - // NOTE: Due to some limits, we use `compute_memory_bytes + storage_memory_bytes` as - // `total_compute_memory_bytes` for memory control. This is just a workaround for some - // memory control issues and should be modified as soon as we figure out a better solution. - // - // Related issues: - // - https://github.com/risingwavelabs/risingwave/issues/8696 - // - https://github.com/risingwavelabs/risingwave/issues/8822 + let target_memory = if let Some(v) = opts.memory_manager_target_bytes { + v + } else { + compute_memory_bytes + storage_memory_bytes + }; + let memory_mgr = MemoryManager::new(MemoryManagerConfig { - total_memory: compute_memory_bytes + storage_memory_bytes, + target_memory, threshold_aggressive: config .streaming .developer @@ -546,8 +545,7 @@ fn print_memory_config( reserved_memory_bytes: usize, ) { let memory_config = format!( - "\n\ - Memory outline:\n\ + "Memory outline:\n\ > total_memory: {}\n\ > storage_memory: {}\n\ > block_cache_capacity: {}\n\ diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index bf03edb6a87dd..6424da42a1510 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -162,7 +162,7 @@ pub struct FrontendOpts { )] pub temp_secret_file_dir: String, - /// Total available memory for the frontend node in bytes. Used by both computing and storage. + /// Total available memory for the frontend node in bytes. Used for batch computing. #[clap(long, env = "RW_FRONTEND_TOTAL_MEMORY_BYTES", default_value_t = default_frontend_total_memory_bytes())] pub frontend_total_memory_bytes: usize, } diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index fa4836e73e958..a920dc32653f7 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -62,6 +62,7 @@ use risingwave_common::util::addr::HostAddr; use risingwave_common::util::cluster_limit; use risingwave_common::util::cluster_limit::ActorCountPerParallelism; use risingwave_common::util::iter_util::ZipEqFast; +use risingwave_common::util::pretty_bytes::convert; use risingwave_common::util::runtime::BackgroundShutdownRuntime; use risingwave_common::{GIT_SHA, RW_VERSION}; use risingwave_common_heap_profiling::HeapProfiler; @@ -450,9 +451,16 @@ impl FrontendEnv { // Run a background heap profiler heap_profiler.start(); - let mem_context = risingwave_common::memory::MemoryContext::root( + let batch_memory_limit = total_memory_bytes as f64 * FRONTEND_BATCH_MEMORY_PROPORTION; + let mem_context = MemoryContext::root( frontend_metrics.batch_total_mem.clone(), - (total_memory_bytes as f64 * FRONTEND_BATCH_MEMORY_PROPORTION) as u64, + batch_memory_limit as u64, + ); + + info!( + "Frontend total_memory: {} batch_memory: {}", + convert(total_memory_bytes as _), + convert(batch_memory_limit as _), ); Ok(( From fd5d0912fdebd031873651041866df6a2b8651b6 Mon Sep 17 00:00:00 2001 From: stonepage <40830455+st1page@users.noreply.github.com> Date: Fri, 22 Nov 2024 17:37:42 +0800 Subject: [PATCH 2/7] fix: when target is table, the sink downstream pk is not set (#19515) --- e2e_test/sink/sink_into_table/parallelism.slt | 6 +- src/frontend/planner_test/src/lib.rs | 1 + .../tests/testdata/input/sink_into_table.yaml | 12 ++ .../testdata/output/emit_on_window_close.yaml | 2 +- .../tests/testdata/output/sink.yaml | 18 +- .../testdata/output/sink_into_table.yaml | 17 ++ src/frontend/src/handler/create_sink.rs | 23 +- src/frontend/src/optimizer/mod.rs | 47 +++- .../src/optimizer/plan_node/stream_sink.rs | 204 +++++++++--------- 9 files changed, 193 insertions(+), 137 deletions(-) diff --git a/e2e_test/sink/sink_into_table/parallelism.slt b/e2e_test/sink/sink_into_table/parallelism.slt index 8359d2731d196..d5bbecf10f01a 100644 --- a/e2e_test/sink/sink_into_table/parallelism.slt +++ b/e2e_test/sink/sink_into_table/parallelism.slt @@ -8,16 +8,16 @@ statement ok SET STREAMING_PARALLELISM TO 2; statement ok -create table t_simple (v1 int, v2 int); +create table t_simple (v1 int, v2 int) append only; statement ok -create table m_simple (v1 int primary key, v2 int); +create table m_simple (v1 int, v2 int) append only; statement ok SET STREAMING_PARALLELISM TO 3; statement ok -create sink s_simple_1 into m_simple as select v1, v2 from t_simple; +create sink s_simple_1 into m_simple as select v1, v2 from t_simple with (type = 'append-only'); query I select distinct parallelism from rw_fragment_parallelism where name in ('t_simple', 'm_simple', 's_simple_1'); diff --git a/src/frontend/planner_test/src/lib.rs b/src/frontend/planner_test/src/lib.rs index 528fa88ef3506..0efa5f66865e8 100644 --- a/src/frontend/planner_test/src/lib.rs +++ b/src/frontend/planner_test/src/lib.rs @@ -850,6 +850,7 @@ impl TestCase { false, None, None, + false, ) { Ok(sink_plan) => { ret.sink_plan = Some(explain_plan(&sink_plan.into())); diff --git a/src/frontend/planner_test/tests/testdata/input/sink_into_table.yaml b/src/frontend/planner_test/tests/testdata/input/sink_into_table.yaml index 1191cd6a68966..24b3df8902faf 100644 --- a/src/frontend/planner_test/tests/testdata/input/sink_into_table.yaml +++ b/src/frontend/planner_test/tests/testdata/input/sink_into_table.yaml @@ -9,3 +9,15 @@ explain create sink ss into t from s with (type = 'append-only'); expected_outputs: - explain_output +- sql: | + create table t1 (a int primary key, b int); + create table t2 (a int, b int primary key); + explain create sink s into t1 from t2; + expected_outputs: + - explain_output +- sql: | + create table t1 (a int primary key, b int); + create table t2 (a int, b int primary key); + explain create sink s into t1 as select b from t2; + expected_outputs: + - explain_output diff --git a/src/frontend/planner_test/tests/testdata/output/emit_on_window_close.yaml b/src/frontend/planner_test/tests/testdata/output/emit_on_window_close.yaml index faaae7b4c9895..22d4fbf9134ad 100644 --- a/src/frontend/planner_test/tests/testdata/output/emit_on_window_close.yaml +++ b/src/frontend/planner_test/tests/testdata/output/emit_on_window_close.yaml @@ -206,7 +206,7 @@ emit on window close WITH (connector = 'blackhole'); explain_output: | - StreamSink { type: upsert, columns: [tm, foo, bar, t._row_id(hidden), lag, max, sum], pk: [t._row_id, t.bar] } + StreamSink { type: upsert, columns: [tm, foo, bar, t._row_id(hidden), lag, max, sum], downstream_pk: [] } └─StreamEowcOverWindow { window_functions: [first_value(t.foo) OVER(PARTITION BY t.bar ORDER BY t.tm ASC ROWS BETWEEN 2 PRECEDING AND 2 PRECEDING), max(t.foo) OVER(PARTITION BY t.bar ORDER BY t.tm ASC ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING), sum(t.foo) OVER(PARTITION BY t.bar ORDER BY t.tm ASC ROWS BETWEEN 2 PRECEDING AND CURRENT ROW EXCLUDE CURRENT ROW)] } └─StreamEowcSort { sort_column: t.tm } └─StreamExchange { dist: HashShard(t.bar) } diff --git a/src/frontend/planner_test/tests/testdata/output/sink.yaml b/src/frontend/planner_test/tests/testdata/output/sink.yaml index 12ba038043062..db5d533223cb4 100644 --- a/src/frontend/planner_test/tests/testdata/output/sink.yaml +++ b/src/frontend/planner_test/tests/testdata/output/sink.yaml @@ -9,7 +9,7 @@ table.name='t1sink', type='upsert'); explain_output: | - StreamSink { type: upsert, columns: [v1, v2, v3, v5, t1.v4(hidden)], pk: [t1.v3, t1.v4] } + StreamSink { type: upsert, columns: [v1, v2, v3, v5, t1.v4(hidden)], downstream_pk: [t1.v1, t1.v2] } └─StreamExchange { dist: HashShard(t1.v1, t1.v2) } └─StreamTableScan { table: t1, columns: [v1, v2, v3, v5, v4] } - id: create_upsert_jdbc_sink_with_downstream_pk2 @@ -22,7 +22,7 @@ table.name='t1sink', type='upsert'); explain_output: | - StreamSink { type: upsert, columns: [v1, v2, v3, v5], pk: [t1.v1, t1.v2] } + StreamSink { type: upsert, columns: [v1, v2, v3, v5], downstream_pk: [t1.v3, t1.v5] } └─StreamExchange { dist: HashShard(t1.v3, t1.v5) } └─StreamTableScan { table: t1, columns: [v1, v2, v3, v5] } - id: create_upsert_jdbc_sink_with_downstream_pk1 @@ -36,7 +36,7 @@ type='upsert'); explain_output: |+ Fragment 0 - StreamSink { type: upsert, columns: [v1, v2, v3, v5, t1.v4(hidden)], pk: [t1.v3, t1.v4] } + StreamSink { type: upsert, columns: [v1, v2, v3, v5, t1.v4(hidden)], downstream_pk: [t1.v1, t1.v2] } ├── tables: [ Sink: 0 ] ├── output: [ t1.v1, t1.v2, t1.v3, t1.v5, t1.v4 ] ├── stream key: [ t1.v3, t1.v4 ] @@ -89,7 +89,7 @@ type='upsert'); explain_output: |+ Fragment 0 - StreamSink { type: upsert, columns: [v1, v2, v3, v5], pk: [t1.v1, t1.v2] } + StreamSink { type: upsert, columns: [v1, v2, v3, v5], downstream_pk: [t1.v3, t1.v5] } ├── tables: [ Sink: 0 ] ├── output: [ t1.v1, t1.v2, t1.v3, t1.v5 ] ├── stream key: [ t1.v1, t1.v2 ] @@ -152,7 +152,7 @@ primary_key='v1,v2' ); explain_output: | - StreamSink { type: upsert, columns: [v1, v2, v3, v5, t1.v4(hidden)], pk: [t1.v3, t1.v4] } + StreamSink { type: upsert, columns: [v1, v2, v3, v5, t1.v4(hidden)], downstream_pk: [t1.v1, t1.v2] } └─StreamExchange { dist: HashShard(t1.v1, t1.v2) } └─StreamTableScan { table: t1, columns: [v1, v2, v3, v5, v4] } - id: downstream_pk_same_with_upstream @@ -165,7 +165,7 @@ primary_key='v2,v1' ); explain_output: | - StreamSink { type: upsert, columns: [v2, v1, count], pk: [t1.v1, t1.v2] } + StreamSink { type: upsert, columns: [v2, v1, count], downstream_pk: [t1.v2, t1.v1] } └─StreamProject { exprs: [t1.v2, t1.v1, count] } └─StreamHashAgg { group_key: [t1.v1, t1.v2], aggs: [count] } └─StreamExchange { dist: HashShard(t1.v1, t1.v2) } @@ -175,7 +175,7 @@ create table t2 (a int, b int, watermark for b as b - 4) append only; explain create sink sk1 from t2 emit on window close with (connector='blackhole'); explain_output: | - StreamSink { type: upsert, columns: [a, b, t2._row_id(hidden)], pk: [t2._row_id] } + StreamSink { type: upsert, columns: [a, b, t2._row_id(hidden)], downstream_pk: [] } └─StreamEowcSort { sort_column: t2.b } └─StreamTableScan { table: t2, columns: [a, b, _row_id] } - id: create_mock_iceberg_sink_append_only_with_sparse_partition @@ -238,7 +238,7 @@ primary_key = 'v1' ); explain_output: | - StreamSink { type: upsert, columns: [v1, v2, v3, v4, t1._row_id(hidden)], pk: [t1._row_id] } + StreamSink { type: upsert, columns: [v1, v2, v3, v4, t1._row_id(hidden)], downstream_pk: [t1.v1] } └─StreamExchange { dist: HashShard($expr1) } └─StreamProject { exprs: [t1.v1, t1.v2, t1.v3, t1.v4, t1._row_id, Row(t1.v1, IcebergTransform('bucket[1]':Varchar, t1.v2), IcebergTransform('truncate[1]':Varchar, t1.v3), null:Int32) as $expr1] } └─StreamTableScan { table: t1, columns: [v1, v2, v3, v4, _row_id] } @@ -260,5 +260,5 @@ primary_key = 'v1' ); explain_output: | - StreamSink { type: upsert, columns: [v1, v2, v3, v4, t1._row_id(hidden)], pk: [t1._row_id] } + StreamSink { type: upsert, columns: [v1, v2, v3, v4, t1._row_id(hidden)], downstream_pk: [t1.v1] } └─StreamTableScan { table: t1, columns: [v1, v2, v3, v4, _row_id] } diff --git a/src/frontend/planner_test/tests/testdata/output/sink_into_table.yaml b/src/frontend/planner_test/tests/testdata/output/sink_into_table.yaml index 1fc6df6613a98..ac9485bc61abb 100644 --- a/src/frontend/planner_test/tests/testdata/output/sink_into_table.yaml +++ b/src/frontend/planner_test/tests/testdata/output/sink_into_table.yaml @@ -12,3 +12,20 @@ StreamProject { exprs: [s.x, Proctime as $expr1, (Proctime - '00:01:00':Interval) as $expr2, null:Serial], output_watermarks: [$expr1, $expr2] } └─StreamSink { type: append-only, columns: [x, s._row_id(hidden)] } └─StreamTableScan { table: s, columns: [x, _row_id] } +- sql: | + create table t1 (a int primary key, b int); + create table t2 (a int, b int primary key); + explain create sink s into t1 from t2; + explain_output: | + StreamProject { exprs: [t2.a, t2.b] } + └─StreamSink { type: upsert, columns: [a, b], downstream_pk: [t2.a] } + └─StreamExchange { dist: HashShard(t2.a) } + └─StreamTableScan { table: t2, columns: [a, b] } +- sql: | + create table t1 (a int primary key, b int); + create table t2 (a int, b int primary key); + explain create sink s into t1 as select b from t2; + explain_output: | + StreamProject { exprs: [t2.b, null:Int32] } + └─StreamSink { type: upsert, columns: [b], downstream_pk: [t2.b] } + └─StreamTableScan { table: t2, columns: [b] } diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index 7172e2ba7220b..cfcddc8b8bb4a 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -26,7 +26,7 @@ use risingwave_common::bail; use risingwave_common::catalog::{ColumnCatalog, DatabaseId, ObjectId, Schema, SchemaId, UserId}; use risingwave_common::secret::LocalSecretManager; use risingwave_common::types::DataType; -use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc, SinkType}; +use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc}; use risingwave_connector::sink::iceberg::{IcebergConfig, ICEBERG_SINK}; use risingwave_connector::sink::kafka::KAFKA_SINK; use risingwave_connector::sink::{ @@ -229,8 +229,6 @@ pub async fn gen_sink_plan( } } - let target_table = target_table_catalog.as_ref().map(|catalog| catalog.id()); - let sink_plan = plan_root.gen_sink_plan( sink_table_name, definition, @@ -240,8 +238,9 @@ pub async fn gen_sink_plan( sink_from_table_name, format_desc, without_backfill, - target_table, + target_table_catalog.clone(), partition_info, + user_specified_columns, )?; let sink_desc = sink_plan.sink_desc().clone(); @@ -282,22 +281,6 @@ pub async fn gen_sink_plan( } } - let user_defined_primary_key_table = table_catalog.row_id_index.is_none(); - let sink_is_append_only = sink_catalog.sink_type == SinkType::AppendOnly - || sink_catalog.sink_type == SinkType::ForceAppendOnly; - - if !user_defined_primary_key_table && !sink_is_append_only { - return Err(RwError::from(ErrorCode::BindError( - "Only append-only sinks can sink to a table without primary keys. please try to add type = 'append-only' in the with option. e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_string(), - ))); - } - - if table_catalog.append_only && !sink_is_append_only { - return Err(RwError::from(ErrorCode::BindError( - "Only append-only sinks can sink to a append only table. please try to add type = 'append-only' in the with option. e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_string(), - ))); - } - let table_columns_without_rw_timestamp = table_catalog.columns_without_rw_timestamp(); let exprs = derive_default_column_project_for_sink( &sink_catalog, diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index 323e38f990321..9572a46d90959 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::num::NonZeroU32; use std::ops::DerefMut; +use std::sync::Arc; pub mod plan_node; @@ -41,7 +42,7 @@ mod plan_expr_visitor; mod rule; use std::assert_matches::assert_matches; -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use fixedbitset::FixedBitSet; use itertools::Itertools as _; @@ -51,7 +52,7 @@ use plan_expr_rewriter::ConstEvalRewriter; use property::Order; use risingwave_common::bail; use risingwave_common::catalog::{ - ColumnCatalog, ColumnDesc, ColumnId, ConflictBehavior, Field, Schema, TableId, + ColumnCatalog, ColumnDesc, ColumnId, ConflictBehavior, Field, Schema, }; use risingwave_common::types::DataType; use risingwave_common::util::column_index_mapping::ColIndexMapping; @@ -83,6 +84,7 @@ use crate::optimizer::plan_node::{ use crate::optimizer::plan_visitor::{RwTimestampValidator, TemporalJoinValidator}; use crate::optimizer::property::Distribution; use crate::utils::{ColIndexMappingRewriteExt, WithOptionsSecResolved}; +use crate::TableCatalog; /// `PlanRoot` is used to describe a plan. planner will construct a `PlanRoot` with `LogicalNode`. /// and required distribution and order. And `PlanRoot` can generate corresponding streaming or @@ -958,8 +960,9 @@ impl PlanRoot { sink_from_table_name: String, format_desc: Option, without_backfill: bool, - target_table: Option, + target_table: Option>, partition_info: Option, + user_specified_columns: bool, ) -> Result { let stream_scan_type = if without_backfill { StreamScanType::UpstreamOnly @@ -977,12 +980,17 @@ impl PlanRoot { self.gen_optimized_stream_plan_inner(emit_on_window_close, stream_scan_type)?; assert_eq!(self.phase, PlanPhase::Stream); assert_eq!(stream_plan.convention(), Convention::Stream); + let target_columns_to_plan_mapping = target_table.as_ref().map(|t| { + let columns = t.columns_without_rw_timestamp(); + self.target_columns_to_plan_mapping(&columns, user_specified_columns) + }); StreamSink::create( stream_plan, sink_name, db_name, sink_from_table_name, target_table, + target_columns_to_plan_mapping, self.required_dist.clone(), self.required_order.clone(), self.out_fields.clone(), @@ -1012,6 +1020,39 @@ impl PlanRoot { .config() .streaming_use_snapshot_backfill() } + + /// used when the plan has a target relation such as DML and sink into table, return the mapping from table's columns to the plan's schema + pub fn target_columns_to_plan_mapping( + &self, + tar_cols: &[ColumnCatalog], + user_specified_columns: bool, + ) -> Vec> { + #[allow(clippy::disallowed_methods)] + let visible_cols: Vec<(usize, String)> = self + .out_fields + .ones() + .zip_eq(self.out_names.iter().cloned()) + .collect_vec(); + + let visible_col_idxes = visible_cols.iter().map(|(i, _)| *i).collect_vec(); + let visible_col_idxes_by_name = visible_cols + .iter() + .map(|(i, name)| (name.as_ref(), *i)) + .collect::>(); + + tar_cols + .iter() + .enumerate() + .filter(|(_, tar_col)| tar_col.can_dml()) + .map(|(tar_i, tar_col)| { + if user_specified_columns { + visible_col_idxes_by_name.get(tar_col.name()).cloned() + } else { + (tar_i < visible_col_idxes.len()).then(|| visible_cols[tar_i].0) + } + }) + .collect() + } } fn find_version_column_index( diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index 48dc4dad85c5c..74ce8587ef6d4 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -14,13 +14,14 @@ use std::assert_matches::assert_matches; use std::io::{Error, ErrorKind}; +use std::sync::Arc; use anyhow::anyhow; use fixedbitset::FixedBitSet; use icelake::types::Transform; use itertools::Itertools; use pretty_xmlish::{Pretty, XmlNode}; -use risingwave_common::catalog::{ColumnCatalog, CreateType, TableId}; +use risingwave_common::catalog::{ColumnCatalog, CreateType}; use risingwave_common::types::{DataType, StructType}; use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_connector::match_sink_name_str; @@ -43,7 +44,7 @@ use super::utils::{ childless_record, infer_kv_log_store_table_catalog_inner, Distill, IndicesDisplay, }; use super::{generic, ExprRewritable, PlanBase, PlanRef, StreamNode, StreamProject}; -use crate::error::{ErrorCode, Result}; +use crate::error::{ErrorCode, Result, RwError}; use crate::expr::{ExprImpl, FunctionCall, InputRef}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::plan_node::utils::plan_can_use_background_ddl; @@ -195,85 +196,6 @@ impl StreamSink { &self.sink_desc } - #[allow(clippy::too_many_arguments)] - pub fn create( - input: PlanRef, - name: String, - db_name: String, - sink_from_table_name: String, - target_table: Option, - user_distributed_by: RequiredDist, - user_order_by: Order, - user_cols: FixedBitSet, - out_names: Vec, - definition: String, - properties: WithOptionsSecResolved, - format_desc: Option, - partition_info: Option, - ) -> Result { - let columns = derive_columns(input.schema(), out_names, &user_cols)?; - let (input, mut sink) = Self::derive_sink_desc( - input, - user_distributed_by, - name, - db_name, - sink_from_table_name, - target_table, - user_order_by, - columns, - definition, - properties, - format_desc, - partition_info, - )?; - - let unsupported_sink = - |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink))); - - // check and ensure that the sink connector is specified and supported - let sink_decouple = match sink.properties.get(CONNECTOR_TYPE_KEY) { - Some(connector) => { - match_sink_name_str!( - connector.to_lowercase().as_str(), - SinkType, - { - // the table sink is created by with properties - if connector == TABLE_SINK && sink.target_table.is_none() { - unsupported_sink(TABLE_SINK) - } else { - SinkType::set_default_commit_checkpoint_interval( - &mut sink, - &input.ctx().session_ctx().config().sink_decouple(), - )?; - SinkType::is_sink_decouple( - &input.ctx().session_ctx().config().sink_decouple(), - ) - } - }, - |other: &str| unsupported_sink(other) - )? - } - None => { - return Err( - SinkError::Config(anyhow!("connector not specified when create sink")).into(), - ); - } - }; - // For file sink, it must have sink_decouple turned on. - if !sink_decouple && sink.is_file_sink() { - return Err( - SinkError::Config(anyhow!("File sink can only be created with sink_decouple enabled. Please run `set sink_decouple = true` first.")).into(), - ); - } - let log_store_type = if sink_decouple { - SinkLogStoreType::KvLogStore - } else { - SinkLogStoreType::InMemoryLogStore - }; - - Ok(Self::new(input, sink, log_store_type)) - } - fn derive_iceberg_sink_distribution( input: PlanRef, partition_info: Option, @@ -308,27 +230,67 @@ impl StreamSink { } #[allow(clippy::too_many_arguments)] - fn derive_sink_desc( + pub fn create( mut input: PlanRef, - user_distributed_by: RequiredDist, name: String, db_name: String, - sink_from_name: String, - target_table: Option, + sink_from_table_name: String, + target_table: Option>, + target_table_mapping: Option>>, + user_distributed_by: RequiredDist, user_order_by: Order, - columns: Vec, + user_cols: FixedBitSet, + out_names: Vec, definition: String, properties: WithOptionsSecResolved, format_desc: Option, partition_info: Option, - ) -> Result<(PlanRef, SinkDesc)> { + ) -> Result { let sink_type = Self::derive_sink_type(input.append_only(), &properties, format_desc.as_ref())?; + + let columns = derive_columns(input.schema(), out_names, &user_cols)?; let (pk, _) = derive_pk(input.clone(), user_order_by, &columns); - let mut downstream_pk = - Self::parse_downstream_pk(&columns, properties.get(DOWNSTREAM_PK_KEY))?; + let mut downstream_pk = { + let from_properties = + Self::parse_downstream_pk(&columns, properties.get(DOWNSTREAM_PK_KEY))?; + if let Some(t) = &target_table { + let user_defined_primary_key_table = t.row_id_index.is_none(); + let sink_is_append_only = + sink_type == SinkType::AppendOnly || sink_type == SinkType::ForceAppendOnly; + + if !user_defined_primary_key_table && !sink_is_append_only { + return Err(RwError::from(ErrorCode::BindError( + "Only append-only sinks can sink to a table without primary keys. please try to add type = 'append-only' in the with option. e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_string(), + ))); + } + + if t.append_only && !sink_is_append_only { + return Err(RwError::from(ErrorCode::BindError( + "Only append-only sinks can sink to a append only table. please try to add type = 'append-only' in the with option. e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_string(), + ))); + } + + if sink_type != SinkType::Upsert { + vec![] + } else { + let target_table_mapping = target_table_mapping.unwrap(); + t.pk() + .iter() + .map(|c| { + target_table_mapping[c.column_index].ok_or( + ErrorCode::SinkError(Box::new(Error::new(ErrorKind::InvalidInput, + "When using non append only sink into table, the primary key of the table must be included in the sink result.".to_string() + ))).into())}) + .try_collect::<_, _, RwError>()? + } + } else { + from_properties + } + }; let mut extra_partition_col_idx = None; + let required_dist = match input.distribution() { Distribution::Single => RequiredDist::single(), _ => { @@ -392,11 +354,11 @@ impl StreamSink { CreateType::Foreground }; let (properties, secret_refs) = properties.into_parts(); - let sink_desc = SinkDesc { + let mut sink_desc = SinkDesc { id: SinkId::placeholder(), name, db_name, - sink_from_name, + sink_from_name: sink_from_table_name, definition, columns, plan_pk: pk, @@ -406,11 +368,56 @@ impl StreamSink { secret_refs, sink_type, format_desc, - target_table, + target_table: target_table.as_ref().map(|catalog| catalog.id()), extra_partition_col_idx, create_type, }; - Ok((input, sink_desc)) + + let unsupported_sink = + |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink))); + + // check and ensure that the sink connector is specified and supported + let sink_decouple = match sink_desc.properties.get(CONNECTOR_TYPE_KEY) { + Some(connector) => { + match_sink_name_str!( + connector.to_lowercase().as_str(), + SinkType, + { + // the table sink is created by with properties + if connector == TABLE_SINK && sink_desc.target_table.is_none() { + unsupported_sink(TABLE_SINK) + } else { + SinkType::set_default_commit_checkpoint_interval( + &mut sink_desc, + &input.ctx().session_ctx().config().sink_decouple(), + )?; + SinkType::is_sink_decouple( + &input.ctx().session_ctx().config().sink_decouple(), + ) + } + }, + |other: &str| unsupported_sink(other) + )? + } + None => { + return Err( + SinkError::Config(anyhow!("connector not specified when create sink")).into(), + ); + } + }; + // For file sink, it must have sink_decouple turned on. + if !sink_decouple && sink_desc.is_file_sink() { + return Err( + SinkError::Config(anyhow!("File sink can only be created with sink_decouple enabled. Please run `set sink_decouple = true` first.")).into(), + ); + } + let log_store_type = if sink_decouple { + SinkLogStoreType::KvLogStore + } else { + SinkLogStoreType::InMemoryLogStore + }; + + Ok(Self::new(input, sink_desc, log_store_type)) } fn is_user_defined_append_only(properties: &WithOptionsSecResolved) -> Result { @@ -572,16 +579,11 @@ impl Distill for StreamSink { vec.push(("type", Pretty::from(sink_type))); vec.push(("columns", column_names)); if self.sink_desc.sink_type.is_upsert() { - let pk = IndicesDisplay { - indices: &self - .sink_desc - .plan_pk - .iter() - .map(|k| k.column_index) - .collect_vec(), + let sink_pk = IndicesDisplay { + indices: &self.sink_desc.downstream_pk.clone(), schema: self.base.schema(), }; - vec.push(("pk", pk.distill())); + vec.push(("downstream_pk", sink_pk.distill())); } childless_record("StreamSink", vec) } From bc06ffdb782ea5a9f11d19825f0f61c1723991c1 Mon Sep 17 00:00:00 2001 From: Li0k Date: Fri, 22 Nov 2024 18:44:33 +0800 Subject: [PATCH 3/7] feat(compaction): Limit the size of the new overlapping level (#19277) --- proto/hummock.proto | 5 + src/common/src/config.rs | 4 + .../src/cmd_impl/hummock/compaction_group.rs | 4 + src/ctl/src/lib.rs | 5 + .../hummock/compaction/compaction_config.rs | 1 + src/meta/src/hummock/manager/commit_epoch.rs | 107 +++++++++++++-- .../compaction/compaction_group_manager.rs | 3 + src/meta/src/hummock/manager/transaction.rs | 54 ++++---- .../hummock_test/src/hummock_storage_tests.rs | 127 ++++++++++++++++++ .../src/monitor/monitored_storage_metrics.rs | 4 +- 10 files changed, 273 insertions(+), 41 deletions(-) diff --git a/proto/hummock.proto b/proto/hummock.proto index 869c5af867d43..15f3d61a7cf2b 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -660,6 +660,7 @@ message RiseCtlUpdateCompactionConfigRequest { uint64 sst_allowed_trivial_move_min_size = 19; uint32 split_weight_by_vnode = 20; bool disable_auto_group_scheduling = 21; + uint64 max_overlapping_level_size = 22; } } repeated uint64 compaction_group_ids = 1; @@ -858,6 +859,10 @@ message CompactionConfig { // The limitation of auto group scheduling optional bool disable_auto_group_scheduling = 23; + + // The limitation of the max size of the overlapping-level for the compaction + // hummock will reorg the commit-sstables to the multi overlapping-level if the size of the commit-sstables is larger than `max_overlapping_level_size` + optional uint64 max_overlapping_level_size = 24; } message TableStats { diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 1f67057801c4f..393a3a05acb4d 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -2237,6 +2237,10 @@ pub mod default { pub fn disable_auto_group_scheduling() -> bool { false } + + pub fn max_overlapping_level_size() -> u64 { + 256 * MB + } } pub mod object_store_config { diff --git a/src/ctl/src/cmd_impl/hummock/compaction_group.rs b/src/ctl/src/cmd_impl/hummock/compaction_group.rs index d109d6eabda67..e164c0b060eb0 100644 --- a/src/ctl/src/cmd_impl/hummock/compaction_group.rs +++ b/src/ctl/src/cmd_impl/hummock/compaction_group.rs @@ -68,6 +68,7 @@ pub fn build_compaction_config_vec( max_l0_compact_level: Option, sst_allowed_trivial_move_min_size: Option, disable_auto_group_scheduling: Option, + max_overlapping_level_size: Option, ) -> Vec { let mut configs = vec![]; if let Some(c) = max_bytes_for_level_base { @@ -127,6 +128,9 @@ pub fn build_compaction_config_vec( if let Some(c) = disable_auto_group_scheduling { configs.push(MutableConfig::DisableAutoGroupScheduling(c)) } + if let Some(c) = max_overlapping_level_size { + configs.push(MutableConfig::MaxOverlappingLevelSize(c)) + } configs } diff --git a/src/ctl/src/lib.rs b/src/ctl/src/lib.rs index 99de4cd9b17b9..c13e83cb62b00 100644 --- a/src/ctl/src/lib.rs +++ b/src/ctl/src/lib.rs @@ -85,6 +85,7 @@ enum ComputeCommands { ShowConfig { host: String }, } +#[allow(clippy::large_enum_variant)] #[derive(Subcommand)] enum HummockCommands { /// list latest Hummock version on meta node @@ -191,6 +192,8 @@ enum HummockCommands { sst_allowed_trivial_move_min_size: Option, #[clap(long)] disable_auto_group_scheduling: Option, + #[clap(long)] + max_overlapping_level_size: Option, }, /// Split given compaction group into two. Moves the given tables to the new group. SplitCompactionGroup { @@ -578,6 +581,7 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { max_l0_compact_level, sst_allowed_trivial_move_min_size, disable_auto_group_scheduling, + max_overlapping_level_size, }) => { cmd_impl::hummock::update_compaction_config( context, @@ -610,6 +614,7 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { max_l0_compact_level, sst_allowed_trivial_move_min_size, disable_auto_group_scheduling, + max_overlapping_level_size, ), ) .await? diff --git a/src/meta/src/hummock/compaction/compaction_config.rs b/src/meta/src/hummock/compaction/compaction_config.rs index d7be9b6e6cbaa..c808c2f548023 100644 --- a/src/meta/src/hummock/compaction/compaction_config.rs +++ b/src/meta/src/hummock/compaction/compaction_config.rs @@ -71,6 +71,7 @@ impl CompactionConfigBuilder { disable_auto_group_scheduling: Some( compaction_config::disable_auto_group_scheduling(), ), + max_overlapping_level_size: Some(compaction_config::max_overlapping_level_size()), }, } } diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index c51c77a5d36a0..67152cba14236 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -15,7 +15,9 @@ use std::collections::{BTreeMap, HashMap, HashSet}; use std::sync::Arc; +use itertools::Itertools; use risingwave_common::catalog::TableId; +use risingwave_common::config::default::compaction_config; use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_hummock_sdk::change_log::ChangeLogDelta; use risingwave_hummock_sdk::compaction_group::group_split::split_sst_with_table_ids; @@ -112,7 +114,7 @@ impl HummockManager { let state_table_info = &version.latest_version().state_table_info; let mut table_compaction_group_mapping = state_table_info.build_table_compaction_group_id(); let mut new_table_ids = HashMap::new(); - let mut new_compaction_groups = HashMap::new(); + let mut new_compaction_groups = Vec::new(); let mut compaction_group_manager_txn = None; let mut compaction_group_config: Option> = None; @@ -143,14 +145,13 @@ impl HummockManager { ) }; let new_compaction_group_id = next_compaction_group_id(&self.env).await?; - new_compaction_groups.insert(new_compaction_group_id, compaction_group_config.clone()); - compaction_group_manager.insert( - new_compaction_group_id, - CompactionGroup { - group_id: new_compaction_group_id, - compaction_config: compaction_group_config, - }, - ); + let new_compaction_group = CompactionGroup { + group_id: new_compaction_group_id, + compaction_config: compaction_group_config.clone(), + }; + + new_compaction_groups.push(new_compaction_group.clone()); + compaction_group_manager.insert(new_compaction_group_id, new_compaction_group); on_handle_add_new_table( state_table_info, @@ -165,12 +166,35 @@ impl HummockManager { .correct_commit_ssts(sstables, &table_compaction_group_mapping) .await?; - let modified_compaction_groups: Vec<_> = commit_sstables.keys().cloned().collect(); + let modified_compaction_groups = commit_sstables.keys().cloned().collect_vec(); + // fill compaction_groups + let mut group_id_to_config = HashMap::new(); + if let Some(compaction_group_manager) = compaction_group_manager_txn.as_ref() { + for cg_id in &modified_compaction_groups { + let compaction_group = compaction_group_manager + .get(cg_id) + .unwrap_or_else(|| panic!("compaction group {} should be created", cg_id)) + .compaction_config(); + group_id_to_config.insert(*cg_id, compaction_group); + } + } else { + let compaction_group_manager = self.compaction_group_manager.read().await; + for cg_id in &modified_compaction_groups { + let compaction_group = compaction_group_manager + .try_get_compaction_group_config(*cg_id) + .unwrap_or_else(|| panic!("compaction group {} should be created", cg_id)) + .compaction_config(); + group_id_to_config.insert(*cg_id, compaction_group); + } + } + + let group_id_to_sub_levels = + rewrite_commit_sstables_to_sub_level(commit_sstables, &group_id_to_config); let time_travel_delta = version.pre_commit_epoch( &tables_to_commit, new_compaction_groups, - commit_sstables, + group_id_to_sub_levels, &new_table_ids, new_table_watermarks, change_log_delta, @@ -327,6 +351,7 @@ impl HummockManager { ) -> Result>> { let mut new_sst_id_number = 0; let mut sst_to_cg_vec = Vec::with_capacity(sstables.len()); + let commit_object_id_vec = sstables.iter().map(|s| s.sst_info.object_id).collect_vec(); for commit_sst in sstables { let mut group_table_ids: BTreeMap> = BTreeMap::new(); for table_id in &commit_sst.sst_info.table_ids { @@ -395,6 +420,12 @@ impl HummockManager { } } + // order check + for ssts in commit_sstables.values() { + let object_ids = ssts.iter().map(|s| s.object_id).collect_vec(); + assert!(is_ordered_subset(&commit_object_id_vec, &object_ids)); + } + Ok(commit_sstables) } } @@ -419,3 +450,57 @@ fn on_handle_add_new_table( Ok(()) } + +/// Rewrite the commit sstables to sub-levels based on the compaction group config. +/// The type of `compaction_group_manager_txn` is too complex to be used in the function signature. So we use `HashMap` instead. +fn rewrite_commit_sstables_to_sub_level( + commit_sstables: BTreeMap>, + group_id_to_config: &HashMap>, +) -> BTreeMap>> { + let mut overlapping_sstables: BTreeMap>> = BTreeMap::new(); + for (group_id, inserted_table_infos) in commit_sstables { + let config = group_id_to_config + .get(&group_id) + .expect("compaction group should exist"); + + let mut accumulated_size = 0; + let mut ssts = vec![]; + let sub_level_size_limit = config + .max_overlapping_level_size + .unwrap_or(compaction_config::max_overlapping_level_size()); + + let level = overlapping_sstables.entry(group_id).or_default(); + + for sst in inserted_table_infos { + accumulated_size += sst.sst_size; + ssts.push(sst); + if accumulated_size > sub_level_size_limit { + level.push(ssts); + + // reset the accumulated size and ssts + accumulated_size = 0; + ssts = vec![]; + } + } + + if !ssts.is_empty() { + level.push(ssts); + } + + // The uploader organizes the ssts in decreasing epoch order, so the level needs to be reversed to ensure that the latest epoch is at the top. + level.reverse(); + } + + overlapping_sstables +} + +fn is_ordered_subset(vec_1: &Vec, vec_2: &Vec) -> bool { + let mut vec_2_iter = vec_2.iter().peekable(); + for item in vec_1 { + if vec_2_iter.peek() == Some(&item) { + vec_2_iter.next(); + } + } + + vec_2_iter.peek().is_none() +} diff --git a/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs index 18bb8dfaf86b3..3a6c179c03147 100644 --- a/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs @@ -588,6 +588,9 @@ fn update_compaction_config(target: &mut CompactionConfig, items: &[MutableConfi MutableConfig::DisableAutoGroupScheduling(c) => { target.disable_auto_group_scheduling = Some(*c); } + MutableConfig::MaxOverlappingLevelSize(c) => { + target.max_overlapping_level_size = Some(*c); + } } } } diff --git a/src/meta/src/hummock/manager/transaction.rs b/src/meta/src/hummock/manager/transaction.rs index 054ae657d594d..8a4276492365d 100644 --- a/src/meta/src/hummock/manager/transaction.rs +++ b/src/meta/src/hummock/manager/transaction.rs @@ -14,7 +14,6 @@ use std::collections::{BTreeMap, HashMap}; use std::ops::{Deref, DerefMut}; -use std::sync::Arc; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::change_log::ChangeLogDelta; @@ -24,11 +23,12 @@ use risingwave_hummock_sdk::table_watermark::TableWatermarks; use risingwave_hummock_sdk::version::{GroupDelta, HummockVersion, HummockVersionDelta}; use risingwave_hummock_sdk::{CompactionGroupId, FrontendHummockVersionDelta, HummockVersionId}; use risingwave_pb::hummock::{ - CompactionConfig, CompatibilityVersion, GroupConstruct, HummockVersionDeltas, - HummockVersionStats, StateTableInfoDelta, + CompatibilityVersion, GroupConstruct, HummockVersionDeltas, HummockVersionStats, + StateTableInfoDelta, }; use risingwave_pb::meta::subscribe_response::{Info, Operation}; +use crate::hummock::model::CompactionGroup; use crate::manager::NotificationManager; use crate::model::{ InMemValTransaction, MetadataModelResult, Transactional, ValTransaction, VarTransaction, @@ -111,8 +111,8 @@ impl<'a> HummockVersionTransaction<'a> { pub(super) fn pre_commit_epoch( &mut self, tables_to_commit: &HashMap, - new_compaction_groups: HashMap>, - commit_sstables: BTreeMap>, + new_compaction_groups: Vec, + group_id_to_sub_levels: BTreeMap>>, new_table_ids: &HashMap, new_table_watermarks: HashMap, change_log_delta: HashMap, @@ -121,38 +121,36 @@ impl<'a> HummockVersionTransaction<'a> { new_version_delta.new_table_watermarks = new_table_watermarks; new_version_delta.change_log_delta = change_log_delta; - for (compaction_group_id, compaction_group_config) in new_compaction_groups { - { - let group_deltas = &mut new_version_delta - .group_deltas - .entry(compaction_group_id) - .or_default() - .group_deltas; - - #[expect(deprecated)] - group_deltas.push(GroupDelta::GroupConstruct(GroupConstruct { - group_config: Some((*compaction_group_config).clone()), - group_id: compaction_group_id, - parent_group_id: StaticCompactionGroupId::NewCompactionGroup - as CompactionGroupId, - new_sst_start_id: 0, // No need to set it when `NewCompactionGroup` - table_ids: vec![], - version: CompatibilityVersion::SplitGroupByTableId as i32, - split_key: None, - })); - } + for compaction_group in &new_compaction_groups { + let group_deltas = &mut new_version_delta + .group_deltas + .entry(compaction_group.group_id()) + .or_default() + .group_deltas; + + #[expect(deprecated)] + group_deltas.push(GroupDelta::GroupConstruct(GroupConstruct { + group_config: Some(compaction_group.compaction_config().as_ref().clone()), + group_id: compaction_group.group_id(), + parent_group_id: StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId, + new_sst_start_id: 0, // No need to set it when `NewCompactionGroup` + table_ids: vec![], + version: CompatibilityVersion::SplitGroupByTableId as i32, + split_key: None, + })); } // Append SSTs to a new version. - for (compaction_group_id, inserted_table_infos) in commit_sstables { + for (compaction_group_id, sub_levels) in group_id_to_sub_levels { let group_deltas = &mut new_version_delta .group_deltas .entry(compaction_group_id) .or_default() .group_deltas; - let group_delta = GroupDelta::NewL0SubLevel(inserted_table_infos); - group_deltas.push(group_delta); + for sub_level in sub_levels { + group_deltas.push(GroupDelta::NewL0SubLevel(sub_level)); + } } // update state table info diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index 7e847fc089aa2..0b216e84c4960 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -2866,3 +2866,130 @@ async fn test_commit_multi_epoch() { assert_eq!(info.committed_epoch, epoch3); } } + +#[tokio::test] +async fn test_commit_with_large_size() { + let test_env = prepare_hummock_test_env().await; + let context_id = test_env.meta_client.context_id(); + let existing_table_id = TableId::new(1); + let initial_epoch = INVALID_EPOCH; + + let commit_epoch = + |epoch, ssts: Vec, new_table_fragment_infos, tables_to_commit: &[TableId]| { + let manager = &test_env.manager; + let tables_to_commit = tables_to_commit + .iter() + .map(|table_id| (*table_id, epoch)) + .collect(); + let sst_to_context = ssts.iter().map(|sst| (sst.object_id, context_id)).collect(); + + let sstables = ssts + .into_iter() + .map(|sst| LocalSstableInfo { + table_stats: sst + .table_ids + .iter() + .map(|&table_id| { + ( + table_id, + TableStats { + total_compressed_size: 10, + ..Default::default() + }, + ) + }) + .collect(), + sst_info: sst, + created_at: u64::MAX, + }) + .collect_vec(); + + async move { + manager + .commit_epoch(CommitEpochInfo { + new_table_watermarks: Default::default(), + sst_to_context, + sstables, + new_table_fragment_infos, + change_log_delta: Default::default(), + tables_to_commit, + }) + .await + .unwrap(); + } + }; + + let epoch1 = initial_epoch.next_epoch(); + let sst1_epoch1 = SstableInfo { + sst_id: 11, + object_id: 1, + table_ids: vec![existing_table_id.table_id], + file_size: 512 << 20, + sst_size: 512 << 20, + ..Default::default() + }; + + let sst1_epoch2 = SstableInfo { + sst_id: 12, + object_id: 2, + table_ids: vec![existing_table_id.table_id], + file_size: 512 << 20, + sst_size: 512 << 20, + ..Default::default() + }; + + let sst1_epoch3 = SstableInfo { + sst_id: 13, + object_id: 3, + table_ids: vec![existing_table_id.table_id], + file_size: 512 << 20, + sst_size: 512 << 20, + ..Default::default() + }; + + commit_epoch( + epoch1, + vec![ + sst1_epoch3.clone(), + sst1_epoch2.clone(), + sst1_epoch1.clone(), + ], + vec![NewTableFragmentInfo { + table_ids: HashSet::from_iter([existing_table_id]), + }], + &[existing_table_id], + ) + .await; + + let cg_id = + get_compaction_group_id_by_table_id(test_env.manager.clone(), existing_table_id.table_id()) + .await; + + let l0_sub_levels = test_env + .manager + .get_current_version() + .await + .levels + .get(&cg_id) + .unwrap() + .l0 + .clone(); + + println!("l0_sub_levels {:?}", l0_sub_levels.sub_levels); + assert_eq!(3, l0_sub_levels.sub_levels.len()); + assert_eq!(1, l0_sub_levels.sub_levels[0].table_infos.len()); + assert_eq!( + sst1_epoch1.object_id, + l0_sub_levels.sub_levels[0].table_infos[0].object_id + ); + assert_eq!(1, l0_sub_levels.sub_levels[1].table_infos.len()); + assert_eq!( + sst1_epoch2.object_id, + l0_sub_levels.sub_levels[1].table_infos[0].object_id + ); + assert_eq!(1, l0_sub_levels.sub_levels[2].table_infos.len()); + assert_eq!( + sst1_epoch3.object_id, + l0_sub_levels.sub_levels[2].table_infos[0].object_id + ); +} diff --git a/src/storage/src/monitor/monitored_storage_metrics.rs b/src/storage/src/monitor/monitored_storage_metrics.rs index 8bd7ef64b6b83..f8e6ee1e24418 100644 --- a/src/storage/src/monitor/monitored_storage_metrics.rs +++ b/src/storage/src/monitor/monitored_storage_metrics.rs @@ -70,8 +70,8 @@ pub fn global_storage_metrics(metric_level: MetricLevel) -> MonitoredStorageMetr impl MonitoredStorageMetrics { pub fn new(registry: &Registry, metric_level: MetricLevel) -> Self { - // 256B ~ max 4GB - let size_buckets = exponential_buckets(256.0, 16.0, 7).unwrap(); + // 256B ~ max 64GB + let size_buckets = exponential_buckets(256.0, 16.0, 8).unwrap(); // 10ms ~ max 2.7h let time_buckets = exponential_buckets(0.01, 10.0, 7).unwrap(); // ----- get ----- From 236e7617aebd55bd61b9ceb43cb2e1fd9b6d59e8 Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Fri, 22 Nov 2024 23:06:25 +0800 Subject: [PATCH 4/7] fix: upgrade setuptools because of vulnerablility (#19540) --- e2e_test/iceberg/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e_test/iceberg/pyproject.toml b/e2e_test/iceberg/pyproject.toml index eba4bcd12f137..e0daecfdcb579 100644 --- a/e2e_test/iceberg/pyproject.toml +++ b/e2e_test/iceberg/pyproject.toml @@ -8,7 +8,7 @@ authors = ["risingwavelabs"] python = "^3.10" pyspark = { version = "3.4.1", extras = ["sql", "connect"] } tomli = "2.0" -setuptools = "69" +setuptools = "70" [build-system] requires = ["poetry-core"] From b4bca5724e7acb9c39919aaa188ed699a10eb1f4 Mon Sep 17 00:00:00 2001 From: xxchan Date: Sat, 23 Nov 2024 16:16:36 +0800 Subject: [PATCH 5/7] doc(dev-guide): note good first issue (#19543) Signed-off-by: xxchan --- CONTRIBUTING.md | 6 +++--- docs/dev/src/contributing.md | 11 ++++++++++- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 7403b83afb698..5f305d85d07fb 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -1,7 +1,7 @@ # Contributing to RisingWave -Contributing documentation has moved to the **[RisingWave Developer Guide](https://risingwavelabs.github.io/risingwave/)**. - +Read the **[RisingWave Developer Guide](https://risingwavelabs.github.io/risingwave/)** for +how to develop and contribute to RisingWave. ## Before hacking on RisingWave @@ -13,4 +13,4 @@ Please read [the process] of how to submit your change to RisingWave. [Community Slack]: https://risingwave.com/slack [file an issue]: https://github.com/risingwavelabs/risingwave/issues/new/choose -[the process]: https://risingwavelabs.github.io/risingwave/contribution +[the process]: https://risingwavelabs.github.io/risingwave/contributing.html diff --git a/docs/dev/src/contributing.md b/docs/dev/src/contributing.md index 38d8b8cc77c33..926138795d617 100644 --- a/docs/dev/src/contributing.md +++ b/docs/dev/src/contributing.md @@ -11,12 +11,21 @@ To report bugs, create a [GitHub issue](https://github.com/risingwavelabs/rising +## Find Something to Work On + +Issues labeled with [ `good first issue` ](https://github.com/risingwavelabs/risingwave/contribute) are suitable for new RisingWave hackers. +They are relatively easy to begin with and can guide you getting familiar with one module of RisingWave. + ## Tests and miscellaneous checks Before submitting your code changes, ensure you fully test them and perform necessary checks. The testing instructions and necessary checks are detailed in other sections of the book. ## Submit a PR +### Ask for Review + +To get your PR reviewed and merged sooner, you can find and `@` mention developers who recently worked on the same files. If you're not sure who to ask, feel free to reach out to any active developers to help find relevant reviewers. Don't hesitate to follow up politely if you haven't received a response, or ask for help in the RisingWave Community Slack channel. We welcome you to be proactive in finding reviewers for your PR! + ### Pull Request title As described in [here](https://github.com/commitizen/conventional-commit-types/blob/master/index.json), a valid PR title should begin with one of the following prefixes: @@ -28,7 +37,7 @@ As described in [here](https://github.com/commitizen/conventional-commit-types/b - `style`: A refactoring that improves code style - `perf`: A code change that improves performance - `test`: Adding missing tests or correcting existing tests -- `build`: Changes that affect the build system or external dependencies (example scopes: `.config`, `.cargo`, `Cargo.toml`) +- `build`: Changes that affect the build system or external dependencies (example scopes: `.config`, `.cargo`, `Cargo.toml`) - `ci`: Changes to RisingWave CI configuration files and scripts (example scopes: `.github`, `ci` (Buildkite)) - `chore`: Other changes that don't modify src or test files - `revert`: Reverts a previous commit From 9abd62a7a63446c345310c1aeeba994c86cf20e8 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Mon, 25 Nov 2024 11:14:40 +0800 Subject: [PATCH 6/7] refactor(optimizer): record error contexts when casting composite types (#19449) Signed-off-by: Bugen Zhao --- Cargo.lock | 8 +- Cargo.toml | 2 +- e2e_test/batch/basic/dml_update.slt.part | 11 +- .../tests/testdata/input/cast.yaml | 17 ++ .../tests/testdata/output/array.yaml | 2 +- .../tests/testdata/output/cast.yaml | 33 ++++ .../tests/testdata/output/expr.yaml | 2 +- .../tests/testdata/output/struct_query.yaml | 2 +- .../tests/testdata/output/update.yaml | 2 +- .../system_catalog/pg_catalog/pg_cast.rs | 10 +- src/frontend/src/error.rs | 8 + src/frontend/src/expr/function_call.rs | 60 +++---- src/frontend/src/expr/mod.rs | 8 +- src/frontend/src/expr/type_inference/cast.rs | 159 +++++++++++++----- src/frontend/src/expr/type_inference/mod.rs | 3 +- 15 files changed, 229 insertions(+), 98 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c8be3e154ddd8..c08e776947cca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14299,9 +14299,9 @@ dependencies = [ [[package]] name = "thiserror-ext" -version = "0.1.2" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7c19760dc47062bca5c1b3699b032111c93802d51ac47660db11b08afc6bad2" +checksum = "ef4323942237f7cc071061f2c5f0db919e6053c2cdf58c6bc974883073429737" dependencies = [ "thiserror 1.0.63", "thiserror-ext-derive", @@ -14309,9 +14309,9 @@ dependencies = [ [[package]] name = "thiserror-ext-derive" -version = "0.1.2" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "667c8c48f68021098038115926c64d9950b0582062ae63f7d30638b1168daf03" +checksum = "96541747c50e6c73e094737938f4f5dfaf50c48a31adff4197a3e2a481371674" dependencies = [ "either", "proc-macro2", diff --git a/Cargo.toml b/Cargo.toml index c260bf8c52932..6f9b85aa5c1ae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -163,7 +163,7 @@ parquet = { version = "53.2", features = ["async"] } mysql_async = { version = "0.34", default-features = false, features = [ "default", ] } -thiserror-ext = "0.1.2" +thiserror-ext = { version = "0.2.1", features = ["backtrace"] } tikv-jemalloc-ctl = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9" } tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git", features = [ "profiling", diff --git a/e2e_test/batch/basic/dml_update.slt.part b/e2e_test/batch/basic/dml_update.slt.part index fcc3bbdfce9a2..fc2647cea147b 100644 --- a/e2e_test/batch/basic/dml_update.slt.part +++ b/e2e_test/batch/basic/dml_update.slt.part @@ -93,10 +93,15 @@ select * from t; 889 999 # Multiple assignments, to subquery with cast failure. -# TODO: this currently shows `cannot cast type "record" to "record"` because we wrap the subquery result -# into a struct, which is not quite clear. -statement error cannot cast type +statement error update t set (v1, v2) = (select '888.88', 999); +---- +db error: ERROR: Failed to run the query + +Caused by these errors (recent errors listed first): + 1: cannot cast type "record" to "record" + 2: cannot cast type "character varying" to "integer" in Assign context + # Multiple assignments, to subquery with mismatched column count. statement error number of columns does not match number of values diff --git a/src/frontend/planner_test/tests/testdata/input/cast.yaml b/src/frontend/planner_test/tests/testdata/input/cast.yaml index a21b9e3cb409a..f2344b3dd00a3 100644 --- a/src/frontend/planner_test/tests/testdata/input/cast.yaml +++ b/src/frontend/planner_test/tests/testdata/input/cast.yaml @@ -64,3 +64,20 @@ select count(*) FILTER(WHERE 'y') from t; expected_outputs: - batch_plan +- name: composite type cast error message (array) + sql: | + select '[]'::int[]::bytea[]; + expected_outputs: + - binder_error +- name: composite type cast error message (struct) + sql: | + create table t (v struct, c bool>); + select v::struct, f bool> from t; + expected_outputs: + - binder_error +- name: composite type cast error message (map) + sql: | + create table t (v map(int, int)); + select v::map(int, bytea) from t; + expected_outputs: + - binder_error diff --git a/src/frontend/planner_test/tests/testdata/output/array.yaml b/src/frontend/planner_test/tests/testdata/output/array.yaml index a2b9486fdb33d..259a727d23df6 100644 --- a/src/frontend/planner_test/tests/testdata/output/array.yaml +++ b/src/frontend/planner_test/tests/testdata/output/array.yaml @@ -228,7 +228,7 @@ sql: | create table t (v1 varchar[]); insert into t values ('{c,' || 'd}'); - binder_error: 'Bind error: cannot cast type "character varying" to "character varying[]" in Assign context' + binder_error: cannot cast type "character varying" to "character varying[]" in Assign context - name: string to varchar[] in explicit context sql: | select ('{c,' || 'd}')::varchar[]; diff --git a/src/frontend/planner_test/tests/testdata/output/cast.yaml b/src/frontend/planner_test/tests/testdata/output/cast.yaml index 636f25a9b07de..28cc002eae562 100644 --- a/src/frontend/planner_test/tests/testdata/output/cast.yaml +++ b/src/frontend/planner_test/tests/testdata/output/cast.yaml @@ -80,3 +80,36 @@ └─BatchExchange { order: [], dist: Single } └─BatchSimpleAgg { aggs: [count] } └─BatchScan { table: t, columns: [], distribution: SomeShard } +- name: composite type cast error message (array) + sql: | + select '[]'::int[]::bytea[]; + binder_error: | + Failed to bind expression: CAST(CAST('[]' AS INT[]) AS BYTEA[]) + + Caused by these errors (recent errors listed first): + 1: cannot cast type "integer[]" to "bytea[]" + 2: cannot cast type "integer" to "bytea" in Explicit context +- name: composite type cast error message (struct) + sql: | + create table t (v struct, c bool>); + select v::struct, f bool> from t; + binder_error: | + Failed to bind expression: CAST(v AS STRUCT, f BOOLEAN>) + + Caused by these errors (recent errors listed first): + 1: cannot cast type "struct, c boolean>" to "struct, f boolean>" + 2: cannot cast struct field "a" to struct field "d" + 3: cannot cast type "struct" to "struct" + 4: cannot cast struct field "b" to struct field "e" + 5: cannot cast type "integer" to "bytea" in Explicit context +- name: composite type cast error message (map) + sql: | + create table t (v map(int, int)); + select v::map(int, bytea) from t; + binder_error: | + Failed to bind expression: CAST(v AS MAP(INT,BYTEA)) + + Caused by these errors (recent errors listed first): + 1: cannot cast type "map(integer,integer)" to "map(integer,bytea)" + 2: cannot cast map value + 3: cannot cast type "integer" to "bytea" in Explicit context diff --git a/src/frontend/planner_test/tests/testdata/output/expr.yaml b/src/frontend/planner_test/tests/testdata/output/expr.yaml index eacab421069a9..ce2f86ed2e689 100644 --- a/src/frontend/planner_test/tests/testdata/output/expr.yaml +++ b/src/frontend/planner_test/tests/testdata/output/expr.yaml @@ -304,7 +304,7 @@ Failed to bind expression: concat_ws(v1, 1.2) Caused by: - Bind error: cannot cast type "integer" to "character varying" in Implicit context + cannot cast type "integer" to "character varying" in Implicit context - sql: | create table t (v1 int); select concat_ws() from t; diff --git a/src/frontend/planner_test/tests/testdata/output/struct_query.yaml b/src/frontend/planner_test/tests/testdata/output/struct_query.yaml index 907aa209c6d2c..3f6b4579e44c9 100644 --- a/src/frontend/planner_test/tests/testdata/output/struct_query.yaml +++ b/src/frontend/planner_test/tests/testdata/output/struct_query.yaml @@ -421,7 +421,7 @@ - sql: | CREATE TABLE a (c STRUCT, j INTEGER>); INSERT INTO a VALUES (1); - binder_error: 'Bind error: cannot cast type "integer" to "struct, j integer>" in Assign context' + binder_error: cannot cast type "integer" to "struct, j integer>" in Assign context - name: test struct type alignment in CASE expression sql: | select CASE WHEN false THEN ROW(0, INTERVAL '1') WHEN true THEN ROW(1.1, INTERVAL '1') ELSE ROW(1, INTERVAL '1') END; diff --git a/src/frontend/planner_test/tests/testdata/output/update.yaml b/src/frontend/planner_test/tests/testdata/output/update.yaml index 4a12b492660ad..26c6d52dc5e05 100644 --- a/src/frontend/planner_test/tests/testdata/output/update.yaml +++ b/src/frontend/planner_test/tests/testdata/output/update.yaml @@ -11,7 +11,7 @@ - sql: | create table t (v1 int, v2 int); update t set v1 = true; - binder_error: 'Bind error: cannot cast type "boolean" to "integer" in Assign context' + binder_error: cannot cast type "boolean" to "integer" in Assign context - sql: | create table t (v1 int, v2 int); update t set v1 = v2 + 1; diff --git a/src/frontend/src/catalog/system_catalog/pg_catalog/pg_cast.rs b/src/frontend/src/catalog/system_catalog/pg_catalog/pg_cast.rs index d5b1332c25b3e..291743ea4ba26 100644 --- a/src/frontend/src/catalog/system_catalog/pg_catalog/pg_cast.rs +++ b/src/frontend/src/catalog/system_catalog/pg_catalog/pg_cast.rs @@ -12,11 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +use itertools::Itertools; use risingwave_common::types::{DataType, Fields}; use risingwave_frontend_macro::system_catalog; use crate::catalog::system_catalog::SysCatalogReaderImpl; -use crate::expr::cast_map_array; +use crate::expr::CAST_TABLE; /// The catalog `pg_cast` stores data type conversion paths. /// Ref: [`https://www.postgresql.org/docs/current/catalog-pg-cast.html`] @@ -31,12 +32,11 @@ struct PgCast { #[system_catalog(table, "pg_catalog.pg_cast")] fn read_pg_cast(_: &SysCatalogReaderImpl) -> Vec { - let mut cast_array = cast_map_array(); - cast_array.sort(); - cast_array + CAST_TABLE .iter() + .sorted() .enumerate() - .map(|(idx, (src, target, ctx))| PgCast { + .map(|(idx, ((src, target), ctx))| PgCast { oid: idx as i32, castsource: DataType::try_from(*src).unwrap().to_oid(), casttarget: DataType::try_from(*target).unwrap().to_oid(), diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index f0cf35e859664..d0615b9f0e13d 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -26,6 +26,8 @@ use risingwave_rpc_client::error::{RpcError, TonicStatusWrapper}; use thiserror::Error; use tokio::task::JoinError; +use crate::expr::CastError; + /// The error type for the frontend crate, acting as the top-level error type for the /// entire RisingWave project. // TODO(error-handling): this is migrated from the `common` crate, and there could @@ -114,6 +116,12 @@ pub enum ErrorCode { #[backtrace] error: BoxedError, }, + #[error(transparent)] + CastError( + #[from] + #[backtrace] + CastError, + ), #[error("Catalog error: {0}")] CatalogError( #[source] diff --git a/src/frontend/src/expr/function_call.rs b/src/frontend/src/expr/function_call.rs index af1f84b321eb5..c5ae4ca178bf8 100644 --- a/src/frontend/src/expr/function_call.rs +++ b/src/frontend/src/expr/function_call.rs @@ -16,12 +16,11 @@ use itertools::Itertools; use risingwave_common::catalog::Schema; use risingwave_common::types::{DataType, ScalarImpl}; use risingwave_common::util::iter_util::ZipEqFast; -use thiserror::Error; -use thiserror_ext::AsReport; -use super::{cast_ok, infer_some_all, infer_type, CastContext, Expr, ExprImpl, Literal}; -use crate::error::{ErrorCode, Result as RwResult}; -use crate::expr::{ExprDisplay, ExprType, ExprVisitor, ImpureAnalyzer}; +use super::type_inference::cast; +use super::{infer_some_all, infer_type, CastContext, CastError, Expr, ExprImpl, Literal}; +use crate::error::Result as RwResult; +use crate::expr::{bail_cast_error, ExprDisplay, ExprType, ExprVisitor, ImpureAnalyzer}; #[derive(Clone, Eq, PartialEq, Hash)] pub struct FunctionCall { @@ -144,22 +143,23 @@ impl FunctionCall { // else when eager parsing fails, just proceed as normal. // Some callers are not ready to handle `'a'::int` error here. } + let source = child.return_type(); if source == target { - Ok(()) - // Casting from unknown is allowed in all context. And PostgreSQL actually does the parsing - // in frontend. - } else if child.is_untyped() || cast_ok(&source, &target, allows) { - // Always Ok below. Safe to mutate `child`. - let owned = std::mem::replace(child, ExprImpl::literal_bool(false)); - *child = Self::new_unchecked(ExprType::Cast, vec![owned], target).into(); - Ok(()) + return Ok(()); + } + + if child.is_untyped() { + // Casting from unknown is allowed in all context. And PostgreSQL actually does the parsing + // in frontend. } else { - Err(CastError(format!( - "cannot cast type \"{}\" to \"{}\" in {:?} context", - source, target, allows - ))) + cast(&source, &target, allows)?; } + + // Always Ok below. Safe to mutate `child`. + let owned = std::mem::replace(child, ExprImpl::literal_bool(false)); + *child = Self::new_unchecked(ExprType::Cast, vec![owned], target).into(); + Ok(()) } /// Cast a `ROW` expression to the target type. We intentionally disallow casting arbitrary @@ -170,13 +170,13 @@ impl FunctionCall { target_type: DataType, allows: CastContext, ) -> Result<(), CastError> { + // Can only cast to a struct type. let DataType::Struct(t) = &target_type else { - return Err(CastError(format!( - "cannot cast type \"{}\" to \"{}\" in {:?} context", - func.return_type(), + bail_cast_error!( + "cannot cast type \"{}\" to \"{}\"", + func.return_type(), // typically "record" target_type, - allows - ))); + ); }; match t.len().cmp(&func.inputs.len()) { std::cmp::Ordering::Equal => { @@ -189,10 +189,8 @@ impl FunctionCall { func.return_type = target_type; Ok(()) } - std::cmp::Ordering::Less => Err(CastError("Input has too few columns.".to_string())), - std::cmp::Ordering::Greater => { - Err(CastError("Input has too many columns.".to_string())) - } + std::cmp::Ordering::Less => bail_cast_error!("input has too few columns"), + std::cmp::Ordering::Greater => bail_cast_error!("input has too many columns"), } } @@ -422,13 +420,3 @@ pub fn is_row_function(expr: &ExprImpl) -> bool { } false } - -#[derive(Debug, Error)] -#[error("{0}")] -pub struct CastError(pub(super) String); - -impl From for ErrorCode { - fn from(value: CastError) -> Self { - ErrorCode::BindError(value.to_report_string()) - } -} diff --git a/src/frontend/src/expr/mod.rs b/src/frontend/src/expr/mod.rs index c7acdfa5c4a3c..4cc3a18319398 100644 --- a/src/frontend/src/expr/mod.rs +++ b/src/frontend/src/expr/mod.rs @@ -66,10 +66,7 @@ pub use risingwave_pb::expr::expr_node::Type as ExprType; pub use session_timezone::{SessionTimezone, TimestamptzExprFinder}; pub use subquery::{Subquery, SubqueryKind}; pub use table_function::{TableFunction, TableFunctionType}; -pub use type_inference::{ - align_types, cast_map_array, cast_ok, cast_sigs, infer_some_all, infer_type, infer_type_name, - infer_type_with_sigmap, CastContext, CastSig, FuncSign, -}; +pub use type_inference::*; pub use user_defined_function::UserDefinedFunction; pub use utils::*; pub use window_function::WindowFunction; @@ -300,7 +297,7 @@ impl ExprImpl { ))), DataType::Int32 => Ok(self), dt if dt.is_int() => Ok(self.cast_explicit(DataType::Int32)?), - _ => Err(CastError("Unsupported input type".to_string())), + _ => bail_cast_error!("unsupported input type"), } } @@ -1171,7 +1168,6 @@ use risingwave_common::bail; use risingwave_common::catalog::Schema; use risingwave_common::row::OwnedRow; -use self::function_call::CastError; use crate::binder::BoundSetExpr; use crate::utils::Condition; diff --git a/src/frontend/src/expr/type_inference/cast.rs b/src/frontend/src/expr/type_inference/cast.rs index 51441c3f70c5b..c9b09fe18ecac 100644 --- a/src/frontend/src/expr/type_inference/cast.rs +++ b/src/frontend/src/expr/type_inference/cast.rs @@ -13,12 +13,15 @@ // limitations under the License. use std::collections::BTreeMap; +use std::error::Error; use std::sync::LazyLock; use itertools::Itertools as _; use parse_display::Display; use risingwave_common::types::{DataType, DataTypeName}; -use risingwave_common::util::iter_util::ZipEqFast; +use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast}; +use thiserror::Error; +use thiserror_ext::{Box, Macro}; use crate::error::ErrorCode; use crate::expr::{Expr as _, ExprImpl, InputRef, Literal}; @@ -114,76 +117,156 @@ pub fn align_array_and_element( Ok(array_type) } +/// A stack of error messages for the cast operation. +#[derive(Error, Debug, Box, Macro)] +#[thiserror_ext(newtype(name = CastError), macro(path = "crate::expr"))] +#[error("{message}")] +pub struct CastErrorInner { + pub source: Option, + pub message: Box, +} + +pub type CastResult = Result; + +/// Returns `Ok` if `ok` is true, otherwise returns a placeholder [`CastError`] to be further +/// wrapped with a more informative context in [`cast`]. +fn canbo(ok: bool) -> CastResult { + if ok { + Ok(()) + } else { + bail_cast_error!() + } +} +/// Equivalent to `canbo(false)`. +fn cannot() -> CastResult { + canbo(false) +} + +/// Checks whether casting from `source` to `target` is ok in `allows` context. +/// Returns an error if the cast is not possible. +pub fn cast(source: &DataType, target: &DataType, allows: CastContext) -> Result<(), CastError> { + macro_rules! any { + ($f:ident) => { + source.$f() || target.$f() + }; + } + + if any!(is_struct) { + cast_struct(source, target, allows) + } else if any!(is_array) { + cast_array(source, target, allows) + } else if any!(is_map) { + cast_map(source, target, allows) + } else { + canbo(cast_ok_base(source, target, allows)) + } + .map_err(|inner| { + // Only show "in .. context" once in the error source chain. + let in_context = if inner.source().is_none() { + &format!(" in {:?} context", allows) + } else { + "" + }; + cast_error!( + source = inner, + "cannot cast type \"{}\" to \"{}\"{}", + source, + target, + in_context, + ) + }) +} + /// Checks whether casting from `source` to `target` is ok in `allows` context. +/// +/// Equivalent to `cast(..).is_ok()`, but [`cast`] may be preferred for its error messages. pub fn cast_ok(source: &DataType, target: &DataType, allows: CastContext) -> bool { - cast_ok_struct(source, target, allows) - || cast_ok_array(source, target, allows) - || cast_ok_map(source, target, allows) - || cast_ok_base(source, target, allows) + cast(source, target, allows).is_ok() } /// Checks whether casting from `source` to `target` is ok in `allows` context. /// Both `source` and `target` must be base types, i.e. not struct or array. pub fn cast_ok_base(source: &DataType, target: &DataType, allows: CastContext) -> bool { - matches!(CAST_MAP.get(&(source.into(), target.into())), Some(context) if *context <= allows) + matches!(CAST_TABLE.get(&(source.into(), target.into())), Some(context) if *context <= allows) } -fn cast_ok_struct(source: &DataType, target: &DataType, allows: CastContext) -> bool { +fn cast_struct(source: &DataType, target: &DataType, allows: CastContext) -> CastResult { match (source, target) { (DataType::Struct(lty), DataType::Struct(rty)) => { if lty.is_empty() || rty.is_empty() { unreachable!("record type should be already processed at this point"); } if lty.len() != rty.len() { - // only cast structs of the same length - return false; + bail_cast_error!("cannot cast structs of different lengths"); } // ... and all fields are castable - lty.types() - .zip_eq_fast(rty.types()) - .all(|(src, dst)| src == dst || cast_ok(src, dst, allows)) + lty.iter().zip_eq_debug(rty.iter()).try_for_each( + |((src_name, src_ty), (dst_name, dst_ty))| { + if src_ty == dst_ty { + Ok(()) + } else { + cast(src_ty, dst_ty, allows).map_err(|inner| { + if src_name.is_empty() { + inner + } else if dst_name.is_empty() { + cast_error!( + source = inner, + "cannot cast struct field \"{}\"", + src_name + ) + } else { + cast_error!( + source = inner, + "cannot cast struct field \"{}\" to struct field \"{}\"", + src_name, + dst_name + ) + } + }) + } + }, + ) } // The automatic casts to string types are treated as assignment casts, while the automatic // casts from string types are explicit-only. // https://www.postgresql.org/docs/14/sql-createcast.html#id-1.9.3.58.7.4 - (DataType::Varchar, DataType::Struct(_)) => CastContext::Explicit <= allows, - (DataType::Struct(_), DataType::Varchar) => CastContext::Assign <= allows, - _ => false, + (DataType::Varchar, DataType::Struct(_)) => canbo(CastContext::Explicit <= allows), + (DataType::Struct(_), DataType::Varchar) => canbo(CastContext::Assign <= allows), + _ => cannot(), } } -fn cast_ok_array(source: &DataType, target: &DataType, allows: CastContext) -> bool { +fn cast_array(source: &DataType, target: &DataType, allows: CastContext) -> CastResult { match (source, target) { (DataType::List(source_elem), DataType::List(target_elem)) => { - cast_ok(source_elem, target_elem, allows) + cast(source_elem, target_elem, allows) } // The automatic casts to string types are treated as assignment casts, while the automatic // casts from string types are explicit-only. // https://www.postgresql.org/docs/14/sql-createcast.html#id-1.9.3.58.7.4 - (DataType::Varchar, DataType::List(_)) => CastContext::Explicit <= allows, - (DataType::List(_), DataType::Varchar) => CastContext::Assign <= allows, - _ => false, + (DataType::Varchar, DataType::List(_)) => canbo(CastContext::Explicit <= allows), + (DataType::List(_), DataType::Varchar) => canbo(CastContext::Assign <= allows), + _ => cannot(), } } -fn cast_ok_map(source: &DataType, target: &DataType, allows: CastContext) -> bool { +fn cast_map(source: &DataType, target: &DataType, allows: CastContext) -> CastResult { match (source, target) { - (DataType::Map(source_elem), DataType::Map(target_elem)) => cast_ok( - &source_elem.clone().into_list(), - &target_elem.clone().into_list(), - allows, - ), - _ => false, + (DataType::Map(source_elem), DataType::Map(target_elem)) => { + if source_elem.key() != target_elem.key() { + cast(source_elem.key(), target_elem.key(), allows) + .map_err(|inner| cast_error!(source = inner, "cannot cast map key"))?; + } + if source_elem.value() != target_elem.value() { + cast(source_elem.value(), target_elem.value(), allows) + .map_err(|inner| cast_error!(source = inner, "cannot cast map value"))?; + } + Ok(()) + } + _ => cannot(), } } -pub fn cast_map_array() -> Vec<(DataTypeName, DataTypeName, CastContext)> { - CAST_MAP - .iter() - .map(|((src, target), ctx)| (*src, *target, *ctx)) - .collect_vec() -} - #[derive(Clone, Debug)] pub struct CastSig { pub from_type: DataTypeName, @@ -204,10 +287,10 @@ pub enum CastContext { Explicit, } -pub type CastMap = BTreeMap<(DataTypeName, DataTypeName), CastContext>; +pub type CastTable = BTreeMap<(DataTypeName, DataTypeName), CastContext>; pub fn cast_sigs() -> impl Iterator { - CAST_MAP + CAST_TABLE .iter() .map(|((from_type, to_type), context)| CastSig { from_type: *from_type, @@ -216,7 +299,7 @@ pub fn cast_sigs() -> impl Iterator { }) } -pub static CAST_MAP: LazyLock = LazyLock::new(|| { +pub static CAST_TABLE: LazyLock = LazyLock::new(|| { // cast rules: // 1. implicit cast operations in PG are organized in 3 sequences, // with the reverse direction being assign cast operations. @@ -306,7 +389,7 @@ mod tests { fn test_cast_ok() { // With the help of a script we can obtain the 3 expected cast tables from PG. They are // slightly modified on same-type cast and from-string cast for reasons explained above in - // `build_cast_map`. + // `build_cast_table`. let actual = gen_cast_table(CastContext::Implicit); assert_eq!( diff --git a/src/frontend/src/expr/type_inference/mod.rs b/src/frontend/src/expr/type_inference/mod.rs index 2845f05ec0dae..4c507a9586190 100644 --- a/src/frontend/src/expr/type_inference/mod.rs +++ b/src/frontend/src/expr/type_inference/mod.rs @@ -18,6 +18,7 @@ mod cast; mod func; pub use cast::{ - align_types, cast_map_array, cast_ok, cast_ok_base, cast_sigs, CastContext, CastSig, + align_types, bail_cast_error, cast, cast_error, cast_ok, cast_ok_base, cast_sigs, CastContext, + CastError, CastErrorInner, CastSig, CAST_TABLE, }; pub use func::{infer_some_all, infer_type, infer_type_name, infer_type_with_sigmap, FuncSign}; From 420b60b591f4da6a6de1eea4966833667f010cd3 Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Mon, 25 Nov 2024 14:48:00 +0800 Subject: [PATCH 7/7] test(pgwire): complete the test cases to cover all data types (#19520) --- e2e_test/python_client/main.py | 133 ++++++++++++++++++++++++++++++--- 1 file changed, 121 insertions(+), 12 deletions(-) diff --git a/e2e_test/python_client/main.py b/e2e_test/python_client/main.py index bb41ba6c38f34..dcf7c52518b8e 100644 --- a/e2e_test/python_client/main.py +++ b/e2e_test/python_client/main.py @@ -1,19 +1,128 @@ import psycopg +from decimal import Decimal +import math +import unittest +import datetime +import zoneinfo -def test_psycopg_extended_mode(): - conn = psycopg.connect(host='localhost', port='4566', dbname='dev', user='root') - with conn.cursor() as cur: - cur.execute("select Array[1::bigint, 2::bigint, 3::bigint]", binary=True) - assert cur.fetchone() == ([1, 2, 3],) +PG_HOST = 'localhost' +PG_PORT = 4566 +PG_DBNAME = 'dev' +PG_USER = 'root' - cur.execute("select Array['foo', null, 'bar']", binary=True) - assert cur.fetchone() == (['foo', None, 'bar'],) +class TestPsycopgExtendedMode(unittest.TestCase): + def test_psycopg_extended_mode(self): + with psycopg.connect(host=PG_HOST, port=PG_PORT, dbname=PG_DBNAME, user=PG_USER) as conn: + with conn.cursor() as cur: + # Boolean + cur.execute("select true, false, null::boolean", binary=True) + self.assertEqual(cur.fetchone(), (True, False, None)) - cur.execute("select ROW('123 Main St', 'New York', '10001')", binary=True) - assert cur.fetchone() == (('123 Main St', 'New York', '10001'),) + # Integer types + cur.execute("select 1::smallint, 2::integer, 3::bigint", binary=True) + self.assertEqual(cur.fetchone(), (1, 2, 3)) - cur.execute("select array[ROW('123 Main St', 'New York', '10001'), ROW('234 Main St', null, '10001')]", binary=True) - assert cur.fetchone() == ([('123 Main St', 'New York', '10001'), ('234 Main St', None, '10001')],) + # Decimal/Numeric types + cur.execute("select 1.23::decimal, 2.5::real, 3.45::double precision", binary=True) + self.assertEqual(cur.fetchone(), (Decimal('1.23'), 2.5, 3.45)) + + # String + cur.execute("select 'hello'::varchar, null::varchar", binary=True) + self.assertEqual(cur.fetchone(), ('hello', None)) + + # Date/Time types + cur.execute("select '2023-01-01'::date, '12:34:56'::time, '2023-01-01 12:34:56'::timestamp, '2023-01-01 12:34:56+00'::timestamptz", binary=True) + self.assertEqual(cur.fetchone(), (datetime.date(2023, 1, 1), datetime.time(12, 34, 56), datetime.datetime(2023, 1, 1, 12, 34, 56), datetime.datetime(2023, 1, 1, 20, 34, 56, tzinfo=zoneinfo.ZoneInfo(key='Asia/Shanghai')))) + + # Interval + cur.execute("select '1 year 2 months 3 days 4 hours 5 minutes 6 seconds'::interval", binary=True) + self.assertEqual(cur.fetchone(), (datetime.timedelta(days=428, seconds=14706),)) + + # Byte array + cur.execute("select '\\xDEADBEEF'::bytea", binary=True) + self.assertEqual(cur.fetchone(), (b'\xDE\xAD\xBE\xEF',)) + + cur.execute("select '\\x'::bytea", binary=True) + self.assertEqual(cur.fetchone(), (b'',)) + + # Array + cur.execute("select ARRAY[true, false, null]::boolean[]", binary=True) + self.assertEqual(cur.fetchone(), ([True, False, None],)) + + cur.execute("select ARRAY[1, 2, 3]::smallint[]", binary=True) + self.assertEqual(cur.fetchone(), ([1, 2, 3],)) + + cur.execute("select ARRAY[1, 2, 3]::integer[]", binary=True) + self.assertEqual(cur.fetchone(), ([1, 2, 3],)) + + cur.execute("select ARRAY[1, 2, 3]::bigint[]", binary=True) + self.assertEqual(cur.fetchone(), ([1, 2, 3],)) + + cur.execute("select ARRAY[1.1, 2.2, 3.3]::decimal[]", binary=True) + self.assertEqual(cur.fetchone(), ([Decimal('1.1'), Decimal('2.2'), Decimal('3.3')],)) + + cur.execute("select ARRAY[1.1, 2.2, 3.3]::real[]", binary=True) + result = cur.fetchone()[0] # Fetch once and store the result + self.assertAlmostEqual(result[0], 1.1, places=6) + self.assertAlmostEqual(result[1], 2.2, places=6) + self.assertAlmostEqual(result[2], 3.3, places=6) + + cur.execute("select ARRAY[1.1, 2.2, 3.3]::double precision[]", binary=True) + result = cur.fetchone()[0] # Fetch once and store the result + self.assertAlmostEqual(result[0], 1.1, places=6) + self.assertAlmostEqual(result[1], 2.2, places=6) + self.assertAlmostEqual(result[2], 3.3, places=6) + + cur.execute("select ARRAY['foo', 'bar', null]::varchar[]", binary=True) + self.assertEqual(cur.fetchone(), (['foo', 'bar', None],)) + + cur.execute("select ARRAY['\\xDEADBEEF'::bytea, '\\x0102'::bytea]", binary=True) + self.assertEqual(cur.fetchone(), ([b'\xDE\xAD\xBE\xEF', b'\x01\x02'],)) + + cur.execute("select ARRAY['2023-01-01', '2023-01-02']::date[]", binary=True) + self.assertEqual(cur.fetchone(), ([datetime.date(2023, 1, 1), datetime.date(2023, 1, 2)],)) + + cur.execute("select ARRAY['12:34:56', '23:45:01']::time[]", binary=True) + self.assertEqual(cur.fetchone()[0], [datetime.time(12, 34, 56), datetime.time(23, 45, 1)]) + + cur.execute("select ARRAY['2023-01-01 12:34:56', '2023-01-02 23:45:01']::timestamp[]", binary=True) + self.assertEqual(cur.fetchone()[0], [datetime.datetime(2023, 1, 1, 12, 34, 56), datetime.datetime(2023, 1, 2, 23, 45, 1)]) + + cur.execute("select ARRAY['2023-01-01 12:34:56+00', '2023-01-02 23:45:01+00']::timestamptz[]", binary=True) + self.assertEqual(cur.fetchone()[0], [datetime.datetime(2023, 1, 1, 12, 34, 56, tzinfo=datetime.timezone.utc), datetime.datetime(2023, 1, 2, 23, 45, 1, tzinfo=datetime.timezone.utc)]) + + cur.execute("select ARRAY['{\"a\": 1}'::jsonb, '{\"b\": 2}'::jsonb]", binary=True) + self.assertEqual(cur.fetchone(), ([{'a': 1}, {'b': 2}],)) + + # Struct + cur.execute("select ROW('123 Main St'::varchar, 'New York'::varchar, 10001)", binary=True) + self.assertEqual(cur.fetchone(), (('123 Main St', 'New York', 10001),)) + + cur.execute("select array[ROW('123 Main St'::varchar, 'New York'::varchar, 10001), ROW('234 Main St'::varchar, null, 10002)]", binary=True) + self.assertEqual(cur.fetchone(), ([('123 Main St', 'New York', 10001), ('234 Main St', None, 10002)],)) + + # Numeric + cur.execute("select 'NaN'::numeric, 'NaN'::real, 'NaN'::double precision", binary=True) + result = cur.fetchone() + self.assertTrue(result[0].is_nan()) + self.assertTrue(math.isnan(result[1])) + self.assertTrue(math.isnan(result[2])) + + cur.execute("select 'Infinity'::numeric, 'Infinity'::real, 'Infinity'::double precision", binary=True) + self.assertEqual(cur.fetchone(), (float('inf'), float('inf'), float('inf'))) + + cur.execute("select '-Infinity'::numeric, '-Infinity'::real, '-Infinity'::double precision", binary=True) + self.assertEqual(cur.fetchone(), (float('-inf'), float('-inf'), float('-inf'))) + + # JSONB + cur.execute("select '{\"name\": \"John\", \"age\": 30, \"city\": null}'::jsonb", binary=True) + self.assertEqual(cur.fetchone(), ({'name': 'John', 'age': 30, 'city': None},)) + + cur.execute("select '{\"scores\": [85.5, 90, null], \"passed\": true}'::jsonb", binary=True) + self.assertEqual(cur.fetchone(), ({'scores': [85.5, 90, None], 'passed': True},)) + + cur.execute("select '[{\"id\": 1, \"value\": null}, {\"id\": 2, \"value\": \"test\"}]'::jsonb", binary=True) + self.assertEqual(cur.fetchone(), ([{'id': 1, 'value': None}, {'id': 2, 'value': 'test'}],)) if __name__ == '__main__': - test_psycopg_extended_mode() + unittest.main()