From 214118baea97103207dfcd6e134a1e8fe63c31ad Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 15 Sep 2023 10:03:14 +0800 Subject: [PATCH 01/11] chore(deps): Bump serde_json from 1.0.106 to 1.0.107 (#12322) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- Cargo.lock | 4 ++-- src/tests/simulation/Cargo.toml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8a882bd21a360..bd4277aa05617 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8118,9 +8118,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.106" +version = "1.0.107" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2cc66a619ed80bf7a0f6b17dd063a84b88f6dea1813737cf469aef1d081142c2" +checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65" dependencies = [ "itoa", "ryu", diff --git a/src/tests/simulation/Cargo.toml b/src/tests/simulation/Cargo.toml index 0e7b4e0d8be8f..82992b8b04c2a 100644 --- a/src/tests/simulation/Cargo.toml +++ b/src/tests/simulation/Cargo.toml @@ -43,7 +43,7 @@ risingwave_sqlparser = { workspace = true } risingwave_sqlsmith = { workspace = true } serde = "1.0.188" serde_derive = "1.0.188" -serde_json = "1.0.106" +serde_json = "1.0.107" sqllogictest = "0.15.3" tempfile = "3" tikv-jemallocator = { workspace = true } From 43c010ee16c1cc6a05c1b085b0abd1c77f1685ce Mon Sep 17 00:00:00 2001 From: Croxx Date: Fri, 15 Sep 2023 11:59:41 +0800 Subject: [PATCH 02/11] chore: fix comment and metrics (#12331) Signed-off-by: MrCroxx --- src/storage/src/hummock/event_handler/refiller.rs | 10 +++++++++- src/storage/src/hummock/sstable/mod.rs | 2 +- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/storage/src/hummock/event_handler/refiller.rs b/src/storage/src/hummock/event_handler/refiller.rs index a25461a9f8a1e..f0685724a4da0 100644 --- a/src/storage/src/hummock/event_handler/refiller.rs +++ b/src/storage/src/hummock/event_handler/refiller.rs @@ -48,6 +48,7 @@ pub struct CacheRefillMetrics { pub data_refill_filtered_total: GenericCounter, pub data_refill_attempts_total: GenericCounter, + pub data_refill_started_total: GenericCounter, pub meta_refill_attempts_total: GenericCounter, pub refill_queue_total: IntGauge, @@ -83,6 +84,9 @@ impl CacheRefillMetrics { let data_refill_attempts_total = refill_total .get_metric_with_label_values(&["data", "attempts"]) .unwrap(); + let data_refill_started_total = refill_total + .get_metric_with_label_values(&["data", "started"]) + .unwrap(); let meta_refill_attempts_total = refill_total .get_metric_with_label_values(&["meta", "attempts"]) .unwrap(); @@ -102,6 +106,7 @@ impl CacheRefillMetrics { meta_refill_success_duration, data_refill_filtered_total, data_refill_attempts_total, + data_refill_started_total, meta_refill_attempts_total, refill_queue_total, } @@ -290,9 +295,12 @@ impl CacheRefillTask { let mut tasks = vec![]; for sst_info in &holders { let task = async move { + GLOBAL_CACHE_REFILL_METRICS.data_refill_attempts_total.inc(); + let permit = context.concurrency.acquire().await.unwrap(); - GLOBAL_CACHE_REFILL_METRICS.data_refill_attempts_total.inc(); + GLOBAL_CACHE_REFILL_METRICS.data_refill_started_total.inc(); + match context .sstable_store .fill_data_file_cache(sst_info.value()) diff --git a/src/storage/src/hummock/sstable/mod.rs b/src/storage/src/hummock/sstable/mod.rs index e79779bd46d78..70b4fc6b64f36 100644 --- a/src/storage/src/hummock/sstable/mod.rs +++ b/src/storage/src/hummock/sstable/mod.rs @@ -326,7 +326,7 @@ impl BlockMeta { /// Format: /// /// ```plain - /// | offset (4B) | len (4B) | smallest key len (4B) | smallest key | + /// | offset (4B) | len (4B) | uncompressed size (4B) | smallest key len (4B) | smallest key | /// ``` pub fn encode(&self, buf: &mut Vec) { buf.put_u32_le(self.offset); From a99e6f3eb346176f34d27f309c2108b767b4ed8a Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Fri, 15 Sep 2023 13:58:19 +0800 Subject: [PATCH 03/11] fix(stream): fix pk indices of GroupTopN executors (#12304) Signed-off-by: Richard Chien --- .../streaming/group_top_n/group_top_1.slt | 45 +++++++++++++++++++ .../main1.slt} | 0 src/stream/src/executor/top_n/group_top_n.rs | 14 ++++-- .../executor/top_n/group_top_n_appendonly.rs | 11 +++-- src/stream/src/from_proto/group_top_n.rs | 5 ++- 5 files changed, 68 insertions(+), 7 deletions(-) create mode 100644 e2e_test/streaming/group_top_n/group_top_1.slt rename e2e_test/streaming/{group_top_n.slt => group_top_n/main1.slt} (100%) diff --git a/e2e_test/streaming/group_top_n/group_top_1.slt b/e2e_test/streaming/group_top_n/group_top_1.slt new file mode 100644 index 0000000000000..2bed792a4ae82 --- /dev/null +++ b/e2e_test/streaming/group_top_n/group_top_1.slt @@ -0,0 +1,45 @@ +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +# https://github.com/risingwavelabs/risingwave/issues/12282 + +statement ok +create table t(a int, b int, c int); + +statement ok +create materialized view mv as SELECT * FROM ( + SELECT + *, + row_number() OVER (PARTITION BY a ORDER BY b) AS rank + FROM t +) WHERE rank <= 1; + +statement ok +insert into t values (1, 1, 1); + +query iiiI +select * from mv; +---- +1 1 1 1 + +statement ok +insert into t values (1, 0, 1); + +query iiiI +select * from mv; +---- +1 0 1 1 + +statement ok +insert into t values (1, 0, 1); + +query iiiI +select * from mv; +---- +1 0 1 1 + +statement ok +drop materialized view mv; + +statement ok +drop table t; diff --git a/e2e_test/streaming/group_top_n.slt b/e2e_test/streaming/group_top_n/main1.slt similarity index 100% rename from e2e_test/streaming/group_top_n.slt rename to e2e_test/streaming/group_top_n/main1.slt diff --git a/src/stream/src/executor/top_n/group_top_n.rs b/src/stream/src/executor/top_n/group_top_n.rs index 421b1141843a0..7e075002b99cb 100644 --- a/src/stream/src/executor/top_n/group_top_n.rs +++ b/src/stream/src/executor/top_n/group_top_n.rs @@ -51,6 +51,7 @@ impl GroupTopNExecutor, state_table: StateTable, watermark_epoch: AtomicU64Ref, + pk_indices: PkIndices, ) -> StreamResult { let info = input.info(); Ok(TopNExecutorWrapper { @@ -66,6 +67,7 @@ impl GroupTopNExecutor InnerGroupTopNExecutor, watermark_epoch: AtomicU64Ref, ctx: ActorContextRef, + pk_indices: PkIndices, ) -> StreamResult { let ExecutorInfo { - pk_indices, schema, .. + schema: input_schema, + .. } = input_info; let metrics_info = MetricsInfo::new( @@ -121,12 +125,13 @@ impl InnerGroupTopNExecutor::new(state_table, cache_key_serde.clone()); Ok(Self { info: ExecutorInfo { - schema, + schema: input_schema, pk_indices, identity: format!("GroupTopNExecutor {:X}", executor_id), }, @@ -408,6 +413,7 @@ mod tests { vec![1], state_table, Arc::new(AtomicU64::new(0)), + pk_indices(), ) .unwrap(); let top_n_executor = Box::new(a); @@ -505,6 +511,7 @@ mod tests { vec![1], state_table, Arc::new(AtomicU64::new(0)), + pk_indices(), ) .unwrap(), ); @@ -595,6 +602,7 @@ mod tests { vec![1, 2], state_table, Arc::new(AtomicU64::new(0)), + pk_indices(), ) .unwrap(), ); diff --git a/src/stream/src/executor/top_n/group_top_n_appendonly.rs b/src/stream/src/executor/top_n/group_top_n_appendonly.rs index ceb4a3bca4d40..8df8ee17768c5 100644 --- a/src/stream/src/executor/top_n/group_top_n_appendonly.rs +++ b/src/stream/src/executor/top_n/group_top_n_appendonly.rs @@ -69,6 +69,7 @@ impl group_by: Vec, state_table: StateTable, watermark_epoch: AtomicU64Ref, + pk_indices: PkIndices, ) -> StreamResult { let info = input.info(); Ok(TopNExecutorWrapper { @@ -84,6 +85,7 @@ impl state_table, watermark_epoch, ctx, + pk_indices, )?, }) } @@ -129,9 +131,11 @@ impl state_table: StateTable, watermark_epoch: AtomicU64Ref, ctx: ActorContextRef, + pk_indices: PkIndices, ) -> StreamResult { let ExecutorInfo { - pk_indices, schema, .. + schema: input_schema, + .. } = input_info; let metrics_info = MetricsInfo::new( @@ -141,12 +145,13 @@ impl "GroupTopN", ); - let cache_key_serde = create_cache_key_serde(&storage_key, &schema, &order_by, &group_by); + let cache_key_serde = + create_cache_key_serde(&storage_key, &input_schema, &order_by, &group_by); let managed_state = ManagedTopNState::::new(state_table, cache_key_serde.clone()); Ok(Self { info: ExecutorInfo { - schema, + schema: input_schema, pk_indices, identity: format!("AppendOnlyGroupTopNExecutor {:X}", executor_id), }, diff --git a/src/stream/src/from_proto/group_top_n.rs b/src/stream/src/from_proto/group_top_n.rs index 9f74134308daa..a91d4a91a6ef0 100644 --- a/src/stream/src/from_proto/group_top_n.rs +++ b/src/stream/src/from_proto/group_top_n.rs @@ -21,7 +21,7 @@ use risingwave_pb::stream_plan::GroupTopNNode; use super::*; use crate::common::table::state_table::StateTable; -use crate::executor::{ActorContextRef, AppendOnlyGroupTopNExecutor, GroupTopNExecutor}; +use crate::executor::{ActorContextRef, AppendOnlyGroupTopNExecutor, GroupTopNExecutor, PkIndices}; use crate::task::AtomicU64Ref; pub struct GroupTopNExecutorBuilder; @@ -80,6 +80,7 @@ impl ExecutorBuilder for GroupTopNExecutorBuilder { state_table: StateTable, watermark_epoch: AtomicU64Ref, group_key_types: Vec, + pk_indices: PkIndices, with_ties: bool, append_only: bool, @@ -120,6 +122,7 @@ impl HashKeyDispatcher for GroupTopNExecutorDispatcherArgs { self.group_by, self.state_table, self.watermark_epoch, + self.pk_indices, )? .boxed()) }; From 7baa27ff9bbce41e40079f97b3d22488a6965fa4 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Fri, 15 Sep 2023 14:00:14 +0800 Subject: [PATCH 04/11] chore: split full debug info for release build (#12255) Signed-off-by: Bugen Zhao --- Cargo.lock | 82 +++++++++++++++++++++--------------------- Cargo.toml | 8 +++-- ci/scripts/release.sh | 4 +++ docker/Dockerfile | 5 ++- docker/Dockerfile.hdfs | 5 ++- 5 files changed, 58 insertions(+), 46 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bd4277aa05617..f337e66f93a92 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4071,7 +4071,7 @@ checksum = "57bcfdad1b858c2db7c38303a6d2ad4dfaf5eb53dfeb0910128b2c26d6158503" [[package]] name = "local_stats_alloc" -version = "1.1.0-alpha" +version = "1.3.0-alpha" dependencies = [ "workspace-hack", ] @@ -5434,7 +5434,7 @@ dependencies = [ [[package]] name = "pgwire" -version = "1.1.0-alpha" +version = "1.3.0-alpha" dependencies = [ "anyhow", "byteorder", @@ -6406,7 +6406,7 @@ dependencies = [ [[package]] name = "risedev" -version = "1.1.0-alpha" +version = "1.3.0-alpha" dependencies = [ "anyhow", "chrono", @@ -6435,7 +6435,7 @@ dependencies = [ [[package]] name = "risedev-config" -version = "1.1.0-alpha" +version = "1.3.0-alpha" dependencies = [ "anyhow", "clap", @@ -6448,7 +6448,7 @@ dependencies = [ [[package]] name = "risingwave_backup" -version = "1.1.0-alpha" +version = "1.3.0-alpha" dependencies = [ "anyhow", "async-trait", @@ -6468,7 +6468,7 @@ dependencies = [ [[package]] name = "risingwave_backup_cmd" -version = "1.1.0-alpha" +version = "1.3.0-alpha" dependencies = [ "clap", "madsim-tokio", @@ -6480,7 +6480,7 @@ dependencies = [ [[package]] name = "risingwave_batch" -version = "1.1.0-alpha" +version = "1.3.0-alpha" dependencies = [ "anyhow", "assert_matches", @@ -6524,7 +6524,7 @@ dependencies = [ [[package]] name = "risingwave_bench" -version = "1.1.0-alpha" +version = "1.3.0-alpha" dependencies = [ "async-trait", "aws-config", @@ -6557,7 +6557,7 @@ dependencies = [ [[package]] name = "risingwave_cmd" -version = "1.1.0-alpha" +version = "1.3.0-alpha" dependencies = [ "clap", "madsim-tokio", @@ -6577,7 +6577,7 @@ dependencies = [ [[package]] name = "risingwave_cmd_all" -version = "1.1.0-alpha" +version = "1.3.0-alpha" dependencies = [ "anyhow", "clap", @@ -6608,7 +6608,7 @@ dependencies = [ [[package]] name = "risingwave_common" -version = "1.1.0-alpha" +version = "1.3.0-alpha" dependencies = [ "anyhow", "arc-swap", @@ -6700,7 +6700,7 @@ dependencies = [ [[package]] name = "risingwave_common_proc_macro" -version = "1.1.0-alpha" +version = "1.3.0-alpha" dependencies = [ "bae", "proc-macro-error", @@ -6712,7 +6712,7 @@ dependencies = [ [[package]] name = "risingwave_common_service" -version = "1.1.0-alpha" +version = "1.3.0-alpha" dependencies = [ "async-trait", "futures", @@ -6731,7 +6731,7 @@ dependencies = [ [[package]] name = "risingwave_compaction_test" -version = "1.1.0-alpha" +version = "1.3.0-alpha" dependencies = [ "anyhow", "async-trait", @@ -6757,7 +6757,7 @@ dependencies = [ [[package]] name = "risingwave_compactor" -version = "1.1.0-alpha" +version = "1.3.0-alpha" dependencies = [ "anyhow", "async-trait", @@ -6781,7 +6781,7 @@ dependencies = [ [[package]] name = "risingwave_compute" -version = "1.1.0-alpha" +version = "1.3.0-alpha" dependencies = [ "anyhow", "async-trait", @@ -6821,7 +6821,7 @@ dependencies = [ [[package]] name = "risingwave_connector" -version = "1.1.0-alpha" +version = "1.3.0-alpha" dependencies = [ "anyhow", "apache-avro 0.15.0 (git+https://github.com/risingwavelabs/avro?branch=idx0dev/resolved_schema)", @@ -6901,7 +6901,7 @@ dependencies = [ [[package]] name = "risingwave_ctl" -version = "1.1.0-alpha" +version = "1.3.0-alpha" dependencies = [ "anyhow", "bytes", @@ -6935,7 +6935,7 @@ dependencies = [ [[package]] name = "risingwave_e2e_extended_mode_test" -version = "1.1.0-alpha" +version = "1.3.0-alpha" dependencies = [ "anyhow", "chrono", @@ -6950,7 +6950,7 @@ dependencies = [ [[package]] name = "risingwave_expr" -version = "1.1.0-alpha" +version = "1.3.0-alpha" dependencies = [ "aho-corasick", "anyhow", @@ -7007,7 +7007,7 @@ dependencies = [ [[package]] name = "risingwave_frontend" -version = "1.1.0-alpha" +version = "1.3.0-alpha" dependencies = [ "anyhow", "arc-swap", @@ -7073,7 +7073,7 @@ dependencies = [ [[package]] name = "risingwave_hummock_sdk" -version = "1.1.0-alpha" +version = "1.3.0-alpha" dependencies = [ "bytes", "hex", @@ -7087,7 +7087,7 @@ dependencies = [ [[package]] name = "risingwave_hummock_test" -version = "1.1.0-alpha" +version = "1.3.0-alpha" dependencies = [ "async-trait", "bytes", @@ -7119,7 +7119,7 @@ dependencies = [ [[package]] name = "risingwave_hummock_trace" -version = "1.1.0-alpha" +version = "1.3.0-alpha" dependencies = [ "async-trait", "bincode 2.0.0-rc.3", @@ -7176,7 +7176,7 @@ dependencies = [ [[package]] name = "risingwave_meta" -version = "1.1.0-alpha" +version = "1.3.0-alpha" dependencies = [ "anyhow", "arc-swap", @@ -7241,7 +7241,7 @@ dependencies = [ [[package]] name = "risingwave_object_store" -version = "1.1.0-alpha" +version = "1.3.0-alpha" dependencies = [ "async-trait", "await-tree", @@ -7269,7 +7269,7 @@ dependencies = [ [[package]] name = "risingwave_pb" -version = "1.1.0-alpha" +version = "1.3.0-alpha" dependencies = [ "enum-as-inner", "fs-err", @@ -7286,7 +7286,7 @@ dependencies = [ [[package]] name = "risingwave_planner_test" -version = "1.1.0-alpha" +version = "1.3.0-alpha" dependencies = [ "anyhow", "expect-test", @@ -7306,7 +7306,7 @@ dependencies = [ [[package]] name = "risingwave_regress_test" -version = "1.1.0-alpha" +version = "1.3.0-alpha" dependencies = [ "anyhow", "clap", @@ -7320,7 +7320,7 @@ dependencies = [ [[package]] name = "risingwave_rpc_client" -version = "1.1.0-alpha" +version = "1.3.0-alpha" dependencies = [ "anyhow", "async-trait", @@ -7349,7 +7349,7 @@ dependencies = [ [[package]] name = "risingwave_rt" -version = "1.1.0-alpha" +version = "1.3.0-alpha" dependencies = [ "await-tree", "chrono", @@ -7423,7 +7423,7 @@ dependencies = [ [[package]] name = "risingwave_source" -version = "1.1.0-alpha" +version = "1.3.0-alpha" dependencies = [ "anyhow", "assert_matches", @@ -7446,7 +7446,7 @@ dependencies = [ [[package]] name = "risingwave_sqlparser" -version = "1.1.0-alpha" +version = "1.3.0-alpha" dependencies = [ "itertools 0.11.0", "matches", @@ -7473,7 +7473,7 @@ dependencies = [ [[package]] name = "risingwave_sqlsmith" -version = "1.1.0-alpha" +version = "1.3.0-alpha" dependencies = [ "anyhow", "chrono", @@ -7499,7 +7499,7 @@ dependencies = [ [[package]] name = "risingwave_state_cleaning_test" -version = "1.1.0-alpha" +version = "1.3.0-alpha" dependencies = [ "anyhow", "clap", @@ -7519,7 +7519,7 @@ dependencies = [ [[package]] name = "risingwave_storage" -version = "1.1.0-alpha" +version = "1.3.0-alpha" dependencies = [ "anyhow", "arc-swap", @@ -7583,7 +7583,7 @@ dependencies = [ [[package]] name = "risingwave_stream" -version = "1.1.0-alpha" +version = "1.3.0-alpha" dependencies = [ "anyhow", "assert_matches", @@ -7645,7 +7645,7 @@ dependencies = [ [[package]] name = "risingwave_test_runner" -version = "1.1.0-alpha" +version = "1.3.0-alpha" dependencies = [ "fail", "sync-point", @@ -7669,7 +7669,7 @@ dependencies = [ [[package]] name = "risingwave_variables" -version = "1.1.0-alpha" +version = "1.3.0-alpha" dependencies = [ "chrono", "workspace-hack", @@ -10010,7 +10010,7 @@ dependencies = [ [[package]] name = "workspace-config" -version = "1.1.0-alpha" +version = "1.3.0-alpha" dependencies = [ "log", "openssl-sys", @@ -10021,7 +10021,7 @@ dependencies = [ [[package]] name = "workspace-hack" -version = "1.1.0-alpha" +version = "1.3.0-alpha" dependencies = [ "ahash 0.8.3", "anyhow", diff --git a/Cargo.toml b/Cargo.toml index 332bc5f8e7acc..5742b9efc3713 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,7 +52,7 @@ members = [ resolver = "2" [workspace.package] -version = "1.1.0-alpha" +version = "1.3.0-alpha" edition = "2021" homepage = "https://github.com/risingwavelabs/risingwave" keywords = ["sql", "database", "streaming"] @@ -177,8 +177,9 @@ redundant_explicit_links = "allow" lto = 'off' [profile.release] -debug = 1 -lto = 'thin' +debug = "full" +split-debuginfo = "packed" +lto = "thin" # The profile used for CI in main branch. # This profile inherits from the release profile, but turns on some checks and assertions for us to @@ -187,6 +188,7 @@ lto = 'thin' inherits = "release" incremental = false debug = "line-tables-only" +split-debuginfo = "off" debug-assertions = true overflow-checks = true diff --git a/ci/scripts/release.sh b/ci/scripts/release.sh index b222e49c08261..9852d48e0ba50 100755 --- a/ci/scripts/release.sh +++ b/ci/scripts/release.sh @@ -78,6 +78,10 @@ if [[ -n "${BUILDKITE_TAG}" ]]; then tar -czvf risingwave-"${BUILDKITE_TAG}"-x86_64-unknown-linux.tar.gz risingwave gh release upload "${BUILDKITE_TAG}" risingwave-"${BUILDKITE_TAG}"-x86_64-unknown-linux.tar.gz + echo "--- Release upload risingwave debug info" + tar -czvf risingwave-"${BUILDKITE_TAG}"-x86_64-unknown-linux.dwp.tar.gz risingwave.dwp + gh release upload "${BUILDKITE_TAG}" risingwave-"${BUILDKITE_TAG}"-x86_64-unknown-linux.dwp.tar.gz + echo "--- Release upload risectl asset" tar -czvf risectl-"${BUILDKITE_TAG}"-x86_64-unknown-linux.tar.gz risectl gh release upload "${BUILDKITE_TAG}" risectl-"${BUILDKITE_TAG}"-x86_64-unknown-linux.tar.gz diff --git a/docker/Dockerfile b/docker/Dockerfile index 1e467d4feac2e..d788b4f435a0d 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -37,7 +37,9 @@ RUN rustup self update \ RUN cargo fetch && \ cargo build -p risingwave_cmd_all --release --features "rw-static-link" && \ - mkdir -p /risingwave/bin && mv /risingwave/target/release/risingwave /risingwave/bin/ && \ + mkdir -p /risingwave/bin && \ + mv /risingwave/target/release/risingwave /risingwave/bin/ && \ + mv /risingwave/target/release/risingwave.dwp /risingwave/bin/ && \ cp ./target/release/build/tikv-jemalloc-sys-*/out/build/bin/jeprof /risingwave/bin/ && \ chmod +x /risingwave/bin/jeprof && \ mkdir -p /risingwave/lib && cargo clean @@ -56,6 +58,7 @@ RUN apt-get -y install gdb \ RUN mkdir -p /risingwave/bin/connector-node && mkdir -p /risingwave/lib COPY --from=builder /risingwave/bin/risingwave /risingwave/bin/risingwave +COPY --from=builder /risingwave/bin/risingwave.dwp /risingwave/bin/risingwave.dwp COPY --from=builder /risingwave/bin/connector-node /risingwave/bin/connector-node COPY --from=builder /risingwave/ui /risingwave/ui COPY --from=builder /risingwave/bin/jeprof /usr/local/bin/jeprof diff --git a/docker/Dockerfile.hdfs b/docker/Dockerfile.hdfs index 7d22f7b516d48..23b6cf802fc4f 100644 --- a/docker/Dockerfile.hdfs +++ b/docker/Dockerfile.hdfs @@ -45,7 +45,9 @@ ENV LD_LIBRARY_PATH ${JAVA_HOME_PATH}/lib/server:${LD_LIBRARY_PATH} RUN cargo fetch && \ cargo build -p risingwave_cmd_all --release --features "rw-static-link" && \ - mkdir -p /risingwave/bin && mv /risingwave/target/release/risingwave /risingwave/bin/ && \ + mkdir -p /risingwave/bin && \ + mv /risingwave/target/release/risingwave /risingwave/bin/ && \ + mv /risingwave/target/release/risingwave.dwp /risingwave/bin/ && \ cp ./target/release/build/tikv-jemalloc-sys-*/out/build/bin/jeprof /risingwave/bin/ && \ chmod +x /risingwave/bin/jeprof && \ mkdir -p /risingwave/lib && cargo clean @@ -61,6 +63,7 @@ FROM image-base as risingwave LABEL org.opencontainers.image.source https://github.com/risingwavelabs/risingwave RUN mkdir -p /risingwave/bin/connector-node && mkdir -p /risingwave/lib COPY --from=builder /risingwave/bin/risingwave /risingwave/bin/risingwave +COPY --from=builder /risingwave/bin/risingwave.dwp /risingwave/bin/risingwave.dwp COPY --from=builder /risingwave/bin/connector-node /risingwave/bin/connector-node COPY --from=builder /risingwave/ui /risingwave/ui COPY --from=builder /risingwave/hdfs_env.sh /risingwave/hdfs_env.sh From 59bb645b27d6345abddfebef5e3095c56b74ffe7 Mon Sep 17 00:00:00 2001 From: xxchan Date: Fri, 15 Sep 2023 14:54:54 +0800 Subject: [PATCH 05/11] chore: add platforms to hakari (#12333) Signed-off-by: Runji Wang Co-authored-by: Runji Wang --- .config/hakari.toml | 9 +- Cargo.lock | 14 ++++ Makefile.toml | 13 ++- src/compute/Cargo.toml | 4 +- src/test_runner/Cargo.toml | 2 + src/tests/simulation/Cargo.toml | 4 +- src/workspace-hack/Cargo.toml | 144 ++++++++++++++++++++++++++++++++ 7 files changed, 182 insertions(+), 8 deletions(-) diff --git a/.config/hakari.toml b/.config/hakari.toml index e998f5fea2fbe..e3e58e80cbf16 100644 --- a/.config/hakari.toml +++ b/.config/hakari.toml @@ -15,9 +15,10 @@ resolver = "2" # Add triples corresponding to platforms commonly used by developers here. # https://doc.rust-lang.org/rustc/platform-support.html platforms = [ - # "x86_64-unknown-linux-gnu", - # "x86_64-apple-darwin", - # "x86_64-pc-windows-msvc", + "x86_64-unknown-linux-gnu", + "aarch64-unknown-linux-gnu", + "x86_64-apple-darwin", + "aarch64-apple-darwin", ] # Write out exact versions rather than a semver range. (Defaults to false.) @@ -35,6 +36,4 @@ third-party = [ { name = "criterion" }, { name = "console" }, { name = "similar" }, - # FYI: https://github.com/risingwavelabs/risingwave/issues/12315 - { name = "tikv-jemalloc-sys", git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9" }, ] diff --git a/Cargo.lock b/Cargo.lock index f337e66f93a92..9e73822d5a77d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10029,6 +10029,7 @@ dependencies = [ "aws-credential-types", "aws-sdk-s3", "aws-smithy-client", + "axum", "base64 0.21.3", "bitflags 2.4.0", "byteorder", @@ -10070,12 +10071,18 @@ dependencies = [ "log", "madsim-rdkafka", "madsim-tokio", + "memchr", + "mime_guess", + "miniz_oxide", "mio", "multimap", "nom", "num-bigint", "num-integer", "num-traits", + "once_cell", + "openssl-sys", + "opentelemetry", "opentelemetry_api", "opentelemetry_sdk", "parking_lot 0.12.1", @@ -10090,12 +10097,15 @@ dependencies = [ "rand", "rand_chacha", "rand_core 0.6.4", + "rdkafka-sys", "regex", "regex-automata 0.3.8", "regex-syntax 0.7.5", "reqwest", "ring", "rust_decimal", + "rustix 0.38.11", + "rustls 0.21.7", "scopeguard", "serde", "serde_json", @@ -10104,6 +10114,8 @@ dependencies = [ "subtle", "syn 1.0.109", "syn 2.0.33", + "tikv-jemalloc-sys", + "tikv-jemallocator", "time", "time-macros", "tinyvec", @@ -10115,6 +10127,7 @@ dependencies = [ "toml_edit", "tonic", "tower", + "tower-http", "tracing", "tracing-core", "tracing-subscriber", @@ -10123,6 +10136,7 @@ dependencies = [ "url", "uuid", "zeroize", + "zstd-sys", ] [[package]] diff --git a/Makefile.toml b/Makefile.toml index 1583f7185e179..ac237fac06628 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -946,7 +946,18 @@ set -e cargo check \ --config "target.'cfg(all())'.rustflags = ['--cfg=madsim']" \ - -p risingwave_simulation --all-targets "$@" + -p risingwave_batch \ + -p risingwave_common \ + -p risingwave_compute \ + -p risingwave_connector \ + -p risingwave_frontend \ + -p risingwave_meta \ + -p risingwave_object_store \ + -p risingwave_source \ + -p risingwave_storage \ + -p risingwave_stream \ + -p pgwire \ + -p risingwave_simulation --tests "$@" """ [tasks.sslt] diff --git a/src/compute/Cargo.toml b/src/compute/Cargo.toml index 24d12151c04b7..7fb7dbd1ab2bd 100644 --- a/src/compute/Cargo.toml +++ b/src/compute/Cargo.toml @@ -39,7 +39,6 @@ risingwave_storage = { workspace = true } risingwave_stream = { workspace = true } serde = { version = "1", features = ["derive"] } serde_json = "1" -tikv-jemalloc-ctl = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9" } tokio = { version = "0.2", package = "madsim-tokio", features = [ "rt", "rt-multi-thread", @@ -54,6 +53,9 @@ tonic = { workspace = true } tower = { version = "0.4", features = ["util", "load-shed"] } tracing = "0.1" +[target.'cfg(unix)'.dependencies] +tikv-jemalloc-ctl = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9" } + [target.'cfg(not(madsim))'.dependencies] workspace-hack = { path = "../workspace-hack" } diff --git a/src/test_runner/Cargo.toml b/src/test_runner/Cargo.toml index f5ed8b05dc03a..3b9819bd45dad 100644 --- a/src/test_runner/Cargo.toml +++ b/src/test_runner/Cargo.toml @@ -17,6 +17,8 @@ normal = ["workspace-hack"] [dependencies] fail = "0.5" sync-point = { path = "../utils/sync-point" } + +[target.'cfg(not(madsim))'.dependencies] workspace-hack = { path = "../workspace-hack" } [lints] diff --git a/src/tests/simulation/Cargo.toml b/src/tests/simulation/Cargo.toml index 82992b8b04c2a..ab9201e6ddd3f 100644 --- a/src/tests/simulation/Cargo.toml +++ b/src/tests/simulation/Cargo.toml @@ -46,11 +46,13 @@ serde_derive = "1.0.188" serde_json = "1.0.107" sqllogictest = "0.15.3" tempfile = "3" -tikv-jemallocator = { workspace = true } tokio = { version = "0.2.23", package = "madsim-tokio" } tokio-postgres = "0.7" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } +[target.'cfg(unix)'.dependencies] +tikv-jemallocator = { workspace = true } + [lints] workspace = true diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml index a539527690fce..63cf30bf854df 100644 --- a/src/workspace-hack/Cargo.toml +++ b/src/workspace-hack/Cargo.toml @@ -215,4 +215,148 @@ url = { version = "2", features = ["serde"] } uuid = { version = "1", features = ["fast-rng", "serde", "v4"] } zeroize = { version = "1", features = ["zeroize_derive"] } +[target.x86_64-unknown-linux-gnu.dependencies] +axum = { version = "0.6" } +memchr = { version = "2" } +mime_guess = { version = "2" } +miniz_oxide = { version = "0.7", default-features = false, features = ["with-alloc"] } +once_cell = { version = "1", features = ["unstable"] } +openssl-sys = { version = "0.9", default-features = false, features = ["vendored"] } +opentelemetry = { version = "0.20", default-features = false, features = ["metrics", "rt-tokio", "trace"] } +opentelemetry_sdk = { version = "0.20", default-features = false, features = ["rt-tokio"] } +rdkafka-sys = { git = "https://github.com/MaterializeInc/rust-rdkafka", rev = "8ea07c4", default-features = false, features = ["cmake-build", "gssapi", "libz", "ssl-vendored", "zstd"] } +rustix = { version = "0.38", features = ["fs", "termios"] } +rustls = { version = "0.21", features = ["dangerous_configuration"] } +serde_json = { version = "1", default-features = false, features = ["raw_value"] } +tikv-jemalloc-sys = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } +tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } +tower-http = { version = "0.4", features = ["add-extension", "cors", "fs"] } +zstd-sys = { version = "2", features = ["std"] } + +[target.x86_64-unknown-linux-gnu.build-dependencies] +axum = { version = "0.6" } +memchr = { version = "2" } +mime_guess = { version = "2" } +miniz_oxide = { version = "0.7", default-features = false, features = ["with-alloc"] } +once_cell = { version = "1", features = ["unstable"] } +openssl-sys = { version = "0.9", default-features = false, features = ["vendored"] } +opentelemetry = { version = "0.20", default-features = false, features = ["metrics", "rt-tokio", "trace"] } +opentelemetry_sdk = { version = "0.20", default-features = false, features = ["rt-tokio"] } +rdkafka-sys = { git = "https://github.com/MaterializeInc/rust-rdkafka", rev = "8ea07c4", default-features = false, features = ["cmake-build", "gssapi", "libz", "ssl-vendored", "zstd"] } +rustix = { version = "0.38", features = ["fs", "termios"] } +rustls = { version = "0.21", features = ["dangerous_configuration"] } +serde_json = { version = "1", default-features = false, features = ["raw_value"] } +tikv-jemalloc-sys = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } +tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } +tower-http = { version = "0.4", features = ["add-extension", "cors", "fs"] } +zstd-sys = { version = "2", features = ["std"] } + +[target.aarch64-unknown-linux-gnu.dependencies] +axum = { version = "0.6" } +memchr = { version = "2" } +mime_guess = { version = "2" } +miniz_oxide = { version = "0.7", default-features = false, features = ["with-alloc"] } +once_cell = { version = "1", features = ["unstable"] } +openssl-sys = { version = "0.9", default-features = false, features = ["vendored"] } +opentelemetry = { version = "0.20", default-features = false, features = ["metrics", "rt-tokio", "trace"] } +opentelemetry_sdk = { version = "0.20", default-features = false, features = ["rt-tokio"] } +rdkafka-sys = { git = "https://github.com/MaterializeInc/rust-rdkafka", rev = "8ea07c4", default-features = false, features = ["cmake-build", "gssapi", "libz", "ssl-vendored", "zstd"] } +rustix = { version = "0.38", features = ["fs", "termios"] } +rustls = { version = "0.21", features = ["dangerous_configuration"] } +serde_json = { version = "1", default-features = false, features = ["raw_value"] } +tikv-jemalloc-sys = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } +tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } +tower-http = { version = "0.4", features = ["add-extension", "cors", "fs"] } +zstd-sys = { version = "2", features = ["std"] } + +[target.aarch64-unknown-linux-gnu.build-dependencies] +axum = { version = "0.6" } +memchr = { version = "2" } +mime_guess = { version = "2" } +miniz_oxide = { version = "0.7", default-features = false, features = ["with-alloc"] } +once_cell = { version = "1", features = ["unstable"] } +openssl-sys = { version = "0.9", default-features = false, features = ["vendored"] } +opentelemetry = { version = "0.20", default-features = false, features = ["metrics", "rt-tokio", "trace"] } +opentelemetry_sdk = { version = "0.20", default-features = false, features = ["rt-tokio"] } +rdkafka-sys = { git = "https://github.com/MaterializeInc/rust-rdkafka", rev = "8ea07c4", default-features = false, features = ["cmake-build", "gssapi", "libz", "ssl-vendored", "zstd"] } +rustix = { version = "0.38", features = ["fs", "termios"] } +rustls = { version = "0.21", features = ["dangerous_configuration"] } +serde_json = { version = "1", default-features = false, features = ["raw_value"] } +tikv-jemalloc-sys = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } +tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } +tower-http = { version = "0.4", features = ["add-extension", "cors", "fs"] } +zstd-sys = { version = "2", features = ["std"] } + +[target.x86_64-apple-darwin.dependencies] +axum = { version = "0.6" } +memchr = { version = "2" } +mime_guess = { version = "2" } +miniz_oxide = { version = "0.7", default-features = false, features = ["with-alloc"] } +once_cell = { version = "1", features = ["unstable"] } +openssl-sys = { version = "0.9", default-features = false, features = ["vendored"] } +opentelemetry = { version = "0.20", default-features = false, features = ["metrics", "rt-tokio", "trace"] } +opentelemetry_sdk = { version = "0.20", default-features = false, features = ["rt-tokio"] } +rdkafka-sys = { git = "https://github.com/MaterializeInc/rust-rdkafka", rev = "8ea07c4", default-features = false, features = ["cmake-build", "gssapi", "libz", "ssl-vendored", "zstd"] } +rustix = { version = "0.38", features = ["fs", "termios"] } +rustls = { version = "0.21", features = ["dangerous_configuration"] } +serde_json = { version = "1", default-features = false, features = ["raw_value"] } +tikv-jemalloc-sys = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } +tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } +tower-http = { version = "0.4", features = ["add-extension", "cors", "fs"] } +zstd-sys = { version = "2", features = ["std"] } + +[target.x86_64-apple-darwin.build-dependencies] +axum = { version = "0.6" } +memchr = { version = "2" } +mime_guess = { version = "2" } +miniz_oxide = { version = "0.7", default-features = false, features = ["with-alloc"] } +once_cell = { version = "1", features = ["unstable"] } +openssl-sys = { version = "0.9", default-features = false, features = ["vendored"] } +opentelemetry = { version = "0.20", default-features = false, features = ["metrics", "rt-tokio", "trace"] } +opentelemetry_sdk = { version = "0.20", default-features = false, features = ["rt-tokio"] } +rdkafka-sys = { git = "https://github.com/MaterializeInc/rust-rdkafka", rev = "8ea07c4", default-features = false, features = ["cmake-build", "gssapi", "libz", "ssl-vendored", "zstd"] } +rustix = { version = "0.38", features = ["fs", "termios"] } +rustls = { version = "0.21", features = ["dangerous_configuration"] } +serde_json = { version = "1", default-features = false, features = ["raw_value"] } +tikv-jemalloc-sys = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } +tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } +tower-http = { version = "0.4", features = ["add-extension", "cors", "fs"] } +zstd-sys = { version = "2", features = ["std"] } + +[target.aarch64-apple-darwin.dependencies] +axum = { version = "0.6" } +memchr = { version = "2" } +mime_guess = { version = "2" } +miniz_oxide = { version = "0.7", default-features = false, features = ["with-alloc"] } +once_cell = { version = "1", features = ["unstable"] } +openssl-sys = { version = "0.9", default-features = false, features = ["vendored"] } +opentelemetry = { version = "0.20", default-features = false, features = ["metrics", "rt-tokio", "trace"] } +opentelemetry_sdk = { version = "0.20", default-features = false, features = ["rt-tokio"] } +rdkafka-sys = { git = "https://github.com/MaterializeInc/rust-rdkafka", rev = "8ea07c4", default-features = false, features = ["cmake-build", "gssapi", "libz", "ssl-vendored", "zstd"] } +rustix = { version = "0.38", features = ["fs", "termios"] } +rustls = { version = "0.21", features = ["dangerous_configuration"] } +serde_json = { version = "1", default-features = false, features = ["raw_value"] } +tikv-jemalloc-sys = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } +tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } +tower-http = { version = "0.4", features = ["add-extension", "cors", "fs"] } +zstd-sys = { version = "2", features = ["std"] } + +[target.aarch64-apple-darwin.build-dependencies] +axum = { version = "0.6" } +memchr = { version = "2" } +mime_guess = { version = "2" } +miniz_oxide = { version = "0.7", default-features = false, features = ["with-alloc"] } +once_cell = { version = "1", features = ["unstable"] } +openssl-sys = { version = "0.9", default-features = false, features = ["vendored"] } +opentelemetry = { version = "0.20", default-features = false, features = ["metrics", "rt-tokio", "trace"] } +opentelemetry_sdk = { version = "0.20", default-features = false, features = ["rt-tokio"] } +rdkafka-sys = { git = "https://github.com/MaterializeInc/rust-rdkafka", rev = "8ea07c4", default-features = false, features = ["cmake-build", "gssapi", "libz", "ssl-vendored", "zstd"] } +rustix = { version = "0.38", features = ["fs", "termios"] } +rustls = { version = "0.21", features = ["dangerous_configuration"] } +serde_json = { version = "1", default-features = false, features = ["raw_value"] } +tikv-jemalloc-sys = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } +tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } +tower-http = { version = "0.4", features = ["add-extension", "cors", "fs"] } +zstd-sys = { version = "2", features = ["std"] } + ### END HAKARI SECTION From c0060b2ecbcbe16ce518f49237b2f380d1e41e40 Mon Sep 17 00:00:00 2001 From: zwang28 <70626450+zwang28@users.noreply.github.com> Date: Fri, 15 Sep 2023 15:32:14 +0800 Subject: [PATCH 06/11] feat(meta): add hummock config relevant tables to rw_catalog (#12337) --- proto/hummock.proto | 15 ++++ .../src/catalog/system_catalog/mod.rs | 2 + .../catalog/system_catalog/rw_catalog/mod.rs | 4 + .../rw_hummock_compaction_group_configs.rs | 72 ++++++++++++++++++ .../rw_catalog/rw_hummock_meta_configs.rs | 50 +++++++++++++ src/frontend/src/meta_client.rs | 21 +++++- src/frontend/src/test_utils.rs | 15 +++- src/meta/src/backup_restore/backup_manager.rs | 7 +- src/meta/src/hummock/vacuum.rs | 3 +- src/meta/src/rpc/service/hummock_service.rs | 73 ++++++++++++++++++- src/rpc_client/src/meta_client.rs | 15 ++++ 11 files changed, 268 insertions(+), 9 deletions(-) create mode 100644 src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_compaction_group_configs.rs create mode 100644 src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_meta_configs.rs diff --git a/proto/hummock.proto b/proto/hummock.proto index fd16e32457ec5..db99cbe5f8509 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -633,6 +633,19 @@ message ListBranchedObjectResponse { repeated BranchedObject branched_objects = 1; } +message ListActiveWriteLimitRequest {} + +message ListActiveWriteLimitResponse { + // < compaction group id, write limit info > + map write_limits = 1; +} + +message ListHummockMetaConfigRequest {} + +message ListHummockMetaConfigResponse { + map configs = 1; +} + service HummockManagerService { rpc UnpinVersionBefore(UnpinVersionBeforeRequest) returns (UnpinVersionBeforeResponse); rpc GetCurrentVersion(GetCurrentVersionRequest) returns (GetCurrentVersionResponse); @@ -664,6 +677,8 @@ service HummockManagerService { rpc RiseCtlListCompactionStatus(RiseCtlListCompactionStatusRequest) returns (RiseCtlListCompactionStatusResponse); rpc SubscribeCompactionEvent(stream SubscribeCompactionEventRequest) returns (stream SubscribeCompactionEventResponse); rpc ListBranchedObject(ListBranchedObjectRequest) returns (ListBranchedObjectResponse); + rpc ListActiveWriteLimit(ListActiveWriteLimitRequest) returns (ListActiveWriteLimitResponse); + rpc ListHummockMetaConfig(ListHummockMetaConfigRequest) returns (ListHummockMetaConfigResponse); } message CompactionConfig { diff --git a/src/frontend/src/catalog/system_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/mod.rs index 59685698ddc99..185f8017311c2 100644 --- a/src/frontend/src/catalog/system_catalog/mod.rs +++ b/src/frontend/src/catalog/system_catalog/mod.rs @@ -409,6 +409,8 @@ prepare_sys_catalog! { { BuiltinCatalog::Table(&RW_HUMMOCK_CHECKPOINT_VERSION), read_hummock_checkpoint_version await }, { BuiltinCatalog::Table(&RW_HUMMOCK_VERSION_DELTAS), read_hummock_version_deltas await }, { BuiltinCatalog::Table(&RW_HUMMOCK_BRANCHED_OBJECTS), read_hummock_branched_objects await }, + { BuiltinCatalog::Table(&RW_HUMMOCK_COMPACTION_GROUP_CONFIGS), read_hummock_compaction_group_configs await }, + { BuiltinCatalog::Table(&RW_HUMMOCK_META_CONFIGS), read_hummock_meta_configs await}, } #[cfg(test)] diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs index 58beecbb528a6..9f89c9eed5e81 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs @@ -20,6 +20,8 @@ mod rw_ddl_progress; mod rw_fragments; mod rw_functions; mod rw_hummock_branched_objects; +mod rw_hummock_compaction_group_configs; +mod rw_hummock_meta_configs; mod rw_hummock_pinned_snapshots; mod rw_hummock_pinned_versions; mod rw_hummock_version; @@ -51,6 +53,8 @@ pub use rw_ddl_progress::*; pub use rw_fragments::*; pub use rw_functions::*; pub use rw_hummock_branched_objects::*; +pub use rw_hummock_compaction_group_configs::*; +pub use rw_hummock_meta_configs::*; pub use rw_hummock_pinned_snapshots::*; pub use rw_hummock_pinned_versions::*; pub use rw_hummock_version::*; diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_compaction_group_configs.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_compaction_group_configs.rs new file mode 100644 index 0000000000000..758d639388f7e --- /dev/null +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_compaction_group_configs.rs @@ -0,0 +1,72 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use itertools::Itertools; +use risingwave_common::catalog::RW_CATALOG_SCHEMA_NAME; +use risingwave_common::error::Result; +use risingwave_common::row::OwnedRow; +use risingwave_common::types::{DataType, ScalarImpl}; +use serde_json::json; + +use crate::catalog::system_catalog::{BuiltinTable, SysCatalogReaderImpl}; + +pub const RW_HUMMOCK_COMPACTION_GROUP_CONFIGS: BuiltinTable = BuiltinTable { + name: "rw_hummock_compaction_group_configs", + schema: RW_CATALOG_SCHEMA_NAME, + columns: &[ + (DataType::Int64, "id"), + (DataType::Int64, "parent_id"), + (DataType::Jsonb, "member_tables"), + (DataType::Jsonb, "compaction_config"), + (DataType::Jsonb, "active_write_limit"), + ], + pk: &[0], +}; + +impl SysCatalogReaderImpl { + pub async fn read_hummock_compaction_group_configs(&self) -> Result> { + let info = self + .meta_client + .list_hummock_compaction_group_configs() + .await?; + let mut write_limits = self.meta_client.list_hummock_active_write_limits().await?; + let mut rows = info + .into_iter() + .map(|i| { + let active_write_limit = write_limits + .remove(&i.id) + .map(|w| ScalarImpl::Jsonb(json!(w).into())); + OwnedRow::new(vec![ + Some(ScalarImpl::Int64(i.id as _)), + Some(ScalarImpl::Int64(i.parent_id as _)), + Some(ScalarImpl::Jsonb(json!(i.member_table_ids).into())), + Some(ScalarImpl::Jsonb(json!(i.compaction_config).into())), + active_write_limit, + ]) + }) + .collect_vec(); + // As compaction group configs and active write limits are fetched via two RPCs, it's possible there's inconsistency. + // Just leave unknown field blank. + rows.extend(write_limits.into_iter().map(|(cg, w)| { + OwnedRow::new(vec![ + Some(ScalarImpl::Int64(cg as _)), + None, + None, + None, + Some(ScalarImpl::Jsonb(json!(w).into())), + ]) + })); + Ok(rows) + } +} diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_meta_configs.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_meta_configs.rs new file mode 100644 index 0000000000000..e28dc0a926c22 --- /dev/null +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_meta_configs.rs @@ -0,0 +1,50 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use itertools::Itertools; +use risingwave_common::catalog::RW_CATALOG_SCHEMA_NAME; +use risingwave_common::error::Result; +use risingwave_common::row::OwnedRow; +use risingwave_common::types::{DataType, ScalarImpl}; + +use crate::catalog::system_catalog::{BuiltinTable, SysCatalogReaderImpl}; + +pub const RW_HUMMOCK_META_CONFIGS: BuiltinTable = BuiltinTable { + name: "rw_hummock_meta_configs", + schema: RW_CATALOG_SCHEMA_NAME, + columns: &[ + (DataType::Varchar, "config_name"), + (DataType::Varchar, "config_value"), + ], + pk: &[0], +}; + +impl SysCatalogReaderImpl { + pub async fn read_hummock_meta_configs(&self) -> Result> { + let configs = self + .meta_client + .list_hummock_meta_configs() + .await? + .into_iter() + .sorted() + .map(|(k, v)| { + OwnedRow::new(vec![ + Some(ScalarImpl::Utf8(k.into())), + Some(ScalarImpl::Utf8(v.into())), + ]) + }) + .collect_vec(); + Ok(configs) + } +} diff --git a/src/frontend/src/meta_client.rs b/src/frontend/src/meta_client.rs index c45cf12377269..ae90c2e345f9f 100644 --- a/src/frontend/src/meta_client.rs +++ b/src/frontend/src/meta_client.rs @@ -18,8 +18,9 @@ use risingwave_common::system_param::reader::SystemParamsReader; use risingwave_pb::backup_service::MetaSnapshotMetadata; use risingwave_pb::catalog::Table; use risingwave_pb::ddl_service::DdlProgress; +use risingwave_pb::hummock::write_limits::WriteLimit; use risingwave_pb::hummock::{ - BranchedObject, HummockSnapshot, HummockVersion, HummockVersionDelta, + BranchedObject, CompactionGroupInfo, HummockSnapshot, HummockVersion, HummockVersionDelta, }; use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs; use risingwave_pb::meta::list_actor_states_response::ActorState; @@ -86,6 +87,12 @@ pub trait FrontendMetaClient: Send + Sync { async fn list_version_deltas(&self) -> Result>; async fn list_branched_objects(&self) -> Result>; + + async fn list_hummock_compaction_group_configs(&self) -> Result>; + + async fn list_hummock_active_write_limits(&self) -> Result>; + + async fn list_hummock_meta_configs(&self) -> Result>; } pub struct FrontendMetaClientImpl(pub MetaClient); @@ -214,4 +221,16 @@ impl FrontendMetaClient for FrontendMetaClientImpl { async fn list_branched_objects(&self) -> Result> { self.0.list_branched_object().await } + + async fn list_hummock_compaction_group_configs(&self) -> Result> { + self.0.risectl_list_compaction_group().await + } + + async fn list_hummock_active_write_limits(&self) -> Result> { + self.0.list_active_write_limit().await + } + + async fn list_hummock_meta_configs(&self) -> Result> { + self.0.list_hummock_meta_config().await + } } diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index abdfa90c064b2..20eb252fc5053 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -35,8 +35,9 @@ use risingwave_pb::catalog::{ PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable, PbView, Table, }; use risingwave_pb::ddl_service::{create_connection_request, DdlProgress}; +use risingwave_pb::hummock::write_limits::WriteLimit; use risingwave_pb::hummock::{ - BranchedObject, HummockSnapshot, HummockVersion, HummockVersionDelta, + BranchedObject, CompactionGroupInfo, HummockSnapshot, HummockVersion, HummockVersionDelta, }; use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs; use risingwave_pb::meta::list_actor_states_response::ActorState; @@ -850,6 +851,18 @@ impl FrontendMetaClient for MockFrontendMetaClient { async fn list_branched_objects(&self) -> RpcResult> { unimplemented!() } + + async fn list_hummock_compaction_group_configs(&self) -> RpcResult> { + unimplemented!() + } + + async fn list_hummock_active_write_limits(&self) -> RpcResult> { + unimplemented!() + } + + async fn list_hummock_meta_configs(&self) -> RpcResult> { + unimplemented!() + } } #[cfg(test)] diff --git a/src/meta/src/backup_restore/backup_manager.rs b/src/meta/src/backup_restore/backup_manager.rs index 1eea48307cb33..c280572c796d4 100644 --- a/src/meta/src/backup_restore/backup_manager.rs +++ b/src/meta/src/backup_restore/backup_manager.rs @@ -12,11 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; use std::sync::Arc; use std::time::Instant; use arc_swap::ArcSwap; -use itertools::Itertools; use risingwave_backup::error::BackupError; use risingwave_backup::storage::{BoxedMetaSnapshotStorage, ObjectStoreMetaSnapshotStorage}; use risingwave_backup::{MetaBackupJobId, MetaSnapshotId, MetaSnapshotManifest}; @@ -307,7 +307,7 @@ impl BackupManager { } /// List all `SSTables` required by backups. - pub fn list_pinned_ssts(&self) -> Vec { + pub fn list_pinned_ssts(&self) -> HashSet { self.backup_store .load() .0 @@ -315,8 +315,7 @@ impl BackupManager { .snapshot_metadata .iter() .flat_map(|s| s.ssts.clone()) - .dedup() - .collect_vec() + .collect() } pub fn manifest(&self) -> Arc { diff --git a/src/meta/src/hummock/vacuum.rs b/src/meta/src/hummock/vacuum.rs index 31f4651d6fdfd..992deb5e636ce 100644 --- a/src/meta/src/hummock/vacuum.rs +++ b/src/meta/src/hummock/vacuum.rs @@ -164,8 +164,7 @@ impl VacuumManager { &self, objects_to_delete: &mut Vec, ) -> MetaResult<()> { - let reject: HashSet = - self.backup_manager.list_pinned_ssts().into_iter().collect(); + let reject = self.backup_manager.list_pinned_ssts(); // Ack these SSTs immediately, because they tend to be pinned for long time. // They will be GCed during full GC when they are no longer pinned. let to_ack = objects_to_delete diff --git a/src/meta/src/rpc/service/hummock_service.rs b/src/meta/src/rpc/service/hummock_service.rs index a8419da6e2077..161e92ddad113 100644 --- a/src/meta/src/rpc/service/hummock_service.rs +++ b/src/meta/src/rpc/service/hummock_service.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::time::Duration; use futures::StreamExt; @@ -48,6 +48,18 @@ impl HummockServiceImpl { } } +macro_rules! fields_to_kvs { + ($struct:ident, $($field:ident),*) => { + { + let mut kvs = HashMap::default(); + $( + kvs.insert(stringify!($field).to_string(), $struct.$field.to_string()); + )* + kvs + } + } +} + #[async_trait::async_trait] impl HummockManagerService for HummockServiceImpl { type SubscribeCompactionEventStream = RwReceiverStream; @@ -536,4 +548,63 @@ impl HummockManagerService for HummockServiceImpl { branched_objects, })) } + + async fn list_active_write_limit( + &self, + _request: Request, + ) -> Result, Status> { + Ok(Response::new(ListActiveWriteLimitResponse { + write_limits: self.hummock_manager.write_limits().await, + })) + } + + async fn list_hummock_meta_config( + &self, + _request: Request, + ) -> Result, Status> { + let opt = &self.hummock_manager.env.opts; + let configs = fields_to_kvs!( + opt, + vacuum_interval_sec, + vacuum_spin_interval_ms, + hummock_version_checkpoint_interval_sec, + min_delta_log_num_for_hummock_version_checkpoint, + min_sst_retention_time_sec, + full_gc_interval_sec, + collect_gc_watermark_spin_interval_sec, + periodic_compaction_interval_sec, + periodic_space_reclaim_compaction_interval_sec, + periodic_ttl_reclaim_compaction_interval_sec, + periodic_tombstone_reclaim_compaction_interval_sec, + periodic_split_compact_group_interval_sec, + split_group_size_limit, + min_table_split_size, + do_not_config_object_storage_lifecycle, + partition_vnode_count, + table_write_throughput_threshold, + min_table_split_write_throughput, + compaction_task_max_heartbeat_interval_secs + ); + Ok(Response::new(ListHummockMetaConfigResponse { configs })) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + #[test] + fn test_fields_to_kvs() { + struct S { + foo: u64, + bar: String, + } + let s = S { + foo: 15, + bar: "foobar".to_string(), + }; + let kvs: HashMap = fields_to_kvs!(s, foo, bar); + assert_eq!(kvs.len(), 2); + assert_eq!(kvs.get("foo").unwrap(), "15"); + assert_eq!(kvs.get("bar").unwrap(), "foobar"); + } } diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index e4b837ba517f0..8ebbded3dd083 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -53,6 +53,7 @@ use risingwave_pb::ddl_service::*; use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient; use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig; use risingwave_pb::hummock::subscribe_compaction_event_request::Register; +use risingwave_pb::hummock::write_limits::WriteLimit; use risingwave_pb::hummock::*; use risingwave_pb::meta::add_worker_node_request::Property; use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs; @@ -1050,6 +1051,18 @@ impl MetaClient { Ok(resp.branched_objects) } + pub async fn list_active_write_limit(&self) -> Result> { + let req = ListActiveWriteLimitRequest {}; + let resp = self.inner.list_active_write_limit(req).await?; + Ok(resp.write_limits) + } + + pub async fn list_hummock_meta_config(&self) -> Result> { + let req = ListHummockMetaConfigRequest {}; + let resp = self.inner.list_hummock_meta_config(req).await?; + Ok(resp.configs) + } + pub async fn delete_worker_node(&self, worker: HostAddress) -> Result<()> { let _resp = self .inner @@ -1716,6 +1729,8 @@ macro_rules! for_all_meta_rpc { ,{ hummock_client, rise_ctl_list_compaction_status, RiseCtlListCompactionStatusRequest, RiseCtlListCompactionStatusResponse } ,{ hummock_client, subscribe_compaction_event, impl tonic::IntoStreamingRequest, Streaming } ,{ hummock_client, list_branched_object, ListBranchedObjectRequest, ListBranchedObjectResponse } + ,{ hummock_client, list_active_write_limit, ListActiveWriteLimitRequest, ListActiveWriteLimitResponse } + ,{ hummock_client, list_hummock_meta_config, ListHummockMetaConfigRequest, ListHummockMetaConfigResponse } ,{ user_client, create_user, CreateUserRequest, CreateUserResponse } ,{ user_client, update_user, UpdateUserRequest, UpdateUserResponse } ,{ user_client, drop_user, DropUserRequest, DropUserResponse } From bf5b14ecb6904fa09bb449f5e8358cddfd7aab21 Mon Sep 17 00:00:00 2001 From: zwang28 <70626450+zwang28@users.noreply.github.com> Date: Fri, 15 Sep 2023 16:06:17 +0800 Subject: [PATCH 07/11] chore: lift decoding message size limit for ddl client (#12340) --- src/rpc_client/src/meta_client.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 8ebbded3dd083..f3d9f8ad12fd2 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -1288,7 +1288,8 @@ impl GrpcMetaClientCore { let cluster_client = ClusterServiceClient::new(channel.clone()); let meta_member_client = MetaMemberClient::new(channel.clone()); let heartbeat_client = HeartbeatServiceClient::new(channel.clone()); - let ddl_client = DdlServiceClient::new(channel.clone()); + let ddl_client = + DdlServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX); let hummock_client = HummockManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX); let notification_client = From 8a36ca33c116752be189f2da8f1dfbaf9934247f Mon Sep 17 00:00:00 2001 From: Noel Kwan <47273164+kwannoel@users.noreply.github.com> Date: Fri, 15 Sep 2023 16:11:03 +0800 Subject: [PATCH 08/11] feat(meta): Add `creating_status` field for stream jobs (#12330) --- proto/catalog.proto | 12 ++++++++++++ src/connector/src/sink/catalog/mod.rs | 3 ++- src/frontend/src/catalog/index_catalog.rs | 3 ++- src/frontend/src/catalog/table_catalog.rs | 6 ++++-- src/frontend/src/handler/create_index.rs | 3 ++- src/storage/src/filter_key_extractor.rs | 3 ++- src/tests/compaction_test/src/delete_range_runner.rs | 3 ++- 7 files changed, 26 insertions(+), 7 deletions(-) diff --git a/proto/catalog.proto b/proto/catalog.proto index aa09274a0d818..07aff3baee22f 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -32,6 +32,13 @@ enum SchemaRegistryNameStrategy { TOPIC_RECORD_NAME_STRATEGY = 2; } +enum StreamJobStatus { + // Prefixed by `STREAM_JOB_STATUS` due to protobuf namespacing rules. + STREAM_JOB_STATUS_UNSPECIFIED = 0; + CREATING = 1; + CREATED = 2; +} + message StreamSourceInfo { // deprecated plan_common.RowFormatType row_format = 1; @@ -116,6 +123,7 @@ message Sink { optional uint64 created_at_epoch = 16; string db_name = 17; string sink_from_name = 18; + StreamJobStatus stream_job_status = 19; } message Connection { @@ -157,6 +165,7 @@ message Index { optional uint64 initialized_at_epoch = 10; optional uint64 created_at_epoch = 11; + StreamJobStatus stream_job_status = 12; } message Function { @@ -250,6 +259,9 @@ message Table { // In older versions we can just initialize without it. bool cleaned_by_watermark = 30; + // Used to filter created / creating tables in meta. + StreamJobStatus stream_job_status = 31; + // Per-table catalog version, used by schema change. `None` for internal tables and tests. // Not to be confused with the global catalog version for notification service. TableVersion version = 100; diff --git a/src/connector/src/sink/catalog/mod.rs b/src/connector/src/sink/catalog/mod.rs index 16b53af17499a..fd307b05e115e 100644 --- a/src/connector/src/sink/catalog/mod.rs +++ b/src/connector/src/sink/catalog/mod.rs @@ -22,7 +22,7 @@ use risingwave_common::catalog::{ }; use risingwave_common::util::epoch::Epoch; use risingwave_common::util::sort_util::ColumnOrder; -use risingwave_pb::catalog::{PbSink, PbSinkType}; +use risingwave_pb::catalog::{PbSink, PbSinkType, PbStreamJobStatus}; #[derive(Clone, Copy, Debug, Default, Hash, PartialOrd, PartialEq, Eq)] pub struct SinkId { @@ -191,6 +191,7 @@ impl SinkCatalog { created_at_epoch: self.created_at_epoch.map(|e| e.0), db_name: self.db_name.clone(), sink_from_name: self.sink_from_name.clone(), + stream_job_status: PbStreamJobStatus::Creating.into(), } } diff --git a/src/frontend/src/catalog/index_catalog.rs b/src/frontend/src/catalog/index_catalog.rs index caf0557b2fd09..ca4b4036332d3 100644 --- a/src/frontend/src/catalog/index_catalog.rs +++ b/src/frontend/src/catalog/index_catalog.rs @@ -21,7 +21,7 @@ use itertools::Itertools; use risingwave_common::catalog::IndexId; use risingwave_common::util::epoch::Epoch; use risingwave_common::util::sort_util::ColumnOrder; -use risingwave_pb::catalog::PbIndex; +use risingwave_pb::catalog::{PbIndex, PbStreamJobStatus}; use super::ColumnId; use crate::catalog::{DatabaseId, OwnedByUserCatalog, SchemaId, TableCatalog}; @@ -184,6 +184,7 @@ impl IndexCatalog { original_columns: self.original_columns.iter().map(Into::into).collect_vec(), initialized_at_epoch: self.initialized_at_epoch.map(|e| e.0), created_at_epoch: self.created_at_epoch.map(|e| e.0), + stream_job_status: PbStreamJobStatus::Creating.into(), } } diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index 6c83df13e80be..2b8ef546c9be9 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -24,7 +24,7 @@ use risingwave_common::error::{ErrorCode, RwError}; use risingwave_common::util::epoch::Epoch; use risingwave_common::util::sort_util::ColumnOrder; use risingwave_pb::catalog::table::{OptionalAssociatedSourceId, PbTableType, PbTableVersion}; -use risingwave_pb::catalog::PbTable; +use risingwave_pb::catalog::{PbStreamJobStatus, PbTable}; use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn; use risingwave_pb::plan_common::DefaultColumnDesc; @@ -401,6 +401,7 @@ impl TableCatalog { initialized_at_epoch: self.initialized_at_epoch.map(|epoch| epoch.0), created_at_epoch: self.created_at_epoch.map(|epoch| epoch.0), cleaned_by_watermark: self.cleaned_by_watermark, + stream_job_status: PbStreamJobStatus::Creating.into(), } } @@ -542,7 +543,7 @@ mod tests { use risingwave_common::test_prelude::*; use risingwave_common::types::*; use risingwave_common::util::sort_util::OrderType; - use risingwave_pb::catalog::PbTable; + use risingwave_pb::catalog::{PbStreamJobStatus, PbTable}; use risingwave_pb::plan_common::{PbColumnCatalog, PbColumnDesc}; use super::*; @@ -605,6 +606,7 @@ mod tests { cardinality: None, created_at_epoch: None, cleaned_by_watermark: false, + stream_job_status: PbStreamJobStatus::Creating.into(), } .into(); diff --git a/src/frontend/src/handler/create_index.rs b/src/frontend/src/handler/create_index.rs index ad4512aca354a..a5a002d3b3d79 100644 --- a/src/frontend/src/handler/create_index.rs +++ b/src/frontend/src/handler/create_index.rs @@ -21,7 +21,7 @@ use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::catalog::{IndexId, TableDesc, TableId}; use risingwave_common::error::{ErrorCode, Result}; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; -use risingwave_pb::catalog::{PbIndex, PbTable}; +use risingwave_pb::catalog::{PbIndex, PbStreamJobStatus, PbTable}; use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism; use risingwave_pb::user::grant_privilege::{Action, Object}; use risingwave_sqlparser::ast; @@ -242,6 +242,7 @@ pub(crate) fn gen_create_index_plan( original_columns, initialized_at_epoch: None, created_at_epoch: None, + stream_job_status: PbStreamJobStatus::Creating.into(), }; let plan: PlanRef = materialize.into(); diff --git a/src/storage/src/filter_key_extractor.rs b/src/storage/src/filter_key_extractor.rs index 47ce552300e22..d861c8256d961 100644 --- a/src/storage/src/filter_key_extractor.rs +++ b/src/storage/src/filter_key_extractor.rs @@ -448,7 +448,7 @@ mod tests { use risingwave_common::util::sort_util::OrderType; use risingwave_hummock_sdk::key::TABLE_PREFIX_LEN; use risingwave_pb::catalog::table::TableType; - use risingwave_pb::catalog::PbTable; + use risingwave_pb::catalog::{PbStreamJobStatus, PbTable}; use risingwave_pb::common::{PbColumnOrder, PbDirection, PbNullsAre, PbOrderType}; use risingwave_pb::plan_common::PbColumnCatalog; @@ -549,6 +549,7 @@ mod tests { cardinality: None, created_at_epoch: None, cleaned_by_watermark: false, + stream_job_status: PbStreamJobStatus::Created.into(), } } diff --git a/src/tests/compaction_test/src/delete_range_runner.rs b/src/tests/compaction_test/src/delete_range_runner.rs index 683cff8fac45d..258fb62d3b740 100644 --- a/src/tests/compaction_test/src/delete_range_runner.rs +++ b/src/tests/compaction_test/src/delete_range_runner.rs @@ -36,7 +36,7 @@ use risingwave_meta::hummock::test_utils::setup_compute_env_with_config; use risingwave_meta::hummock::MockHummockMetaClient; use risingwave_object_store::object::object_metrics::ObjectStoreMetrics; use risingwave_object_store::object::parse_remote_object_store; -use risingwave_pb::catalog::PbTable; +use risingwave_pb::catalog::{PbStreamJobStatus, PbTable}; use risingwave_pb::hummock::{CompactionConfig, CompactionGroupInfo}; use risingwave_pb::meta::SystemParams; use risingwave_rpc_client::HummockMetaClient; @@ -150,6 +150,7 @@ async fn compaction_test( cardinality: None, created_at_epoch: None, cleaned_by_watermark: false, + stream_job_status: PbStreamJobStatus::Created.into(), }; let mut delete_range_table = delete_key_table.clone(); delete_range_table.id = 2; From c443197f2b0c32363a494bc9320073ccc9969c4c Mon Sep 17 00:00:00 2001 From: Dylan Date: Fri, 15 Sep 2023 16:22:13 +0800 Subject: [PATCH 09/11] feat(optimizer): support correlated column in order by (#12341) --- .../tests/testdata/input/subquery_expr.yaml | 13 +++++++ .../tests/testdata/output/subquery_expr.yaml | 38 +++++++++++++++++++ src/frontend/src/binder/query.rs | 14 +++++-- 3 files changed, 62 insertions(+), 3 deletions(-) diff --git a/src/frontend/planner_test/tests/testdata/input/subquery_expr.yaml b/src/frontend/planner_test/tests/testdata/input/subquery_expr.yaml index a51035b4750c0..255a87f84099d 100644 --- a/src/frontend/planner_test/tests/testdata/input/subquery_expr.yaml +++ b/src/frontend/planner_test/tests/testdata/input/subquery_expr.yaml @@ -116,3 +116,16 @@ select x from t1 where y not in (select y from t2); expected_outputs: - logical_plan +- sql: | + create table t1 (a int); + create table t2 (b int); + SELECT * + FROM t1 + WHERE EXISTS + (SELECT 1 + FROM t2 + GROUP BY a + ORDER BY a DESC LIMIT 90); + expected_outputs: + - logical_plan + - batch_plan diff --git a/src/frontend/planner_test/tests/testdata/output/subquery_expr.yaml b/src/frontend/planner_test/tests/testdata/output/subquery_expr.yaml index bc1da0f48dfb2..1383c156a18f5 100644 --- a/src/frontend/planner_test/tests/testdata/output/subquery_expr.yaml +++ b/src/frontend/planner_test/tests/testdata/output/subquery_expr.yaml @@ -234,3 +234,41 @@ ├─LogicalScan { table: t1, columns: [t1.x, t1.y, t1._row_id] } └─LogicalProject { exprs: [t2.y] } └─LogicalScan { table: t2, columns: [t2.x, t2.y, t2._row_id] } +- sql: | + create table t1 (a int); + create table t2 (b int); + SELECT * + FROM t1 + WHERE EXISTS + (SELECT 1 + FROM t2 + GROUP BY a + ORDER BY a DESC LIMIT 90); + logical_plan: |- + LogicalProject { exprs: [t1.a] } + └─LogicalApply { type: LeftSemi, on: true, correlated_id: 1 } + ├─LogicalScan { table: t1, columns: [t1.a, t1._row_id] } + └─LogicalProject { exprs: [1:Int32] } + └─LogicalTopN { order: [$expr2 DESC], limit: 90, offset: 0 } + └─LogicalProject { exprs: [1:Int32, CorrelatedInputRef { index: 0, correlated_id: 1 } as $expr2] } + └─LogicalAgg { group_key: [$expr1], aggs: [] } + └─LogicalProject { exprs: [CorrelatedInputRef { index: 0, correlated_id: 1 } as $expr1] } + └─LogicalScan { table: t2, columns: [t2.b, t2._row_id] } + batch_plan: |- + BatchExchange { order: [], dist: Single } + └─BatchHashJoin { type: LeftSemi, predicate: t1.a IS NOT DISTINCT FROM t1.a, output: all } + ├─BatchExchange { order: [], dist: HashShard(t1.a) } + │ └─BatchScan { table: t1, columns: [t1.a], distribution: SomeShard } + └─BatchProject { exprs: [t1.a] } + └─BatchGroupTopN { order: [t1.a DESC], limit: 90, offset: 0, group_key: [t1.a] } + └─BatchExchange { order: [], dist: HashShard(t1.a) } + └─BatchProject { exprs: [t1.a, t1.a] } + └─BatchHashAgg { group_key: [t1.a], aggs: [] } + └─BatchExchange { order: [], dist: HashShard(t1.a) } + └─BatchNestedLoopJoin { type: Inner, predicate: true, output: all } + ├─BatchExchange { order: [], dist: Single } + │ └─BatchHashAgg { group_key: [t1.a], aggs: [] } + │ └─BatchExchange { order: [], dist: HashShard(t1.a) } + │ └─BatchScan { table: t1, columns: [t1.a], distribution: SomeShard } + └─BatchExchange { order: [], dist: Single } + └─BatchScan { table: t2, columns: [], distribution: SomeShard } diff --git a/src/frontend/src/binder/query.rs b/src/frontend/src/binder/query.rs index 5f85031c9833b..bb1c30d22b3d2 100644 --- a/src/frontend/src/binder/query.rs +++ b/src/frontend/src/binder/query.rs @@ -92,9 +92,17 @@ impl BoundQuery { depth: Depth, correlated_id: CorrelatedId, ) -> Vec { - // TODO: collect `correlated_input_ref` in `extra_order_exprs`. - self.body - .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id) + let mut correlated_indices = vec![]; + + correlated_indices.extend( + self.body + .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id), + ); + + correlated_indices.extend(self.extra_order_exprs.iter_mut().flat_map(|expr| { + expr.collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id) + })); + correlated_indices } /// Simple `VALUES` without other clauses. From 467ba4b00c2b30d2228c9ee1638fbe80fb53fa12 Mon Sep 17 00:00:00 2001 From: stonepage <40830455+st1page@users.noreply.github.com> Date: Fri, 15 Sep 2023 16:28:13 +0800 Subject: [PATCH 10/11] fix: stream backfill executor use correct schema (#12314) Co-authored-by: Noel Kwan <47273164+kwannoel@users.noreply.github.com> --- e2e_test/streaming/bug_fixes/issue_12299.slt | 31 ++++++++++++++++++++ src/stream/src/from_proto/chain.rs | 25 +++++++++------- 2 files changed, 45 insertions(+), 11 deletions(-) create mode 100644 e2e_test/streaming/bug_fixes/issue_12299.slt diff --git a/e2e_test/streaming/bug_fixes/issue_12299.slt b/e2e_test/streaming/bug_fixes/issue_12299.slt new file mode 100644 index 0000000000000..7be47038f15cf --- /dev/null +++ b/e2e_test/streaming/bug_fixes/issue_12299.slt @@ -0,0 +1,31 @@ +# https://github.com/risingwavelabs/risingwave/issues/12299 +# TL;DR When upstream's stream key is not pk and the stream scan does not contain whole pk. + +statement ok +create table t1( + id bigint primary key, + i bigint +); + +statement ok +create materialized view mv1 as select id, i from t1 order by id, i; + +statement ok +insert into t1 values(1, 1); + +statement ok +create materialized view mv2 as select id from mv1; + +query I +select * from mv2; +---- +1 + +statement ok +drop materialized view mv2; + +statement ok +drop materialized view mv1; + +statement ok +drop table t1; \ No newline at end of file diff --git a/src/stream/src/from_proto/chain.rs b/src/stream/src/from_proto/chain.rs index d1a971a5cbb4a..667772fcfdd60 100644 --- a/src/stream/src/from_proto/chain.rs +++ b/src/stream/src/from_proto/chain.rs @@ -14,7 +14,7 @@ use std::sync::Arc; -use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId, TableOption}; +use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, TableId, TableOption}; use risingwave_common::util::sort_util::OrderType; use risingwave_pb::plan_common::StorageTableDesc; use risingwave_pb::stream_plan::{ChainNode, ChainType}; @@ -40,28 +40,31 @@ impl ExecutorBuilder for ChainExecutorBuilder { stream: &mut LocalStreamManagerCore, ) -> StreamResult { let [mview, snapshot]: [_; 2] = params.input.try_into().unwrap(); - // For reporting the progress. let progress = stream .context .register_create_mview_progress(params.actor_context.id); - // The batch query executor scans on a mapped adhoc mview table, thus we should directly use - // its schema. - let schema = snapshot.schema().clone(); - let output_indices = node .output_indices .iter() .map(|&i| i as usize) .collect_vec(); - // For `Chain`s other than `Backfill`, there should be no extra mapping required. We can - // directly output the columns received from the upstream or snapshot. - if !matches!(node.chain_type(), ChainType::Backfill) { - let all_indices = (0..schema.len()).collect_vec(); + let schema = if matches!(node.chain_type(), ChainType::Backfill) { + Schema::new( + output_indices + .iter() + .map(|i| snapshot.schema().fields()[*i].clone()) + .collect_vec(), + ) + } else { + // For `Chain`s other than `Backfill`, there should be no extra mapping required. We can + // directly output the columns received from the upstream or snapshot. + let all_indices = (0..snapshot.schema().len()).collect_vec(); assert_eq!(output_indices, all_indices); - } + snapshot.schema().clone() + }; let executor = match node.chain_type() { ChainType::Chain | ChainType::UpstreamOnly => { From 00321457dafc2c07590d7213ee076f6bf07ff073 Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Fri, 15 Sep 2023 16:57:25 +0800 Subject: [PATCH 11/11] refactor(expr): support variadic function in `#[function]` macro (#12178) Signed-off-by: Runji Wang --- e2e_test/batch/functions/format.slt.part | 23 ++ src/common/src/array/data_chunk.rs | 1 + src/common/src/lib.rs | 1 - src/expr/macro/src/gen.rs | 110 +++++--- src/expr/macro/src/lib.rs | 27 +- src/expr/macro/src/types.rs | 7 +- src/expr/src/error.rs | 3 + src/expr/src/expr/build.rs | 5 - src/expr/src/expr/expr_concat_ws.rs | 250 ------------------ src/expr/src/expr/expr_nested_construct.rs | 153 ----------- src/expr/src/expr/mod.rs | 2 - src/expr/src/sig/func.rs | 22 +- src/expr/src/vector_op/array.rs | 28 ++ src/expr/src/vector_op/concat_ws.rs | 70 +++++ .../src => expr/src/vector_op}/format.rs | 102 ++++++- src/expr/src/vector_op/mod.rs | 3 + .../tests/testdata/input/format.yaml | 8 +- .../tests/testdata/output/format.yaml | 30 +-- src/frontend/src/binder/expr/function.rs | 72 +---- src/frontend/src/expr/type_inference/func.rs | 16 ++ 20 files changed, 363 insertions(+), 570 deletions(-) delete mode 100644 src/expr/src/expr/expr_concat_ws.rs delete mode 100644 src/expr/src/expr/expr_nested_construct.rs create mode 100644 src/expr/src/vector_op/array.rs create mode 100644 src/expr/src/vector_op/concat_ws.rs rename src/{common/src => expr/src/vector_op}/format.rs (50%) diff --git a/e2e_test/batch/functions/format.slt.part b/e2e_test/batch/functions/format.slt.part index 92b4fc1553a65..ab6090737e304 100644 --- a/e2e_test/batch/functions/format.slt.part +++ b/e2e_test/batch/functions/format.slt.part @@ -7,3 +7,26 @@ query T SELECT format('Testing %s, %s, %s, %%', 'one', 'two', 'three'); ---- Testing one, two, three, % + +query T +SELECT format('%s %s', a, b) from (values + ('Hello', 'World'), + ('Rising', 'Wave') +) as t(a, b); +---- +Hello World +Rising Wave + +query T +SELECT format(f, a, b) from (values + ('%s %s', 'Hello', 'World'), + ('%s%s', 'Hello', null), + (null, 'Hello', 'World') +) as t(f, a, b); +---- +Hello World +Hello +NULL + +query error too few arguments for format() +SELECT format('%s %s', 'Hello'); diff --git a/src/common/src/array/data_chunk.rs b/src/common/src/array/data_chunk.rs index f335b56a60edb..cc4bef12cccff 100644 --- a/src/common/src/array/data_chunk.rs +++ b/src/common/src/array/data_chunk.rs @@ -779,6 +779,7 @@ impl DataChunkTestExt for DataChunk { "." => None, "t" => Some(true.into()), "f" => Some(false.into()), + "(empty)" => Some("".into()), _ => Some(ScalarImpl::from_text(val_str.as_bytes(), ty).unwrap()), }; builder.append(datum); diff --git a/src/common/src/lib.rs b/src/common/src/lib.rs index 5f7e9fd476da5..da58e53b8c52d 100644 --- a/src/common/src/lib.rs +++ b/src/common/src/lib.rs @@ -72,7 +72,6 @@ pub mod system_param; pub mod telemetry; pub mod transaction; -pub mod format; pub mod metrics; pub mod test_utils; pub mod types; diff --git a/src/expr/macro/src/gen.rs b/src/expr/macro/src/gen.rs index 9e8b4d9d3c1b2..56464f2bf2809 100644 --- a/src/expr/macro/src/gen.rs +++ b/src/expr/macro/src/gen.rs @@ -59,10 +59,14 @@ impl FunctionAttr { return self.generate_table_function_descriptor(user_fn, build_fn); } let name = self.name.clone(); - let mut args = Vec::with_capacity(self.args.len()); - for ty in &self.args { - args.push(data_type_name(ty)); + let variadic = matches!(self.args.last(), Some(t) if t == "..."); + let args = match variadic { + true => &self.args[..self.args.len() - 1], + false => &self.args[..], } + .iter() + .map(|ty| data_type_name(ty)) + .collect_vec(); let ret = data_type_name(&self.ret); let pb_type = format_ident!("{}", utils::to_camel_case(&name)); @@ -82,6 +86,7 @@ impl FunctionAttr { unsafe { crate::sig::func::_register(#descriptor_type { func: risingwave_pb::expr::expr_node::Type::#pb_type, inputs_type: &[#(#args),*], + variadic: #variadic, ret_type: #ret, build: #build_fn, deprecated: #deprecated, @@ -99,7 +104,8 @@ impl FunctionAttr { user_fn: &UserFunctionAttr, optimize_const: bool, ) -> Result { - let num_args = self.args.len(); + let variadic = matches!(self.args.last(), Some(t) if t == "..."); + let num_args = self.args.len() - if variadic { 1 } else { 0 }; let fn_name = format_ident!("{}", user_fn.name); let struct_name = match optimize_const { true => format_ident!("{}OptimizeConst", utils::to_camel_case(&self.ident_name())), @@ -148,8 +154,6 @@ impl FunctionAttr { let inputs = idents("i", &children_indices); let prebuilt_inputs = idents("i", &prebuilt_indices); let non_prebuilt_inputs = idents("i", &non_prebuilt_indices); - let all_child = idents("child", &(0..num_args).collect_vec()); - let child = idents("child", &children_indices); let array_refs = idents("array", &children_indices); let arrays = idents("a", &children_indices); let datums = idents("v", &children_indices); @@ -210,15 +214,41 @@ impl FunctionAttr { } else { quote! { () } }; - let generic = if self.ret == "boolean" && user_fn.generic == 3 { + + // ensure the number of children matches the number of arguments + let check_children = match variadic { + true => quote! { crate::ensure!(children.len() >= #num_args); }, + false => quote! { crate::ensure!(children.len() == #num_args); }, + }; + + // evaluate variadic arguments in `eval` + let eval_variadic = variadic.then(|| { + quote! { + let mut columns = Vec::with_capacity(self.children.len() - #num_args); + for child in &self.children[#num_args..] { + columns.push(child.eval_checked(input).await?); + } + let variadic_input = DataChunk::new(columns, input.vis().clone()); + } + }); + // evaluate variadic arguments in `eval_row` + let eval_row_variadic = variadic.then(|| { + quote! { + let mut row = Vec::with_capacity(self.children.len() - #num_args); + for child in &self.children[#num_args..] { + row.push(child.eval_row(input).await?); + } + let variadic_row = OwnedRow::new(row); + } + }); + + let generic = (self.ret == "boolean" && user_fn.generic == 3).then(|| { // XXX: for generic compare functions, we need to specify the compatible type let compatible_type = types::ref_type(types::min_compatible_type(&self.args)) .parse::() .unwrap(); quote! { ::<_, _, #compatible_type> } - } else { - quote! {} - }; + }); let prebuilt_arg = match (&self.prebuild, optimize_const) { // use the prebuilt argument (Some(_), true) => quote! { &self.prebuilt_arg, }, @@ -227,18 +257,21 @@ impl FunctionAttr { // no prebuilt argument (None, _) => quote! {}, }; - let context = match user_fn.context { - true => quote! { &self.context, }, - false => quote! {}, - }; - let writer = match user_fn.write { - true => quote! { &mut writer, }, - false => quote! {}, - }; + let variadic_args = variadic.then(|| quote! { variadic_row, }); + let context = user_fn.context.then(|| quote! { &self.context, }); + let writer = user_fn.write.then(|| quote! { &mut writer, }); let await_ = user_fn.async_.then(|| quote! { .await }); // call the user defined function // inputs: [ Option ] - let mut output = quote! { #fn_name #generic(#(#non_prebuilt_inputs,)* #prebuilt_arg #context #writer) #await_ }; + let mut output = quote! { #fn_name #generic( + #(#non_prebuilt_inputs,)* + #prebuilt_arg + #variadic_args + #context + #writer + ) #await_ }; + // handle error if the function returns `Result` + // wrap a `Some` if the function doesn't return `Option` output = match user_fn.return_type_kind { // XXX: we don't support void type yet. return null::int for now. _ if self.ret == "void" => quote! { { #output; Option::::None } }, @@ -257,7 +290,7 @@ impl FunctionAttr { } }; }; - // output: Option + // now the `output` is: Option let append_output = match user_fn.write { true => quote! {{ let mut writer = builder.writer().begin(); @@ -292,13 +325,20 @@ impl FunctionAttr { }; // the main body in `eval` let eval = if let Some(batch_fn) = &self.batch_fn { + assert!( + !variadic, + "customized batch function is not supported for variadic functions" + ); // user defined batch function let fn_name = format_ident!("{}", batch_fn); quote! { let c = #fn_name(#(#arrays),*); Ok(Arc::new(c.into())) } - } else if (types::is_primitive(&self.ret) || self.ret == "boolean") && user_fn.is_pure() { + } else if (types::is_primitive(&self.ret) || self.ret == "boolean") + && user_fn.is_pure() + && !variadic + { // SIMD optimization for primitive types match self.args.len() { 0 => quote! { @@ -330,10 +370,15 @@ impl FunctionAttr { } } else { // no optimization - let array_zip = match num_args { + let array_zip = match children_indices.len() { 0 => quote! { std::iter::repeat(()).take(input.capacity()) }, _ => quote! { multizip((#(#arrays.iter(),)*)) }, }; + let let_variadic = variadic.then(|| { + quote! { + let variadic_row = variadic_input.row_at_unchecked_vis(i); + } + }); quote! { let mut builder = #builder_type::with_type(input.capacity(), self.context.return_type.clone()); @@ -341,16 +386,18 @@ impl FunctionAttr { Vis::Bitmap(vis) => { // allow using `zip` for performance #[allow(clippy::disallowed_methods)] - for ((#(#inputs,)*), visible) in #array_zip.zip(vis.iter()) { + for (i, ((#(#inputs,)*), visible)) in #array_zip.zip(vis.iter()).enumerate() { if !visible { builder.append_null(); continue; } + #let_variadic #append_output } } Vis::Compact(_) => { - for (#(#inputs,)*) in #array_zip { + for (i, (#(#inputs,)*)) in #array_zip.enumerate() { + #let_variadic #append_output } } @@ -374,19 +421,17 @@ impl FunctionAttr { use crate::expr::{Context, BoxedExpression}; use crate::Result; - crate::ensure!(children.len() == #num_args); + #check_children let prebuilt_arg = #prebuild_const; let context = Context { return_type, arg_types: children.iter().map(|c| c.return_type()).collect(), }; - let mut iter = children.into_iter(); - #(let #all_child = iter.next().unwrap();)* #[derive(Debug)] struct #struct_name { context: Context, - #(#child: BoxedExpression,)* + children: Vec, prebuilt_arg: #prebuilt_arg_type, } #[async_trait::async_trait] @@ -395,25 +440,26 @@ impl FunctionAttr { self.context.return_type.clone() } async fn eval(&self, input: &DataChunk) -> Result { - // evaluate children and downcast arrays #( - let #array_refs = self.#child.eval_checked(input).await?; + let #array_refs = self.children[#children_indices].eval_checked(input).await?; let #arrays: &#arg_arrays = #array_refs.as_ref().into(); )* + #eval_variadic #eval } async fn eval_row(&self, input: &OwnedRow) -> Result { #( - let #datums = self.#child.eval_row(input).await?; + let #datums = self.children[#children_indices].eval_row(input).await?; let #inputs: Option<#arg_types> = #datums.as_ref().map(|s| s.as_scalar_ref_impl().try_into().unwrap()); )* + #eval_row_variadic Ok(#row_output) } } Ok(Box::new(#struct_name { context, - #(#child,)* + children, prebuilt_arg, })) } diff --git a/src/expr/macro/src/lib.rs b/src/expr/macro/src/lib.rs index 4d8c48ca9ccac..c6ebffdff660f 100644 --- a/src/expr/macro/src/lib.rs +++ b/src/expr/macro/src/lib.rs @@ -38,6 +38,7 @@ mod utils; /// - [Rust Function Signature](#rust-function-signature) /// - [Nullable Arguments](#nullable-arguments) /// - [Return Value](#return-value) +/// - [Variadic Function](#variadic-function) /// - [Optimization](#optimization) /// - [Functions Returning Strings](#functions-returning-strings) /// - [Preprocessing Constant Arguments](#preprocessing-constant-arguments) @@ -62,13 +63,15 @@ mod utils; /// invocation. The signature follows this pattern: /// /// ```text -/// name ( [arg_types],* ) [ -> [setof] return_type ] +/// name ( [arg_types],* [...] ) [ -> [setof] return_type ] /// ``` /// -/// Where `name` is the function name, which must match the function name defined in `prost`. +/// Where `name` is the function name in `snake_case`, which must match the function name defined +/// in `prost`. /// -/// The allowed data types are listed in the `name` column of the appendix's [type matrix]. -/// Wildcards or `auto` can also be used, as explained below. +/// `arg_types` is a comma-separated list of argument types. The allowed data types are listed in +/// in the `name` column of the appendix's [type matrix]. Wildcards or `auto` can also be used, as +/// explained below. If the function is variadic, the last argument can be denoted as `...`. /// /// When `setof` appears before the return type, this indicates that the function is a set-returning /// function (table function), meaning it can return multiple values instead of just one. For more @@ -203,6 +206,21 @@ mod utils; /// /// Therefore, try to avoid returning `Option` and `Result` whenever possible. /// +/// ## Variadic Function +/// +/// Variadic functions accept a `impl Row` input to represent tailing arguments. +/// For example: +/// +/// ```ignore +/// #[function("concat_ws(varchar, ...) -> varchar")] +/// fn concat_ws(sep: &str, vals: impl Row) -> Option> { +/// let mut string_iter = vals.iter().flatten(); +/// // ... +/// } +/// ``` +/// +/// See `risingwave_common::row::Row` for more details. +/// /// ## Functions Returning Strings /// /// For functions that return varchar types, you can also use the writer style function signature to @@ -569,6 +587,7 @@ impl FunctionAttr { fn ident_name(&self) -> String { format!("{}_{}_{}", self.name, self.args.join("_"), self.ret) .replace("[]", "list") + .replace("...", "variadic") .replace(['<', '>', ' ', ','], "_") .replace("__", "_") } diff --git a/src/expr/macro/src/types.rs b/src/expr/macro/src/types.rs index 53f224b79a773..f0868697757f7 100644 --- a/src/expr/macro/src/types.rs +++ b/src/expr/macro/src/types.rs @@ -40,9 +40,10 @@ const TYPE_MATRIX: &str = " /// Maps a data type to its corresponding data type name. pub fn data_type(ty: &str) -> &str { // XXX: - // For functions that contain `any` type, there are special handlings in the frontend, - // and the signature won't be accessed. So we simply return a placeholder here. - if ty == "any" { + // For functions that contain `any` type, or `...` variable arguments, + // there are special handlings in the frontend, and the signature won't be accessed. + // So we simply return a placeholder here. + if ty == "any" || ty == "..." { return "Int32"; } lookup_matrix(ty, 1) diff --git a/src/expr/src/error.rs b/src/expr/src/error.rs index d1ae1eb35ad0c..1128c05e76a77 100644 --- a/src/expr/src/error.rs +++ b/src/expr/src/error.rs @@ -77,6 +77,9 @@ pub enum ExprError { #[error("field name must not be null")] FieldNameNull, + #[error("too few arguments for format()")] + TooFewArguments, + #[error("invalid state: {0}")] InvalidState(String), } diff --git a/src/expr/src/expr/build.rs b/src/expr/src/expr/build.rs index 1f34adead8855..b2b6db6eafc3c 100644 --- a/src/expr/src/expr/build.rs +++ b/src/expr/src/expr/build.rs @@ -22,10 +22,8 @@ use risingwave_pb::expr::ExprNode; use super::expr_array_transform::ArrayTransformExpression; use super::expr_case::CaseExpression; use super::expr_coalesce::CoalesceExpression; -use super::expr_concat_ws::ConcatWsExpression; use super::expr_field::FieldExpression; use super::expr_in::InExpression; -use super::expr_nested_construct::NestedConstructExpression; use super::expr_some_all::SomeAllExpression; use super::expr_udf::UdfExpression; use super::expr_vnode::VnodeExpression; @@ -56,10 +54,7 @@ pub fn build_from_prost(prost: &ExprNode) -> Result { E::In => InExpression::try_from_boxed(prost), E::Case => CaseExpression::try_from_boxed(prost), E::Coalesce => CoalesceExpression::try_from_boxed(prost), - E::ConcatWs => ConcatWsExpression::try_from_boxed(prost), E::Field => FieldExpression::try_from_boxed(prost), - E::Array => NestedConstructExpression::try_from_boxed(prost), - E::Row => NestedConstructExpression::try_from_boxed(prost), E::Vnode => VnodeExpression::try_from_boxed(prost), _ => { diff --git a/src/expr/src/expr/expr_concat_ws.rs b/src/expr/src/expr/expr_concat_ws.rs deleted file mode 100644 index 5bca7d0aea75c..0000000000000 --- a/src/expr/src/expr/expr_concat_ws.rs +++ /dev/null @@ -1,250 +0,0 @@ -// Copyright 2023 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::convert::TryFrom; -use std::fmt::Write; -use std::sync::Arc; - -use risingwave_common::array::{ - Array, ArrayBuilder, ArrayImpl, ArrayRef, DataChunk, Utf8ArrayBuilder, -}; -use risingwave_common::row::OwnedRow; -use risingwave_common::types::{DataType, Datum}; -use risingwave_pb::expr::expr_node::{RexNode, Type}; -use risingwave_pb::expr::ExprNode; - -use crate::expr::{build_from_prost as expr_build_from_prost, BoxedExpression, Expression}; -use crate::{bail, ensure, ExprError, Result}; - -#[derive(Debug)] -pub struct ConcatWsExpression { - return_type: DataType, - sep_expr: BoxedExpression, - string_exprs: Vec, -} - -#[async_trait::async_trait] -impl Expression for ConcatWsExpression { - fn return_type(&self) -> DataType { - self.return_type.clone() - } - - async fn eval(&self, input: &DataChunk) -> Result { - let sep_column = self.sep_expr.eval_checked(input).await?; - let sep_column = sep_column.as_utf8(); - - let mut string_columns = Vec::with_capacity(self.string_exprs.len()); - for expr in &self.string_exprs { - string_columns.push(expr.eval_checked(input).await?); - } - let string_columns_ref = string_columns - .iter() - .map(|c| c.as_utf8()) - .collect::>(); - - let row_len = input.capacity(); - let vis = input.vis(); - let mut builder = Utf8ArrayBuilder::new(row_len); - - for row_idx in 0..row_len { - if !vis.is_set(row_idx) { - builder.append(None); - continue; - } - let sep = match sep_column.value_at(row_idx) { - Some(sep) => sep, - None => { - builder.append(None); - continue; - } - }; - - let mut writer = builder.writer().begin(); - - let mut string_columns = string_columns_ref.iter(); - for string_column in string_columns.by_ref() { - if let Some(string) = string_column.value_at(row_idx) { - writer.write_str(string).unwrap(); - break; - } - } - - for string_column in string_columns { - if let Some(string) = string_column.value_at(row_idx) { - writer.write_str(sep).unwrap(); - writer.write_str(string).unwrap(); - } - } - - writer.finish(); - } - Ok(Arc::new(ArrayImpl::from(builder.finish()))) - } - - async fn eval_row(&self, input: &OwnedRow) -> Result { - let sep = self.sep_expr.eval_row(input).await?; - let sep = match sep { - Some(sep) => sep, - None => return Ok(None), - }; - - let mut strings = Vec::with_capacity(self.string_exprs.len()); - for expr in &self.string_exprs { - strings.push(expr.eval_row(input).await?); - } - let mut final_string = String::new(); - - let mut strings_iter = strings.iter(); - if let Some(string) = strings_iter.by_ref().flatten().next() { - final_string.push_str(string.as_utf8()) - } - - for string in strings_iter.flatten() { - final_string.push_str(sep.as_utf8()); - final_string.push_str(string.as_utf8()); - } - - Ok(Some(final_string.into())) - } -} - -impl ConcatWsExpression { - pub fn new( - return_type: DataType, - sep_expr: BoxedExpression, - string_exprs: Vec, - ) -> Self { - ConcatWsExpression { - return_type, - sep_expr, - string_exprs, - } - } -} - -impl<'a> TryFrom<&'a ExprNode> for ConcatWsExpression { - type Error = ExprError; - - fn try_from(prost: &'a ExprNode) -> Result { - ensure!(prost.get_function_type().unwrap() == Type::ConcatWs); - - let ret_type = DataType::from(prost.get_return_type().unwrap()); - let RexNode::FuncCall(func_call_node) = prost.get_rex_node().unwrap() else { - bail!("Expected RexNode::FuncCall"); - }; - - let children = &func_call_node.children; - let sep_expr = expr_build_from_prost(&children[0])?; - - let string_exprs = children[1..] - .iter() - .map(expr_build_from_prost) - .collect::>>()?; - Ok(ConcatWsExpression::new(ret_type, sep_expr, string_exprs)) - } -} - -#[cfg(test)] -mod tests { - use itertools::Itertools; - use risingwave_common::array::{DataChunk, DataChunkTestExt}; - use risingwave_common::row::OwnedRow; - use risingwave_common::types::Datum; - use risingwave_pb::data::data_type::TypeName; - use risingwave_pb::data::PbDataType; - use risingwave_pb::expr::expr_node::RexNode; - use risingwave_pb::expr::expr_node::Type::ConcatWs; - use risingwave_pb::expr::{ExprNode, FunctionCall}; - - use crate::expr::expr_concat_ws::ConcatWsExpression; - use crate::expr::test_utils::make_input_ref; - use crate::expr::Expression; - - pub fn make_concat_ws_function(children: Vec, ret: TypeName) -> ExprNode { - ExprNode { - function_type: ConcatWs as i32, - return_type: Some(PbDataType { - type_name: ret as i32, - ..Default::default() - }), - rex_node: Some(RexNode::FuncCall(FunctionCall { children })), - } - } - - #[tokio::test] - async fn test_eval_concat_ws_expr() { - let input_node1 = make_input_ref(0, TypeName::Varchar); - let input_node2 = make_input_ref(1, TypeName::Varchar); - let input_node3 = make_input_ref(2, TypeName::Varchar); - let input_node4 = make_input_ref(3, TypeName::Varchar); - let concat_ws_expr = ConcatWsExpression::try_from(&make_concat_ws_function( - vec![input_node1, input_node2, input_node3, input_node4], - TypeName::Varchar, - )) - .unwrap(); - - let chunk = DataChunk::from_pretty( - " - T T T T - , a b c - . a b c - , . b c - , . . . - . . . .", - ); - - let actual = concat_ws_expr.eval(&chunk).await.unwrap(); - let actual = actual - .iter() - .map(|r| r.map(|s| s.into_utf8())) - .collect_vec(); - - let expected = vec![Some("a,b,c"), None, Some("b,c"), Some(""), None]; - - assert_eq!(actual, expected); - } - - #[tokio::test] - async fn test_eval_row_concat_ws_expr() { - let input_node1 = make_input_ref(0, TypeName::Varchar); - let input_node2 = make_input_ref(1, TypeName::Varchar); - let input_node3 = make_input_ref(2, TypeName::Varchar); - let input_node4 = make_input_ref(3, TypeName::Varchar); - let concat_ws_expr = ConcatWsExpression::try_from(&make_concat_ws_function( - vec![input_node1, input_node2, input_node3, input_node4], - TypeName::Varchar, - )) - .unwrap(); - - let row_inputs = vec![ - vec![Some(","), Some("a"), Some("b"), Some("c")], - vec![None, Some("a"), Some("b"), Some("c")], - vec![Some(","), None, Some("b"), Some("c")], - vec![Some(","), None, None, None], - vec![None, None, None, None], - ]; - - let expected = [Some("a,b,c"), None, Some("b,c"), Some(""), None]; - - for (i, row_input) in row_inputs.iter().enumerate() { - let datum_vec: Vec = row_input.iter().map(|e| e.map(|s| s.into())).collect(); - let row = OwnedRow::new(datum_vec); - - let result = concat_ws_expr.eval_row(&row).await.unwrap(); - let expected = expected[i].map(|s| s.into()); - - assert_eq!(result, expected); - } - } -} diff --git a/src/expr/src/expr/expr_nested_construct.rs b/src/expr/src/expr/expr_nested_construct.rs deleted file mode 100644 index ece26ed138258..0000000000000 --- a/src/expr/src/expr/expr_nested_construct.rs +++ /dev/null @@ -1,153 +0,0 @@ -// Copyright 2023 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::convert::TryFrom; -use std::sync::Arc; - -use risingwave_common::array::{ - ArrayBuilder, ArrayImpl, ArrayRef, DataChunk, ListArrayBuilder, ListValue, StructArray, - StructValue, -}; -use risingwave_common::row::OwnedRow; -use risingwave_common::types::{DataType, Datum, Scalar}; -use risingwave_pb::expr::expr_node::{RexNode, Type}; -use risingwave_pb::expr::ExprNode; - -use crate::expr::{build_from_prost as expr_build_from_prost, BoxedExpression, Expression}; -use crate::{bail, ensure, ExprError, Result}; - -#[derive(Debug)] -pub struct NestedConstructExpression { - data_type: DataType, - elements: Vec, -} - -#[async_trait::async_trait] -impl Expression for NestedConstructExpression { - fn return_type(&self) -> DataType { - self.data_type.clone() - } - - async fn eval(&self, input: &DataChunk) -> Result { - let mut columns = Vec::with_capacity(self.elements.len()); - for e in &self.elements { - columns.push(e.eval_checked(input).await?); - } - - if let DataType::Struct(ty) = &self.data_type { - let array = StructArray::new(ty.clone(), columns, input.vis().to_bitmap()); - Ok(Arc::new(ArrayImpl::Struct(array))) - } else if let DataType::List { .. } = &self.data_type { - let chunk = DataChunk::new(columns, input.vis().clone()); - let mut builder = ListArrayBuilder::with_type(input.capacity(), self.data_type.clone()); - for row in chunk.rows_with_holes() { - if let Some(row) = row { - builder.append_row_ref(row); - } else { - builder.append_null(); - } - } - Ok(Arc::new(ArrayImpl::List(builder.finish()))) - } else { - Err(ExprError::UnsupportedFunction( - "expects struct or list type".to_string(), - )) - } - } - - async fn eval_row(&self, input: &OwnedRow) -> Result { - let mut datums = Vec::with_capacity(self.elements.len()); - for e in &self.elements { - datums.push(e.eval_row(input).await?); - } - if let DataType::Struct { .. } = &self.data_type { - Ok(Some(StructValue::new(datums).to_scalar_value())) - } else if let DataType::List(_) = &self.data_type { - Ok(Some(ListValue::new(datums).to_scalar_value())) - } else { - Err(ExprError::UnsupportedFunction( - "expects struct or list type".to_string(), - )) - } - } -} - -impl NestedConstructExpression { - pub fn new(data_type: DataType, elements: Vec) -> Self { - NestedConstructExpression { - data_type, - elements, - } - } -} - -impl<'a> TryFrom<&'a ExprNode> for NestedConstructExpression { - type Error = ExprError; - - fn try_from(prost: &'a ExprNode) -> Result { - ensure!([Type::Array, Type::Row].contains(&prost.get_function_type().unwrap())); - - let ret_type = DataType::from(prost.get_return_type().unwrap()); - let RexNode::FuncCall(func_call_node) = prost.get_rex_node().unwrap() else { - bail!("Expected RexNode::FuncCall"); - }; - let elements = func_call_node - .children - .iter() - .map(expr_build_from_prost) - .collect::>>()?; - Ok(NestedConstructExpression::new(ret_type, elements)) - } -} - -#[cfg(test)] -mod tests { - use risingwave_common::array::{DataChunk, ListValue}; - use risingwave_common::row::OwnedRow; - use risingwave_common::types::{DataType, Scalar, ScalarImpl}; - - use super::NestedConstructExpression; - use crate::expr::{BoxedExpression, Expression, LiteralExpression}; - - #[tokio::test] - async fn test_eval_array_expr() { - let expr = NestedConstructExpression { - data_type: DataType::List(DataType::Int32.into()), - elements: vec![i32_expr(1.into()), i32_expr(2.into())], - }; - - let arr = expr.eval(&DataChunk::new_dummy(2)).await.unwrap(); - assert_eq!(arr.len(), 2); - } - - #[tokio::test] - async fn test_eval_row_array_expr() { - let expr = NestedConstructExpression { - data_type: DataType::List(DataType::Int32.into()), - elements: vec![i32_expr(1.into()), i32_expr(2.into())], - }; - - let scalar_impl = expr - .eval_row(&OwnedRow::new(vec![])) - .await - .unwrap() - .unwrap(); - let expected = ListValue::new(vec![Some(1.into()), Some(2.into())]).to_scalar_value(); - assert_eq!(expected, scalar_impl); - } - - fn i32_expr(v: ScalarImpl) -> BoxedExpression { - Box::new(LiteralExpression::new(DataType::Int32, Some(v))) - } -} diff --git a/src/expr/src/expr/mod.rs b/src/expr/src/expr/mod.rs index 8a424f4908448..33509506753fa 100644 --- a/src/expr/src/expr/mod.rs +++ b/src/expr/src/expr/mod.rs @@ -37,12 +37,10 @@ mod expr_binary_nonnull; mod expr_binary_nullable; mod expr_case; mod expr_coalesce; -mod expr_concat_ws; mod expr_field; mod expr_in; mod expr_input_ref; mod expr_literal; -mod expr_nested_construct; mod expr_some_all; pub(crate) mod expr_udf; mod expr_unary; diff --git a/src/expr/src/sig/func.rs b/src/expr/src/sig/func.rs index 6e665ead3fc40..e8e0d19ec11d7 100644 --- a/src/expr/src/sig/func.rs +++ b/src/expr/src/sig/func.rs @@ -40,30 +40,30 @@ pub fn func_sigs() -> impl Iterator { } #[derive(Default, Clone, Debug)] -pub struct FuncSigMap(HashMap<(PbType, usize), Vec>); +pub struct FuncSigMap(HashMap>); impl FuncSigMap { /// Inserts a function signature. pub fn insert(&mut self, desc: FuncSign) { - self.0 - .entry((desc.func, desc.inputs_type.len())) - .or_default() - .push(desc) + self.0.entry(desc.func).or_default().push(desc) } /// Returns a function signature with the same type, argument types and return type. /// Deprecated functions are included. pub fn get(&self, ty: PbType, args: &[DataTypeName], ret: DataTypeName) -> Option<&FuncSign> { - let v = self.0.get(&(ty, args.len()))?; + let v = self.0.get(&ty)?; v.iter() - .find(|d| d.inputs_type == args && d.ret_type == ret) + .find(|d| (d.variadic || d.inputs_type == args) && d.ret_type == ret) } /// Returns all function signatures with the same type and number of arguments. /// Deprecated functions are excluded. pub fn get_with_arg_nums(&self, ty: PbType, nargs: usize) -> Vec<&FuncSign> { - match self.0.get(&(ty, nargs)) { - Some(v) => v.iter().filter(|d| !d.deprecated).collect(), + match self.0.get(&ty) { + Some(v) => v + .iter() + .filter(|d| (d.variadic || d.inputs_type.len() == nargs) && !d.deprecated) + .collect(), None => vec![], } } @@ -74,6 +74,7 @@ impl FuncSigMap { pub struct FuncSign { pub func: PbType, pub inputs_type: &'static [DataTypeName], + pub variadic: bool, pub ret_type: DataTypeName, pub build: fn(return_type: DataType, children: Vec) -> Result, /// Whether the function is deprecated and should not be used in the frontend. @@ -128,11 +129,10 @@ mod tests { // convert FUNC_SIG_MAP to a more convenient map for testing let mut new_map: BTreeMap, Vec>> = BTreeMap::new(); - for ((func, num_args), sigs) in &FUNC_SIG_MAP.0 { + for (func, sigs) in &FUNC_SIG_MAP.0 { for sig in sigs { // validate the FUNC_SIG_MAP is consistent assert_eq!(func, &sig.func); - assert_eq!(num_args, &sig.inputs_type.len()); // exclude deprecated functions if sig.deprecated { continue; diff --git a/src/expr/src/vector_op/array.rs b/src/expr/src/vector_op/array.rs new file mode 100644 index 0000000000000..26f1fa2492064 --- /dev/null +++ b/src/expr/src/vector_op/array.rs @@ -0,0 +1,28 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::array::{ListValue, StructValue}; +use risingwave_common::row::Row; +use risingwave_common::types::ToOwnedDatum; +use risingwave_expr_macro::function; + +#[function("array(...) -> list")] +fn array(row: impl Row) -> ListValue { + ListValue::new(row.iter().map(|d| d.to_owned_datum()).collect()) +} + +#[function("row(...) -> struct")] +fn row_(row: impl Row) -> StructValue { + StructValue::new(row.iter().map(|d| d.to_owned_datum()).collect()) +} diff --git a/src/expr/src/vector_op/concat_ws.rs b/src/expr/src/vector_op/concat_ws.rs new file mode 100644 index 0000000000000..293adb3079c0d --- /dev/null +++ b/src/expr/src/vector_op/concat_ws.rs @@ -0,0 +1,70 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::Write; + +use risingwave_common::row::Row; +use risingwave_common::types::ToText; +use risingwave_expr_macro::function; + +/// Concatenates all but the first argument, with separators. The first argument is used as the +/// separator string, and should not be NULL. Other NULL arguments are ignored. +#[function("concat_ws(varchar, ...) -> varchar")] +fn concat_ws(sep: &str, vals: impl Row, writer: &mut impl Write) -> Option<()> { + let mut string_iter = vals.iter().flatten(); + if let Some(string) = string_iter.next() { + string.write(writer).unwrap(); + } + for string in string_iter { + write!(writer, "{}", sep).unwrap(); + string.write(writer).unwrap(); + } + Some(()) +} + +#[cfg(test)] +mod tests { + use risingwave_common::array::DataChunk; + use risingwave_common::row::Row; + use risingwave_common::test_prelude::DataChunkTestExt; + use risingwave_common::types::ToOwnedDatum; + use risingwave_common::util::iter_util::ZipEqDebug; + + use crate::expr::build_from_pretty; + + #[tokio::test] + async fn test_concat_ws() { + let concat_ws = + build_from_pretty("(concat_ws:varchar $0:varchar $1:varchar $2:varchar $3:varchar)"); + let (input, expected) = DataChunk::from_pretty( + "T T T T T + , a b c a,b,c + , . b c b,c + . a b c . + , . . . (empty) + . . . . .", + ) + .split_column_at(4); + + // test eval + let output = concat_ws.eval(&input).await.unwrap(); + assert_eq!(&output, expected.column_at(0)); + + // test eval_row + for (row, expected) in input.rows().zip_eq_debug(expected.rows()) { + let result = concat_ws.eval_row(&row.to_owned_row()).await.unwrap(); + assert_eq!(result, expected.datum_at(0).to_owned_datum()); + } + } +} diff --git a/src/common/src/format.rs b/src/expr/src/vector_op/format.rs similarity index 50% rename from src/common/src/format.rs rename to src/expr/src/vector_op/format.rs index 4bd5e8c905a4d..081a1e2ef1fb6 100644 --- a/src/common/src/format.rs +++ b/src/expr/src/vector_op/format.rs @@ -12,7 +12,53 @@ // See the License for the specific language governing permissions and // limitations under the License. -use thiserror::Error; +use std::fmt::Write; +use std::str::FromStr; + +use risingwave_common::row::Row; +use risingwave_common::types::{ScalarRefImpl, ToText}; +use risingwave_expr_macro::function; + +use super::string::quote_ident; +use crate::{ExprError, Result}; + +/// Formats arguments according to a format string. +#[function( + "format(varchar, ...) -> varchar", + prebuild = "Formatter::from_str($0).map_err(|e| ExprError::Parse(e.to_string().into()))?" +)] +fn format(formatter: &Formatter, row: impl Row, writer: &mut impl Write) -> Result<()> { + let mut args = row.iter(); + for node in &formatter.nodes { + match node { + FormatterNode::Literal(literal) => writer.write_str(literal).unwrap(), + FormatterNode::Specifier(sp) => { + let arg = args.next().ok_or(ExprError::TooFewArguments)?; + match sp.ty { + SpecifierType::SimpleString => { + if let Some(scalar) = arg { + scalar.write(writer).unwrap(); + } + } + SpecifierType::SqlIdentifier => match arg { + Some(ScalarRefImpl::Utf8(arg)) => quote_ident(arg, writer), + _ => { + return Err(ExprError::UnsupportedFunction( + "unsupported data for specifier type 'I'".to_string(), + )) + } + }, + SpecifierType::SqlLiteral => { + return Err(ExprError::UnsupportedFunction( + "unsupported specifier type 'L'".to_string(), + )) + } + } + } + } + } + Ok(()) +} /// The type of format conversion to use to produce the format specifier's output. #[derive(Copy, Clone, Debug, PartialEq, Eq)] @@ -31,7 +77,7 @@ pub enum SpecifierType { impl TryFrom for SpecifierType { type Error = (); - fn try_from(c: char) -> Result { + fn try_from(c: char) -> std::result::Result { match c { 's' => Ok(SpecifierType::SimpleString), 'I' => Ok(SpecifierType::SqlIdentifier), @@ -42,34 +88,36 @@ impl TryFrom for SpecifierType { } #[derive(Debug)] -pub struct Specifier { +struct Specifier { // TODO: support position, flags and width. - pub ty: SpecifierType, + ty: SpecifierType, } #[derive(Debug)] -pub enum FormatterNode { +enum FormatterNode { Specifier(Specifier), Literal(String), } #[derive(Debug)] -pub struct Formatter { +struct Formatter { nodes: Vec, } -#[derive(Debug, Error)] -pub enum ParseFormatError { +#[derive(Debug, thiserror::Error)] +enum ParseFormatError { #[error("unrecognized format() type specifier \"{0}\"")] UnrecognizedSpecifierType(char), #[error("unterminated format() type specifier")] UnterminatedSpecifier, } -impl Formatter { +impl FromStr for Formatter { + type Err = ParseFormatError; + /// Parse the format string into a high-efficient representation. /// - pub fn parse(format: &str) -> Result { + fn from_str(format: &str) -> std::result::Result { // 8 is a good magic number here, it can cover an input like 'Testing %s, %s, %s, %%'. let mut nodes = Vec::with_capacity(8); let mut after_percent = false; @@ -106,8 +154,38 @@ impl Formatter { Ok(Formatter { nodes }) } +} + +#[cfg(test)] +mod tests { + use risingwave_common::array::DataChunk; + use risingwave_common::row::Row; + use risingwave_common::test_prelude::DataChunkTestExt; + use risingwave_common::types::ToOwnedDatum; + use risingwave_common::util::iter_util::ZipEqDebug; - pub fn nodes(&self) -> &[FormatterNode] { - &self.nodes + use crate::expr::build_from_pretty; + + #[tokio::test] + async fn test_format() { + let format = build_from_pretty("(format:varchar $0:varchar $1:varchar $2:varchar)"); + let (input, expected) = DataChunk::from_pretty( + "T T T T + Hello%s World . HelloWorld + %s%s Hello World HelloWorld + %I && . \"&&\" + . a b .", + ) + .split_column_at(3); + + // test eval + let output = format.eval(&input).await.unwrap(); + assert_eq!(&output, expected.column_at(0)); + + // test eval_row + for (row, expected) in input.rows().zip_eq_debug(expected.rows()) { + let result = format.eval_row(&row.to_owned_row()).await.unwrap(); + assert_eq!(result, expected.datum_at(0).to_owned_datum()); + } } } diff --git a/src/expr/src/vector_op/mod.rs b/src/expr/src/vector_op/mod.rs index 7125ca33ec3de..0027abfa65542 100644 --- a/src/expr/src/vector_op/mod.rs +++ b/src/expr/src/vector_op/mod.rs @@ -13,6 +13,7 @@ // limitations under the License. pub mod arithmetic_op; +pub mod array; pub mod array_access; pub mod array_concat; pub mod array_distinct; @@ -31,12 +32,14 @@ pub mod cardinality; pub mod cast; pub mod cmp; pub mod concat_op; +pub mod concat_ws; pub mod conjunction; pub mod date_trunc; pub mod delay; pub mod encdec; pub mod exp; pub mod extract; +pub mod format; pub mod format_type; pub mod int256; pub mod jsonb_access; diff --git a/src/frontend/planner_test/tests/testdata/input/format.yaml b/src/frontend/planner_test/tests/testdata/input/format.yaml index 010d188766555..dd0df4dcd02df 100644 --- a/src/frontend/planner_test/tests/testdata/input/format.yaml +++ b/src/frontend/planner_test/tests/testdata/input/format.yaml @@ -11,19 +11,19 @@ CREATE TABLE t1(v1 varchar, v2 int, v3 int); SELECT format('Testing %s, %I, %s, %%', v1, v2, v3) FROM t1; expected_outputs: - - binder_error + - batch_plan - sql: | SELECT format('Testing %s, %s, %s, %%', 'one', 'two'); expected_outputs: - - binder_error + - batch_error - sql: | SELECT format('Testing %s, %s, %s, %', 'one', 'two', 'three'); expected_outputs: - - binder_error + - batch_error - sql: | SELECT format('Testing %s, %f, %d, %', 'one', 'two', 'three'); expected_outputs: - - binder_error + - batch_error - sql: | SELECT format(); expected_outputs: diff --git a/src/frontend/planner_test/tests/testdata/output/format.yaml b/src/frontend/planner_test/tests/testdata/output/format.yaml index 06ace2257308c..1c42a0f9252ac 100644 --- a/src/frontend/planner_test/tests/testdata/output/format.yaml +++ b/src/frontend/planner_test/tests/testdata/output/format.yaml @@ -7,38 +7,24 @@ SELECT format('Testing %s, %I, %s, %%', v1, v2, v3) FROM t1; batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchProject { exprs: [ConcatWs('':Varchar, 'Testing ':Varchar, t1.v1, ', ':Varchar, QuoteIdent(t1.v2), ', ':Varchar, t1.v3::Varchar, ', %':Varchar) as $expr1] } + └─BatchProject { exprs: [Format('Testing %s, %I, %s, %%':Varchar, t1.v1, t1.v2, t1.v3::Varchar) as $expr1] } └─BatchScan { table: t1, columns: [t1.v1, t1.v2, t1.v3], distribution: SomeShard } - sql: | CREATE TABLE t1(v1 varchar, v2 int, v3 int); SELECT format('Testing %s, %I, %s, %%', v1, v2, v3) FROM t1; - binder_error: |- - Bind error: failed to bind expression: format('Testing %s, %I, %s, %%', v1, v2, v3) - - Caused by: - Feature is not yet implemented: QuoteIdent[Int32] - Tracking issue: https://github.com/risingwavelabs/risingwave/issues/112 + batch_plan: |- + BatchExchange { order: [], dist: Single } + └─BatchProject { exprs: [Format('Testing %s, %I, %s, %%':Varchar, t1.v1, t1.v2::Varchar, t1.v3::Varchar) as $expr1] } + └─BatchScan { table: t1, columns: [t1.v1, t1.v2, t1.v3], distribution: SomeShard } - sql: | SELECT format('Testing %s, %s, %s, %%', 'one', 'two'); - binder_error: |- - Bind error: failed to bind expression: format('Testing %s, %s, %s, %%', 'one', 'two') - - Caused by: - Bind error: Function `format` required 3 arguments based on the `formatstr`, but 2 found. + batch_error: 'Expr error: too few arguments for format()' - sql: | SELECT format('Testing %s, %s, %s, %', 'one', 'two', 'three'); - binder_error: |- - Bind error: failed to bind expression: format('Testing %s, %s, %s, %', 'one', 'two', 'three') - - Caused by: - Bind error: unterminated format() type specifier + batch_error: 'Expr error: Parse error: unterminated format() type specifier' - sql: | SELECT format('Testing %s, %f, %d, %', 'one', 'two', 'three'); - binder_error: |- - Bind error: failed to bind expression: format('Testing %s, %f, %d, %', 'one', 'two', 'three') - - Caused by: - Bind error: unrecognized format() type specifier "f" + batch_error: 'Expr error: Parse error: unrecognized format() type specifier "f"' - sql: | SELECT format(); binder_error: |- diff --git a/src/frontend/src/binder/expr/function.rs b/src/frontend/src/binder/expr/function.rs index c3bdf1febcccb..44c79a1620c79 100644 --- a/src/frontend/src/binder/expr/function.rs +++ b/src/frontend/src/binder/expr/function.rs @@ -22,7 +22,6 @@ use itertools::Itertools; use risingwave_common::array::ListValue; use risingwave_common::catalog::PG_CATALOG_SCHEMA_NAME; use risingwave_common::error::{ErrorCode, Result, RwError}; -use risingwave_common::format::{Formatter, FormatterNode, SpecifierType}; use risingwave_common::session_config::USER_NAME_WILD_CARD; use risingwave_common::types::{DataType, ScalarImpl, Timestamptz}; use risingwave_common::{GIT_SHA, RW_VERSION}; @@ -755,7 +754,7 @@ impl Binder { rewrite(ExprType::ConcatWs, Binder::rewrite_concat_to_concat_ws), ), ("concat_ws", raw_call(ExprType::ConcatWs)), - ("format", rewrite(ExprType::ConcatWs, Binder::rewrite_format_to_concat_ws)), + ("format", raw_call(ExprType::Format)), ("translate", raw_call(ExprType::Translate)), ("split_part", raw_call(ExprType::SplitPart)), ("char_length", raw_call(ExprType::CharLength)), @@ -1204,75 +1203,6 @@ impl Binder { } } - fn rewrite_format_to_concat_ws(inputs: Vec) -> Result> { - let Some((format_expr, args)) = inputs.split_first() else { - return Err(ErrorCode::BindError( - "Function `format` takes at least 1 arguments (0 given)".to_string(), - ) - .into()); - }; - let ExprImpl::Literal(expr_literal) = format_expr else { - return Err(ErrorCode::BindError( - "Function `format` takes a literal string as the first argument".to_string(), - ) - .into()); - }; - let Some(ScalarImpl::Utf8(format_str)) = expr_literal.get_data() else { - return Err(ErrorCode::BindError( - "Function `format` takes a literal string as the first argument".to_string(), - ) - .into()); - }; - let formatter = Formatter::parse(format_str) - .map_err(|err| -> RwError { ErrorCode::BindError(err.to_string()).into() })?; - - let specifier_count = formatter - .nodes() - .iter() - .filter(|f_node| matches!(f_node, FormatterNode::Specifier(_))) - .count(); - if specifier_count != args.len() { - return Err(ErrorCode::BindError(format!( - "Function `format` required {} arguments based on the `formatstr`, but {} found.", - specifier_count, - args.len() - )) - .into()); - } - - // iter the args. - let mut j = 0; - let new_args = [Ok(ExprImpl::literal_varchar("".to_string()))] - .into_iter() - .chain(formatter.nodes().iter().map(move |f_node| -> Result<_> { - let new_arg = match f_node { - FormatterNode::Specifier(sp) => { - // We've checked the count. - let arg = &args[j]; - j += 1; - match sp.ty { - SpecifierType::SimpleString => arg.clone(), - SpecifierType::SqlIdentifier => { - FunctionCall::new(ExprType::QuoteIdent, vec![arg.clone()])?.into() - } - SpecifierType::SqlLiteral => { - return Err::<_, RwError>( - ErrorCode::BindError( - "unsupported specifier type 'L'".to_string(), - ) - .into(), - ) - } - } - } - FormatterNode::Literal(literal) => ExprImpl::literal_varchar(literal.clone()), - }; - Ok(new_arg) - })) - .try_collect()?; - Ok(new_args) - } - /// Make sure inputs only have 2 value and rewrite the arguments. /// Nullif(expr1,expr2) -> Case(Equal(expr1 = expr2),null,expr1). fn rewrite_nullif_to_case_when(inputs: Vec) -> Result> { diff --git a/src/frontend/src/expr/type_inference/func.rs b/src/frontend/src/expr/type_inference/func.rs index f359b3bcea642..69be8376b46ca 100644 --- a/src/frontend/src/expr/type_inference/func.rs +++ b/src/frontend/src/expr/type_inference/func.rs @@ -342,6 +342,21 @@ fn infer_type_for_special( .try_collect()?; Ok(Some(DataType::Varchar)) } + ExprType::Format => { + ensure_arity!("format", 1 <= | inputs |); + let inputs_owned = std::mem::take(inputs); + *inputs = inputs_owned + .into_iter() + .enumerate() + .map(|(i, input)| match i { + // 0-th arg must be string + 0 => input.cast_implicit(DataType::Varchar).map_err(Into::into), + // subsequent can be any type, using the output format + _ => input.cast_output(), + }) + .try_collect()?; + Ok(Some(DataType::Varchar)) + } ExprType::IsNotNull => { ensure_arity!("is_not_null", | inputs | == 1); match inputs[0].return_type() { @@ -1203,6 +1218,7 @@ mod tests { sig_map.insert(FuncSign { func: DUMMY_FUNC, inputs_type: formals, + variadic: false, ret_type: DUMMY_RET, build: |_, _| unreachable!(), deprecated: false,