diff --git a/Cargo.lock b/Cargo.lock index 832176f7b13ef..8a934a045b1c7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1303,6 +1303,29 @@ dependencies = [ "uuid", ] +[[package]] +name = "aws-sdk-dynamodb" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23c4ed3708df2778c0c49b16e8235e52eb8f2133ae6752c40eea1376e2563fec" +dependencies = [ + "aws-credential-types", + "aws-http", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand 2.0.1", + "http 0.2.9", + "regex", + "tracing", +] + [[package]] name = "aws-sdk-kinesis" version = "1.3.0" @@ -2021,22 +2044,22 @@ dependencies = [ [[package]] name = "borsh" -version = "1.2.0" +version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf617fabf5cdbdc92f774bfe5062d870f228b80056d41180797abf48bed4056e" +checksum = "a6362ed55def622cddc70a4746a68554d7b687713770de539e59a739b249f8ed" dependencies = [ "borsh-derive", - "cfg_aliases 0.1.1", + "cfg_aliases", ] [[package]] name = "borsh-derive" -version = "1.2.0" +version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f404657a7ea7b5249e36808dff544bc88a28f26e0ac40009f674b7a009d14be3" +checksum = "c3ef8005764f53cd4dca619f5bf64cafd4664dada50ece25e4d81de54c80cc0b" dependencies = [ "once_cell", - "proc-macro-crate 2.0.0", + "proc-macro-crate 3.1.0", "proc-macro2", "quote", "syn 2.0.57", @@ -2342,12 +2365,6 @@ dependencies = [ "syn 2.0.57", ] -[[package]] -name = "cfg_aliases" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e" - [[package]] name = "cfg_aliases" version = "0.2.1" @@ -3657,7 +3674,7 @@ dependencies = [ name = "delta_btree_map" version = "1.9.0-alpha" dependencies = [ - "educe 0.5.7", + "educe 0.6.0", "enum-as-inner 0.6.0", ] @@ -4389,7 +4406,7 @@ dependencies = [ "serde", "thiserror", "time", - "winnow 0.6.9", + "winnow 0.6.11", ] [[package]] @@ -4469,9 +4486,9 @@ dependencies = [ [[package]] name = "educe" -version = "0.5.7" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "597b6c9e278fefec3b25dfbb86195aeafe270dc6f13e27a3a20a95ba449f1bfb" +checksum = "1d7bc049e1bd8cdeb31b68bbd586a9464ecf9f3944af3958a7a9d0f8b9799417" dependencies = [ "enum-ordinalize 4.3.0", "proc-macro2", @@ -7637,7 +7654,7 @@ checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" dependencies = [ "bitflags 2.5.0", "cfg-if", - "cfg_aliases 0.2.1", + "cfg_aliases", "libc", ] @@ -9028,15 +9045,6 @@ dependencies = [ "toml_edit 0.19.15", ] -[[package]] -name = "proc-macro-crate" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e8366a6159044a37876a2b9817124296703c586a5c92e2c53751fa06d8d43e8" -dependencies = [ - "toml_edit 0.20.2", -] - [[package]] name = "proc-macro-crate" version = "3.1.0" @@ -9446,7 +9454,7 @@ dependencies = [ "indoc", "libc", "memoffset", - "parking_lot 0.11.2", + "parking_lot 0.12.1", "portable-atomic", "pyo3-build-config", "pyo3-ffi", @@ -10293,7 +10301,7 @@ dependencies = [ "criterion", "darwin-libproc", "easy-ext", - "educe 0.5.7", + "educe 0.6.0", "either", "enum-as-inner 0.6.0", "enumflags2", @@ -10381,7 +10389,7 @@ name = "risingwave_common_estimate_size" version = "1.9.0-alpha" dependencies = [ "bytes", - "educe 0.5.7", 
+ "educe 0.6.0", "ethnum", "fixedbitset 0.5.0", "jsonbb", @@ -10587,6 +10595,7 @@ dependencies = [ "aws-config", "aws-credential-types", "aws-msk-iam-sasl-signer", + "aws-sdk-dynamodb", "aws-sdk-kinesis", "aws-sdk-s3", "aws-smithy-http", @@ -10805,7 +10814,7 @@ dependencies = [ "const-currying", "downcast-rs", "easy-ext", - "educe 0.5.7", + "educe 0.6.0", "either", "enum-as-inner 0.6.0", "expect-test", @@ -10850,7 +10859,7 @@ dependencies = [ "chrono", "chrono-tz 0.9.0", "criterion", - "educe 0.5.7", + "educe 0.6.0", "expect-test", "fancy-regex", "futures-async-stream", @@ -10917,7 +10926,7 @@ dependencies = [ "downcast-rs", "dyn-clone", "easy-ext", - "educe 0.5.7", + "educe 0.6.0", "either", "enum-as-inner 0.6.0", "expect-test", @@ -11527,7 +11536,7 @@ dependencies = [ "tracing", "tracing-subscriber", "walkdir", - "winnow 0.6.9", + "winnow 0.6.11", "workspace-hack", ] @@ -11666,7 +11675,7 @@ dependencies = [ "cfg-if", "criterion", "delta_btree_map", - "educe 0.5.7", + "educe 0.6.0", "either", "enum-as-inner 0.6.0", "expect-test", @@ -14611,17 +14620,6 @@ dependencies = [ "winnow 0.5.15", ] -[[package]] -name = "toml_edit" -version = "0.20.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "396e4d48bbb2b7554c944bde63101b5ae446cff6ec4a24227428f15eb72ef338" -dependencies = [ - "indexmap 2.0.0", - "toml_datetime", - "winnow 0.5.15", -] - [[package]] name = "toml_edit" version = "0.21.1" @@ -14643,7 +14641,7 @@ dependencies = [ "serde", "serde_spanned", "toml_datetime", - "winnow 0.6.9", + "winnow 0.6.11", ] [[package]] @@ -16323,9 +16321,9 @@ dependencies = [ [[package]] name = "winnow" -version = "0.6.9" +version = "0.6.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86c949fede1d13936a99f14fafd3e76fd642b556dd2ce96287fbe2e0151bfac6" +checksum = "56c52728401e1dc672a56e81e593e912aa54c78f40246869f78359a2bf24d29d" dependencies = [ "memchr", ] diff --git a/Makefile.toml b/Makefile.toml index 8f3c3e48998ad..a154f0ba60dba 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -1,120 +1,121 @@ extend = [ - { path = "src/risedevtool/grafana.toml" }, - { path = "src/risedevtool/prometheus.toml" }, - { path = "src/risedevtool/minio.toml" }, - { path = "src/risedevtool/etcd.toml" }, - { path = "src/risedevtool/tempo.toml" }, - { path = "src/risedevtool/gcloud-pubsub.toml" }, - { path = "src/risedevtool/redis.toml" }, - { path = "src/risedevtool/connector.toml" }, - { path = "src/risedevtool/risedev-components.toml" }, - { path = "src/sqlparser/sqlparser_test.toml" }, - { path = "src/frontend/planner_test/planner_test.toml" }, - { path = "src/tests/compaction_test/Makefile.toml" }, - { path = "src/storage/backup/integration_tests/Makefile.toml" }, - { path = "src/java_binding/make-java-binding.toml" }, - { path = "src/stream/tests/integration_tests/integration_test.toml" }, - { path = "e2e_test/source_inline/commands.toml" }, + { path = "src/risedevtool/grafana.toml" }, + { path = "src/risedevtool/prometheus.toml" }, + { path = "src/risedevtool/minio.toml" }, + { path = "src/risedevtool/etcd.toml" }, + { path = "src/risedevtool/tempo.toml" }, + { path = "src/risedevtool/gcloud-pubsub.toml" }, + { path = "src/risedevtool/redis.toml" }, + { path = "src/risedevtool/connector.toml" }, + { path = "src/risedevtool/risedev-components.toml" }, + { path = "src/sqlparser/sqlparser_test.toml" }, + { path = "src/frontend/planner_test/planner_test.toml" }, + { path = "src/tests/compaction_test/Makefile.toml" }, + { path = 
"src/storage/backup/integration_tests/Makefile.toml" }, + { path = "src/java_binding/make-java-binding.toml" }, + { path = "src/stream/tests/integration_tests/integration_test.toml" }, + { path = "e2e_test/source_inline/commands.toml" }, ] env_files = ["./risedev-components.user.env"] env_scripts = [ - ''' -#!@duckscript + ''' + #!@duckscript + + # only duckscript can modify env variables in cargo-make + # duckscript doc: https://github.com/sagiegurari/duckscript/blob/master/docs/sdk.md + + set_env ENABLE_TELEMETRY "false" + set_env RW_TELEMETRY_TYPE "test" + + is_sanitizer_enabled = get_env ENABLE_SANITIZER + is_hdfs_backend = get_env ENABLE_HDFS + is_release = get_env ENABLE_RELEASE_PROFILE + is_not_release = not ${is_release} + is_dynamic_linking = get_env ENABLE_DYNAMIC_LINKING + is_hummock_trace = get_env ENABLE_HUMMOCK_TRACE + is_external_udf_enabled = get_env ENABLE_EXTERNAL_UDF + is_wasm_udf_enabled = get_env ENABLE_WASM_UDF + is_js_udf_enabled = get_env ENABLE_JS_UDF + is_deno_udf_enabled = get_env ENABLE_DENO_UDF + is_python_udf_enabled = get_env ENABLE_PYTHON_UDF + + if ${is_sanitizer_enabled} + set_env RISEDEV_CARGO_BUILD_EXTRA_ARGS "--timings -Zbuild-std --target ${CARGO_MAKE_RUST_TARGET_TRIPLE}" + set_env RISEDEV_BUILD_TARGET_DIR "${CARGO_MAKE_RUST_TARGET_TRIPLE}/" + set_env RISEDEV_RUSTFLAGS "-Ctarget-cpu=native --cfg tokio_unstable -Zsanitizer=thread" + else + set_env RISEDEV_CARGO_BUILD_EXTRA_ARGS "--timings" + set_env RISEDEV_BUILD_TARGET_DIR "" + end -# only duckscript can modify env variables in cargo-make -# duckscript doc: https://github.com/sagiegurari/duckscript/blob/master/docs/sdk.md - -set_env ENABLE_TELEMETRY "false" - -is_sanitizer_enabled = get_env ENABLE_SANITIZER -is_hdfs_backend = get_env ENABLE_HDFS -is_release = get_env ENABLE_RELEASE_PROFILE -is_not_release = not ${is_release} -is_dynamic_linking = get_env ENABLE_DYNAMIC_LINKING -is_hummock_trace = get_env ENABLE_HUMMOCK_TRACE -is_external_udf_enabled = get_env ENABLE_EXTERNAL_UDF -is_wasm_udf_enabled = get_env ENABLE_WASM_UDF -is_js_udf_enabled = get_env ENABLE_JS_UDF -is_deno_udf_enabled = get_env ENABLE_DENO_UDF -is_python_udf_enabled = get_env ENABLE_PYTHON_UDF - -if ${is_sanitizer_enabled} - set_env RISEDEV_CARGO_BUILD_EXTRA_ARGS "--timings -Zbuild-std --target ${CARGO_MAKE_RUST_TARGET_TRIPLE}" - set_env RISEDEV_BUILD_TARGET_DIR "${CARGO_MAKE_RUST_TARGET_TRIPLE}/" - set_env RISEDEV_RUSTFLAGS "-Ctarget-cpu=native --cfg tokio_unstable -Zsanitizer=thread" -else - set_env RISEDEV_CARGO_BUILD_EXTRA_ARGS "--timings" - set_env RISEDEV_BUILD_TARGET_DIR "" -end + if ${is_hdfs_backend} + set_env BUILD_HDFS_BACKEND_CMD "-p risingwave_object_store --features hdfs-backend" + else + set_env BUILD_HDFS_BACKEND_CMD "" + end -if ${is_hdfs_backend} - set_env BUILD_HDFS_BACKEND_CMD "-p risingwave_object_store --features hdfs-backend" -else - set_env BUILD_HDFS_BACKEND_CMD "" -end + if ${is_not_release} and ${is_dynamic_linking} + set_env RISINGWAVE_FEATURE_FLAGS "--features rw-dynamic-link --no-default-features" + else + set_env RISINGWAVE_FEATURE_FLAGS "--features rw-static-link" + end -if ${is_not_release} and ${is_dynamic_linking} - set_env RISINGWAVE_FEATURE_FLAGS "--features rw-dynamic-link --no-default-features" -else - set_env RISINGWAVE_FEATURE_FLAGS "--features rw-static-link" -end - -if ${is_external_udf_enabled} - flags = get_env RISINGWAVE_FEATURE_FLAGS - set_env RISINGWAVE_FEATURE_FLAGS "${flags} --features external-udf" -end - -if ${is_wasm_udf_enabled} - flags = get_env RISINGWAVE_FEATURE_FLAGS - 
set_env RISINGWAVE_FEATURE_FLAGS "${flags} --features wasm-udf" -end - -if ${is_js_udf_enabled} - flags = get_env RISINGWAVE_FEATURE_FLAGS - set_env RISINGWAVE_FEATURE_FLAGS "${flags} --features js-udf" -end - -if ${is_deno_udf_enabled} - flags = get_env RISINGWAVE_FEATURE_FLAGS - set_env RISINGWAVE_FEATURE_FLAGS "${flags} --features deno-udf" -end - -if ${is_python_udf_enabled} - flags = get_env RISINGWAVE_FEATURE_FLAGS - set_env RISINGWAVE_FEATURE_FLAGS "${flags} --features python-udf" -end - -if ${is_hummock_trace} - set_env BUILD_HUMMOCK_TRACE_CMD "-p risingwave_storage --features hm-trace" -else - set_env BUILD_HUMMOCK_TRACE_CMD "" -end + if ${is_external_udf_enabled} + flags = get_env RISINGWAVE_FEATURE_FLAGS + set_env RISINGWAVE_FEATURE_FLAGS "${flags} --features external-udf" + end -is_ci = get_env RISINGWAVE_CI -is_not_ci = not ${is_ci} + if ${is_wasm_udf_enabled} + flags = get_env RISINGWAVE_FEATURE_FLAGS + set_env RISINGWAVE_FEATURE_FLAGS "${flags} --features wasm-udf" + end -if ${is_not_ci} - query_log_path = get_env RW_QUERY_LOG_PATH - no_query_log_path = not ${query_log_path} + if ${is_js_udf_enabled} + flags = get_env RISINGWAVE_FEATURE_FLAGS + set_env RISINGWAVE_FEATURE_FLAGS "${flags} --features js-udf" + end - if ${no_query_log_path} - set_env RW_QUERY_LOG_PATH "${PREFIX_LOG}" - fi + if ${is_deno_udf_enabled} + flags = get_env RISINGWAVE_FEATURE_FLAGS + set_env RISINGWAVE_FEATURE_FLAGS "${flags} --features deno-udf" + end - rust_log = get_env RUST_LOG - no_rust_log = not ${rust_log} + if ${is_python_udf_enabled} + flags = get_env RISINGWAVE_FEATURE_FLAGS + set_env RISINGWAVE_FEATURE_FLAGS "${flags} --features python-udf" + end - if ${no_rust_log} - set_env RUST_LOG "pgwire_query_log=info" + if ${is_hummock_trace} + set_env BUILD_HUMMOCK_TRACE_CMD "-p risingwave_storage --features hm-trace" else - set_env RUST_LOG "pgwire_query_log=info,${rust_log}" + set_env BUILD_HUMMOCK_TRACE_CMD "" + end + + is_ci = get_env RISINGWAVE_CI + is_not_ci = not ${is_ci} + + if ${is_not_ci} + query_log_path = get_env RW_QUERY_LOG_PATH + no_query_log_path = not ${query_log_path} + + if ${no_query_log_path} + set_env RW_QUERY_LOG_PATH "${PREFIX_LOG}" + fi + + rust_log = get_env RUST_LOG + no_rust_log = not ${rust_log} + + if ${no_rust_log} + set_env RUST_LOG "pgwire_query_log=info" + else + set_env RUST_LOG "pgwire_query_log=info,${rust_log}" + end end -end -set_env TMUX "tmux -L risedev" -''', + set_env TMUX "tmux -L risedev" + ''', ] @@ -322,7 +323,7 @@ description = "Codesign playground binary to support coredump" # codesign the binary before running. # https://developer.apple.com/forums/thread/694233?answerId=695943022#695943022 condition = { env_set = [ - "ENABLE_COREDUMP", + "ENABLE_COREDUMP", ], env = { "SYSTEM" = "darwin-arm64" } } script = ''' #!/usr/bin/env bash @@ -339,7 +340,7 @@ description = "Codesign all binaries to support coredump" # codesign the binary before running. 
# https://developer.apple.com/forums/thread/694233?answerId=695943022#695943022 condition = { env_set = [ - "ENABLE_COREDUMP", + "ENABLE_COREDUMP", ], env = { "SYSTEM" = "darwin-arm64" } } script = ''' #!/usr/bin/env bash @@ -374,9 +375,9 @@ category = "RiseDev - Build" description = "Copy RisngWave binaries to bin" condition = { env_set = ["ENABLE_BUILD_RUST"] } dependencies = [ - "link-all-in-one-binaries", - "link-user-bin", - "codesign-binaries", + "link-all-in-one-binaries", + "link-user-bin", + "codesign-binaries", ] [tasks.b] @@ -493,15 +494,15 @@ private = true category = "Misc" description = "Download all available components at once" dependencies = [ - "download-maven", - "download-etcd", - "download-grafana", - "download-tempo", - "download-mcli", - "download-minio", - "download-prometheus", - "download-pubsub", - "download-redis", + "download-maven", + "download-etcd", + "download-grafana", + "download-tempo", + "download-mcli", + "download-minio", + "download-prometheus", + "download-pubsub", + "download-redis", ] [tasks.create-user-profiles-file] @@ -509,7 +510,7 @@ private = true category = "RiseDev - Prepare" description = "Create user profiles file if not exists" condition = { files_not_exist = [ - "${CARGO_MAKE_WORKSPACE_WORKING_DIRECTORY}/risedev-profiles.user.yml", + "${CARGO_MAKE_WORKSPACE_WORKING_DIRECTORY}/risedev-profiles.user.yml", ] } script = ''' #!/usr/bin/env bash @@ -536,34 +537,34 @@ EOF category = "RiseDev - Prepare" description = "Prepare dev cluster by downloading necessary tools and build required components" dependencies = [ - "create-user-profiles-file", - "download-all", - "build-risingwave", - "build-connector-node", - "post-build-risingwave", - "prepare-config", + "create-user-profiles-file", + "download-all", + "build-risingwave", + "build-connector-node", + "post-build-risingwave", + "prepare-config", ] [tasks.pre-start-benchmark] category = "RiseDev - Prepare" description = "Download necessary tools to deploy a benchmark env" dependencies = [ - "download-minio", - "download-mcli", - "download-etcd", - "download-grafana", - "download-prometheus", - "download-tempo", - "download-redis", + "download-minio", + "download-mcli", + "download-etcd", + "download-grafana", + "download-prometheus", + "download-tempo", + "download-redis", ] [tasks.pre-start-playground] category = "RiseDev - Prepare" description = "Preparation steps for playground" dependencies = [ - "build-risingwave-playground", - "codesign-playground", - "build-connector-node", + "build-risingwave-playground", + "codesign-playground", + "build-connector-node", ] [tasks.check-risedev-env-file] @@ -740,16 +741,16 @@ dependencies = ["k", "clean-data"] private = true category = "RiseDev - Check" install_crate = { min_version = "0.9.51", crate_name = "cargo-nextest", binary = "cargo", test_arg = [ - "nextest", - "--help", + "nextest", + "--help", ], install_command = "binstall" } [tasks.install-llvm-cov] private = true category = "RiseDev - Check" install_crate = { min_version = "0.5.17", crate_name = "cargo-llvm-cov", binary = "cargo", test_arg = [ - "llvm-cov", - "--help", + "llvm-cov", + "--help", ], install_command = "binstall" } [tasks.install-tools] @@ -830,11 +831,11 @@ description = "🌟 Run unit tests" category = "RiseDev - Build" dependencies = ["prepare"] condition = { env_set = [ - "ENABLE_BUILD_RW_CONNECTOR", + "ENABLE_BUILD_RW_CONNECTOR", ], files_modified = { input = [ - "./java/connector-node/**/*", + "./java/connector-node/**/*", ], output = [ - 
"./java/connector-node/assembly/target/**/*", + "./java/connector-node/assembly/target/**/*", ] } } description = "Build RisingWave Connector from source" script = ''' @@ -1054,8 +1055,8 @@ private = true category = "RiseDev - Check" description = "Run cargo hakari check and attempt to fix" install_crate = { min_version = "0.9.24", crate_name = "cargo-hakari", binary = "cargo", test_arg = [ - "hakari", - "--help", + "hakari", + "--help", ], install_command = "binstall" } script = """ #!/usr/bin/env bash @@ -1072,8 +1073,8 @@ private = true category = "RiseDev - Check" description = "Run cargo sort check and attempt to fix" install_crate = { min_version = "1.0.9", crate_name = "cargo-sort", binary = "cargo", test_arg = [ - "sort", - "--help", + "sort", + "--help", ], install_command = "binstall" } script = """ #!/usr/bin/env bash @@ -1135,7 +1136,7 @@ private = true category = "RiseDev - Check" description = "Run cargo typos-cli check" install_crate = { min_version = "1.20.4", crate_name = "typos-cli", binary = "typos", test_arg = [ - "--help", + "--help", ], install_command = "binstall" } script = """ #!/usr/bin/env bash @@ -1151,8 +1152,8 @@ category = "RiseDev - Check" description = "Check unused dependencies" env = { RUSTFLAGS = "--cfg tokio_unstable" } install_crate = { min_version = "0.1.35", crate_name = "cargo-udeps", binary = "cargo", test_arg = [ - "udeps", - "--help", + "udeps", + "--help", ], install_command = "binstall" } script = """ #!/usr/bin/env bash @@ -1179,13 +1180,13 @@ scripts/check/check-trailing-spaces.sh --fix [tasks.check-fast] category = "RiseDev - Check" dependencies = [ - "warn-on-missing-tools", - # Disable hakari until we make sure it's useful - # "check-hakari", - "check-dep-sort", - "check-fmt", - "check-trailing-spaces", - "check-typos", + "warn-on-missing-tools", + # Disable hakari until we make sure it's useful + # "check-hakari", + "check-dep-sort", + "check-fmt", + "check-trailing-spaces", + "check-typos", ] description = "Perform part of the pre-CI checks that are fast to run" @@ -1206,10 +1207,10 @@ alias = "check" [tasks.check-fix] category = "RiseDev - Check" dependencies = [ - "warn-on-missing-tools", - "check-fast", - "check-clippy-fix", - "check-java-fix", + "warn-on-missing-tools", + "check-fast", + "check-clippy-fix", + "check-java-fix", ] script = """ #!/usr/bin/env bash @@ -1294,7 +1295,7 @@ echo "All processes has exited." 
[tasks.slt] category = "RiseDev - Test - SQLLogicTest" install_crate = { version = "0.20.1", crate_name = "sqllogictest-bin", binary = "sqllogictest", test_arg = [ - "--help", + "--help", ], install_command = "binstall" } dependencies = ["check-and-load-risedev-env-file"] command = "sqllogictest" diff --git a/ci/risedev-components.ci.benchmark.env b/ci/risedev-components.ci.benchmark.env index 7761d6ce969d0..5bb6c4abddb56 100644 --- a/ci/risedev-components.ci.benchmark.env +++ b/ci/risedev-components.ci.benchmark.env @@ -4,3 +4,4 @@ ENABLE_ETCD=true ENABLE_BUILD_RUST=true ENABLE_RELEASE_PROFILE=true ENABLE_PROMETHEUS_GRAFANA=true +RW_TELEMETRY_TYPE=test diff --git a/ci/risedev-components.ci.env b/ci/risedev-components.ci.env index 7157083eb871a..0430207739145 100644 --- a/ci/risedev-components.ci.env +++ b/ci/risedev-components.ci.env @@ -1,3 +1,4 @@ RISEDEV_CONFIGURED=true ENABLE_MINIO=true ENABLE_ETCD=true +RW_TELEMETRY_TYPE=test diff --git a/ci/risedev-components.ci.source.env b/ci/risedev-components.ci.source.env index 22755d4c6f3d8..88d76b3df8e98 100644 --- a/ci/risedev-components.ci.source.env +++ b/ci/risedev-components.ci.source.env @@ -2,3 +2,4 @@ RISEDEV_CONFIGURED=true ENABLE_MINIO=true ENABLE_ETCD=true ENABLE_PUBSUB=true +RW_TELEMETRY_TYPE=test diff --git a/ci/scripts/common.sh b/ci/scripts/common.sh index f1177ecf8320c..3b31afeef253d 100755 --- a/ci/scripts/common.sh +++ b/ci/scripts/common.sh @@ -14,6 +14,7 @@ export MINIO_DOWNLOAD_BIN=https://rw-ci-deps-dist.s3.amazonaws.com/minio export MCLI_DOWNLOAD_BIN=https://rw-ci-deps-dist.s3.amazonaws.com/mc export GCLOUD_DOWNLOAD_TGZ=https://rw-ci-deps-dist.s3.amazonaws.com/google-cloud-cli-475.0.0-linux-x86_64.tar.gz export NEXTEST_HIDE_PROGRESS_BAR=true +export RW_TELEMETRY_TYPE=test unset LANG if [ -n "${BUILDKITE_COMMIT:-}" ]; then export GIT_SHA=$BUILDKITE_COMMIT diff --git a/e2e_test/batch/catalog/pg_class.slt.part b/e2e_test/batch/catalog/pg_class.slt.part index ffb53e32d66b5..a6fe4a1257122 100644 --- a/e2e_test/batch/catalog/pg_class.slt.part +++ b/e2e_test/batch/catalog/pg_class.slt.part @@ -20,4 +20,4 @@ SELECT oid,relname,relowner,relkind FROM pg_catalog.pg_class ORDER BY oid limit query ITIT SELECT oid,relname,relowner,relkind FROM pg_catalog.pg_class WHERE oid = 'pg_namespace'::regclass; ---- -2147478671 pg_namespace 1 v +2147478670 pg_namespace 1 v diff --git a/e2e_test/error_ui/simple/recovery.slt b/e2e_test/error_ui/simple/recovery.slt index b47afc94c47ad..152854c278483 100644 --- a/e2e_test/error_ui/simple/recovery.slt +++ b/e2e_test/error_ui/simple/recovery.slt @@ -1,15 +1,15 @@ -# TODO: the test triggers a recovery caused by a known issue: https://github.com/risingwavelabs/risingwave/issues/12474. +# TODO: the test triggers a recovery caused by a known issue: https://github.com/risingwavelabs/risingwave/issues/11915. # We should consider using a mechanism designed for testing recovery instead of depending on a bug. statement ok -create table t (v int); +create table t (v decimal); statement ok -create materialized view mv as select generate_series(1, 10), coalesce(pg_sleep(2), v) / 0 bomb from t; +create materialized view mv as select sum(coalesce(pg_sleep(1), v)) from t; -# The bomb will be triggered after 2 seconds of sleep, so the insertion should return successfully. +# The bomb will be triggered after 1 seconds of sleep, so the insertion should return successfully. statement ok -insert into t values (1); +insert into t values (4e28), (4e28); # Wait for recovery to complete. 
sleep 15s @@ -25,7 +25,7 @@ with error as ( limit 1 ) select -case when error like '%Actor % exited unexpectedly: Executor error: Chunk operation error: Division by zero%' then 'ok' +case when error like '%Actor % exited unexpectedly: Executor error: Chunk operation error: Numeric out of range%' then 'ok' else error end as result from error; diff --git a/e2e_test/streaming/bug_fixes/issue_12474.slt b/e2e_test/streaming/bug_fixes/issue_12474.slt new file mode 100644 index 0000000000000..245ca499a255d --- /dev/null +++ b/e2e_test/streaming/bug_fixes/issue_12474.slt @@ -0,0 +1,26 @@ +# https://github.com/risingwavelabs/risingwave/issues/12474 + +statement ok +create table t(x int[]); + +statement ok +create materialized view mv as select 1 / x[1] as bomb, unnest(x) as unnest from t; + +statement ok +insert into t values (array[0, 1]), (array[1]); + +statement ok +flush; + +query II rowsort +select * from mv; +---- +1 1 +NULL 0 +NULL 1 + +statement ok +drop materialized view mv; + +statement ok +drop table t; diff --git a/src/batch/src/executor/project_set.rs b/src/batch/src/executor/project_set.rs index 2d50c1039743c..7784159eb6c70 100644 --- a/src/batch/src/executor/project_set.rs +++ b/src/batch/src/executor/project_set.rs @@ -15,13 +15,16 @@ use either::Either; use futures_async_stream::try_stream; use itertools::Itertools; -use risingwave_common::array::DataChunk; +use risingwave_common::array::{ArrayRef, DataChunk}; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::types::{DataType, DatumRef}; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; use risingwave_common::util::iter_util::ZipEqFast; -use risingwave_expr::table_function::ProjectSetSelectItem; +use risingwave_expr::expr::{self, BoxedExpression}; +use risingwave_expr::table_function::{self, BoxedTableFunction, TableFunctionOutputIter}; use risingwave_pb::batch_plan::plan_node::NodeBody; +use risingwave_pb::expr::project_set_select_item::PbSelectItem; +use risingwave_pb::expr::PbProjectSetSelectItem; use crate::error::{BatchError, Result}; use crate::executor::{ @@ -170,6 +173,57 @@ impl BoxedExecutorBuilder for ProjectSetExecutor { } } +/// Either a scalar expression or a set-returning function. +/// +/// See also [`PbProjectSetSelectItem`] +#[derive(Debug)] +pub enum ProjectSetSelectItem { + Scalar(BoxedExpression), + Set(BoxedTableFunction), +} + +impl From for ProjectSetSelectItem { + fn from(table_function: BoxedTableFunction) -> Self { + ProjectSetSelectItem::Set(table_function) + } +} + +impl From for ProjectSetSelectItem { + fn from(expr: BoxedExpression) -> Self { + ProjectSetSelectItem::Scalar(expr) + } +} + +impl ProjectSetSelectItem { + pub fn from_prost(prost: &PbProjectSetSelectItem, chunk_size: usize) -> Result { + Ok(match prost.select_item.as_ref().unwrap() { + PbSelectItem::Expr(expr) => Self::Scalar(expr::build_from_prost(expr)?), + PbSelectItem::TableFunction(tf) => { + Self::Set(table_function::build_from_prost(tf, chunk_size)?) 
+ } + }) + } + + pub fn return_type(&self) -> DataType { + match self { + ProjectSetSelectItem::Scalar(expr) => expr.return_type(), + ProjectSetSelectItem::Set(tf) => tf.return_type(), + } + } + + pub async fn eval<'a>( + &'a self, + input: &'a DataChunk, + ) -> Result, ArrayRef>> { + match self { + Self::Set(tf) => Ok(Either::Left( + TableFunctionOutputIter::new(tf.eval(input).await).await?, + )), + Self::Scalar(expr) => Ok(Either::Right(expr.eval(input).await?)), + } + } +} + #[cfg(test)] mod tests { use futures::stream::StreamExt; diff --git a/src/common/Cargo.toml b/src/common/Cargo.toml index e2af368c24a85..e5b4a2feaa2a0 100644 --- a/src/common/Cargo.toml +++ b/src/common/Cargo.toml @@ -40,7 +40,7 @@ clap = { workspace = true } comfy-table = "7" crc32fast = "1" easy-ext = "1" -educe = "0.5" +educe = "0.6" either = "1" enum-as-inner = "0.6" enumflags2 = { version = "0.7.8" } diff --git a/src/common/estimate_size/Cargo.toml b/src/common/estimate_size/Cargo.toml index 24cf19b3809e9..77e4203f9c7cb 100644 --- a/src/common/estimate_size/Cargo.toml +++ b/src/common/estimate_size/Cargo.toml @@ -16,7 +16,7 @@ normal = ["workspace-hack"] [dependencies] bytes = "1" -educe = "0.5" +educe = "0.6" ethnum = { version = "1", features = ["serde"] } fixedbitset = "0.5" jsonbb = { workspace = true } diff --git a/src/common/src/telemetry/mod.rs b/src/common/src/telemetry/mod.rs index 54a88789a171f..0fbd526692da0 100644 --- a/src/common/src/telemetry/mod.rs +++ b/src/common/src/telemetry/mod.rs @@ -16,6 +16,7 @@ pub mod manager; pub mod pb_compatible; pub mod report; +use std::env; use std::time::SystemTime; use serde::{Deserialize, Serialize}; @@ -25,6 +26,11 @@ use thiserror_ext::AsReport; use crate::util::env_var::env_var_is_true_or; use crate::util::resource_util::cpu::total_cpu_available; use crate::util::resource_util::memory::{system_memory_available_bytes, total_memory_used_bytes}; +use crate::RW_VERSION; + +pub const TELEMETRY_CLUSTER_TYPE: &str = "RW_TELEMETRY_TYPE"; +const TELEMETRY_CLUSTER_TYPE_HOSTED: &str = "hosted"; // hosted on RisingWave Cloud +const TELEMETRY_CLUSTER_TYPE_TEST: &str = "test"; // test environment, eg. 
CI & Risedev /// Url of telemetry backend pub const TELEMETRY_REPORT_URL: &str = "https://telemetry.risingwave.dev/api/v2/report"; @@ -159,6 +165,34 @@ pub fn current_timestamp() -> u64 { .as_secs() } +pub fn report_scarf_enabled() -> bool { + env::var(TELEMETRY_CLUSTER_TYPE) + .map(|deploy_type| { + !(deploy_type.eq_ignore_ascii_case(TELEMETRY_CLUSTER_TYPE_HOSTED) + || deploy_type.eq_ignore_ascii_case(TELEMETRY_CLUSTER_TYPE_TEST)) + }) + .unwrap_or(true) +} + +// impl logic to report to Scarf service, containing RW version and deployment platform +pub async fn report_to_scarf() { + let request_url = format!( + "https://risingwave.gateway.scarf.sh/telemetry/{}/{}", + RW_VERSION, + System::name().unwrap_or_default() + ); + // keep trying every 1h until success + loop { + let res = reqwest::get(&request_url).await; + if let Ok(res) = res { + if res.status().is_success() { + break; + } + } + tokio::time::sleep(tokio::time::Duration::from_secs(3600)).await; + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index e73fb35e63267..211f2d608c33e 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -34,6 +34,7 @@ await-tree = { workspace = true } aws-config = { workspace = true } aws-credential-types = { workspace = true } aws-msk-iam-sasl-signer = "1.0.0" +aws-sdk-dynamodb = "1" aws-sdk-kinesis = { workspace = true } aws-sdk-s3 = { workspace = true } aws-smithy-http = { workspace = true } diff --git a/src/connector/src/parser/debezium/debezium_parser.rs b/src/connector/src/parser/debezium/debezium_parser.rs index 817c2a788f2be..2049545696216 100644 --- a/src/connector/src/parser/debezium/debezium_parser.rs +++ b/src/connector/src/parser/debezium/debezium_parser.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
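
(Editorial aside on the telemetry change in src/common/src/telemetry/mod.rs above, not part of the diff: report_scarf_enabled treats any value of RW_TELEMETRY_TYPE other than "hosted" or "test" (including an unset variable) as consent to ping Scarf, which is why the CI env files in this diff now export RW_TELEMETRY_TYPE=test. A minimal sketch of that gating, assuming report_scarf_enabled from the module above is in scope:)

    std::env::set_var("RW_TELEMETRY_TYPE", "test");   // as exported by ci/scripts/common.sh in this diff
    assert!(!report_scarf_enabled());                 // test clusters never report to Scarf
    std::env::set_var("RW_TELEMETRY_TYPE", "hosted"); // RisingWave Cloud deployments are excluded too
    assert!(!report_scarf_enabled());
    std::env::remove_var("RW_TELEMETRY_TYPE");        // unset (self-hosted): defaults to reporting
    assert!(report_scarf_enabled());
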
-use std::collections::BTreeMap; +use std::collections::HashMap; use risingwave_common::bail; @@ -49,7 +49,7 @@ pub struct DebeziumProps { } impl DebeziumProps { - pub fn from(props: &BTreeMap) -> Self { + pub fn from(props: &HashMap) -> Self { let ignore_key = props .get(DEBEZIUM_IGNORE_KEY) .map(|v| v.eq_ignore_ascii_case("true")) diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 4e0549ef99a98..a76917ad5973f 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -1054,7 +1054,7 @@ impl MapHandling { pub const OPTION_KEY: &'static str = "map.handling.mode"; pub fn from_options( - options: &std::collections::BTreeMap, + options: &std::collections::HashMap, ) -> Result, InvalidOptionError> { let mode = match options.get(Self::OPTION_KEY).map(std::ops::Deref::deref) { Some("jsonb") => Self::Jsonb, diff --git a/src/connector/src/parser/unified/json.rs b/src/connector/src/parser/unified/json.rs index a40b1153d5215..5576bbc2e66e1 100644 --- a/src/connector/src/parser/unified/json.rs +++ b/src/connector/src/parser/unified/json.rs @@ -70,7 +70,7 @@ impl TimestamptzHandling { pub const OPTION_KEY: &'static str = "timestamptz.handling.mode"; pub fn from_options( - options: &std::collections::BTreeMap, + options: &std::collections::HashMap, ) -> Result, InvalidOptionError> { let mode = match options.get(Self::OPTION_KEY).map(std::ops::Deref::deref) { Some("utc_string") => Self::UtcString, diff --git a/src/connector/src/schema/loader.rs b/src/connector/src/schema/loader.rs index a50d8cced575b..3d4d9325e5918 100644 --- a/src/connector/src/schema/loader.rs +++ b/src/connector/src/schema/loader.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeMap; +use std::collections::HashMap; use risingwave_pb::catalog::PbSchemaRegistryNameStrategy; @@ -38,7 +38,7 @@ pub struct SchemaLoader { impl SchemaLoader { pub fn from_format_options( topic: &str, - format_options: &BTreeMap, + format_options: &HashMap, ) -> Result { let schema_location = format_options .get(SCHEMA_REGISTRY_KEY) diff --git a/src/connector/src/schema/protobuf.rs b/src/connector/src/schema/protobuf.rs index d140af83c853f..b052de359e588 100644 --- a/src/connector/src/schema/protobuf.rs +++ b/src/connector/src/schema/protobuf.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeMap; +use std::collections::HashMap; use itertools::Itertools as _; use prost_reflect::{DescriptorPool, FileDescriptor, MessageDescriptor}; @@ -28,7 +28,7 @@ use crate::parser::{EncodingProperties, ProtobufParserConfig, ProtobufProperties /// `aws_auth_props` is only required when reading `s3://` URL. 
pub async fn fetch_descriptor( - format_options: &BTreeMap, + format_options: &HashMap, topic: &str, aws_auth_props: Option<&AwsAuthProps>, ) -> Result<(MessageDescriptor, Option), SchemaFetchError> { @@ -82,7 +82,7 @@ pub async fn fetch_descriptor( pub async fn fetch_from_registry( message_name: &str, - format_options: &BTreeMap, + format_options: &HashMap, topic: &str, ) -> Result<(MessageDescriptor, i32), SchemaFetchError> { let loader = SchemaLoader::from_format_options(topic, format_options)?; diff --git a/src/connector/src/sink/catalog/desc.rs b/src/connector/src/sink/catalog/desc.rs index e5415c268d569..f64e3fbdd10c0 100644 --- a/src/connector/src/sink/catalog/desc.rs +++ b/src/connector/src/sink/catalog/desc.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, HashMap}; +use std::collections::HashMap; use itertools::Itertools; use risingwave_common::catalog::{ @@ -23,7 +23,7 @@ use risingwave_pb::stream_plan::PbSinkDesc; use super::{SinkCatalog, SinkFormatDesc, SinkId, SinkType}; -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct SinkDesc { /// Id of the sink. For debug now. pub id: SinkId, @@ -48,7 +48,7 @@ pub struct SinkDesc { pub distribution_key: Vec, /// The properties of the sink. - pub properties: BTreeMap, + pub properties: HashMap, // The append-only behavior of the physical sink connector. Frontend will determine `sink_type` // based on both its own derivation on the append-only attribute and other user-specified @@ -136,3 +136,9 @@ impl SinkDesc { } } } + +impl std::hash::Hash for SinkDesc { + fn hash(&self, state: &mut H) { + self.id.hash(state); + } +} diff --git a/src/connector/src/sink/catalog/mod.rs b/src/connector/src/sink/catalog/mod.rs index bf5dd89dd7894..0113e484d7781 100644 --- a/src/connector/src/sink/catalog/mod.rs +++ b/src/connector/src/sink/catalog/mod.rs @@ -14,7 +14,7 @@ pub mod desc; -use std::collections::{BTreeMap, HashMap}; +use std::collections::HashMap; use anyhow::anyhow; use itertools::Itertools; @@ -114,11 +114,11 @@ impl SinkType { /// May replace [`SinkType`]. /// /// TODO: consolidate with [`crate::source::SourceStruct`] and [`crate::parser::SpecificParserConfig`]. -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct SinkFormatDesc { pub format: SinkFormat, pub encode: SinkEncode, - pub options: BTreeMap, + pub options: HashMap, pub key_encode: Option, } diff --git a/src/connector/src/sink/dynamodb.rs b/src/connector/src/sink/dynamodb.rs new file mode 100644 index 0000000000000..edf2e7c08cc9f --- /dev/null +++ b/src/connector/src/sink/dynamodb.rs @@ -0,0 +1,401 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
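
(Editorial aside, not part of the diff: the new DynamoDB sink defined below parses its user-facing options through DynamoDbConfig::from_hashmap, i.e. a serde_json round-trip over the string map coming from CREATE SINK ... WITH (...). A sketch of the expected shape, using the option names declared later in this file and in with_options_sink.yaml further down; the concrete values are hypothetical.)

    use std::collections::HashMap;

    let props: HashMap<String, String> = [
        ("table", "user_events"),           // required; alias "dynamodb.table"
        ("dynamodb.max_batch_rows", "256"), // optional, given as a string and parsed via DisplayFromStr; defaults to 1024
        ("aws.region", "us-east-1"),        // picked up by the flattened AwsAuthProps
    ]
    .into_iter()
    .map(|(k, v)| (k.to_owned(), v.to_owned()))
    .collect();
    let config = DynamoDbConfig::from_hashmap(props).expect("valid sink options");
    assert_eq!(config.table, "user_events");
    assert_eq!(config.max_batch_rows, 256);
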
+ +use std::collections::{HashMap, HashSet}; +use std::usize; + +use anyhow::{anyhow, Context}; +use aws_sdk_dynamodb as dynamodb; +use aws_sdk_dynamodb::client::Client; +use aws_smithy_types::Blob; +use dynamodb::types::{ + AttributeValue, DeleteRequest, PutRequest, ReturnConsumedCapacity, ReturnItemCollectionMetrics, + TableStatus, WriteRequest, +}; +use maplit::hashmap; +use risingwave_common::array::{Op, RowRef, StreamChunk}; +use risingwave_common::catalog::{Field, Schema}; +use risingwave_common::row::Row as _; +use risingwave_common::types::{DataType, ScalarRefImpl, ToText}; +use risingwave_common::util::iter_util::ZipEqDebug; +use serde_derive::Deserialize; +use serde_with::{serde_as, DisplayFromStr}; +use with_options::WithOptions; + +use super::log_store::DeliveryFutureManagerAddFuture; +use super::writer::{ + AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, +}; +use super::{DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, SinkWriterParam}; +use crate::connector_common::AwsAuthProps; +use crate::error::ConnectorResult; + +pub const DYNAMO_DB_SINK: &str = "dynamodb"; + +#[serde_as] +#[derive(Deserialize, Debug, Clone, WithOptions)] +pub struct DynamoDbConfig { + #[serde(rename = "table", alias = "dynamodb.table")] + pub table: String, + + #[serde(rename = "dynamodb.max_batch_rows", default = "default_max_batch_rows")] + #[serde_as(as = "DisplayFromStr")] + pub max_batch_rows: usize, + + #[serde(flatten)] + pub aws_auth_props: AwsAuthProps, +} + +fn default_max_batch_rows() -> usize { + 1024 +} + +impl DynamoDbConfig { + pub async fn build_client(&self) -> ConnectorResult { + let config = &self.aws_auth_props; + let aws_config = config.build_config().await?; + + Ok(Client::new(&aws_config)) + } + + fn from_hashmap(values: HashMap) -> Result { + serde_json::from_value::(serde_json::to_value(values).unwrap()) + .map_err(|e| SinkError::Config(anyhow!(e))) + } +} + +#[derive(Clone, Debug)] +pub struct DynamoDbSink { + pub config: DynamoDbConfig, + schema: Schema, + pk_indices: Vec, +} + +impl Sink for DynamoDbSink { + type Coordinator = DummySinkCommitCoordinator; + type LogSinker = AsyncTruncateLogSinkerOf; + + const SINK_NAME: &'static str = DYNAMO_DB_SINK; + + async fn validate(&self) -> Result<()> { + let client = (self.config.build_client().await) + .context("validate DynamoDB sink error") + .map_err(SinkError::DynamoDb)?; + + let table_name = &self.config.table; + let output = client + .describe_table() + .table_name(table_name) + .send() + .await + .map_err(|e| anyhow!(e))?; + let Some(table) = output.table else { + return Err(SinkError::DynamoDb(anyhow!( + "table {} not found", + table_name + ))); + }; + if !matches!(table.table_status(), Some(TableStatus::Active)) { + return Err(SinkError::DynamoDb(anyhow!( + "table {} is not active", + table_name + ))); + } + let pk_set: HashSet = self + .schema + .fields() + .iter() + .enumerate() + .filter(|(k, _)| self.pk_indices.contains(k)) + .map(|(_, v)| v.name.clone()) + .collect(); + let key_schema = table.key_schema(); + + for key_element in key_schema.iter().map(|x| x.attribute_name()) { + if !pk_set.contains(key_element) { + return Err(SinkError::DynamoDb(anyhow!( + "table {} key field {} not found in schema or not primary key", + table_name, + key_element + ))); + } + } + + Ok(()) + } + + async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result { + Ok( + DynamoDbSinkWriter::new(self.config.clone(), self.schema.clone()) + .await? 
+ .into_log_sinker(usize::MAX), + ) + } +} + +impl TryFrom for DynamoDbSink { + type Error = SinkError; + + fn try_from(param: SinkParam) -> std::result::Result { + let schema = param.schema(); + let config = DynamoDbConfig::from_hashmap(param.properties)?; + + Ok(Self { + config, + schema, + pk_indices: param.downstream_pk, + }) + } +} + +#[derive(Debug)] +struct DynamoDbRequest { + inner: WriteRequest, + key_items: Vec, +} + +impl DynamoDbRequest { + fn extract_pk_values(&self) -> Option> { + let key = match (&self.inner.put_request(), &self.inner.delete_request()) { + (Some(put_req), None) => &put_req.item, + (None, Some(del_req)) => &del_req.key, + _ => return None, + }; + let vs = key + .iter() + .filter(|(k, _)| self.key_items.contains(k)) + .map(|(_, v)| v.clone()) + .collect(); + Some(vs) + } +} + +struct DynamoDbPayloadWriter { + request_items: Vec, + client: Client, + table: String, + dynamodb_keys: Vec, +} + +impl DynamoDbPayloadWriter { + fn write_one_insert(&mut self, item: HashMap) { + let put_req = PutRequest::builder().set_item(Some(item)).build().unwrap(); + let req = WriteRequest::builder().put_request(put_req).build(); + self.write_one_req(req); + } + + fn write_one_delete(&mut self, key: HashMap) { + let key = key + .into_iter() + .filter(|(k, _)| self.dynamodb_keys.contains(k)) + .collect(); + let del_req = DeleteRequest::builder().set_key(Some(key)).build().unwrap(); + let req = WriteRequest::builder().delete_request(del_req).build(); + self.write_one_req(req); + } + + fn write_one_req(&mut self, req: WriteRequest) { + let r_req = DynamoDbRequest { + inner: req, + key_items: self.dynamodb_keys.clone(), + }; + if let Some(v) = r_req.extract_pk_values() { + self.request_items.retain(|item| { + !item + .extract_pk_values() + .unwrap_or_default() + .iter() + .all(|x| v.contains(x)) + }); + } + self.request_items.push(r_req); + } + + async fn write_chunk(&mut self) -> Result<()> { + if !self.request_items.is_empty() { + let table = self.table.clone(); + let req_items = std::mem::take(&mut self.request_items) + .into_iter() + .map(|r| r.inner) + .collect(); + let reqs = hashmap! 
{ + table => req_items, + }; + self.client + .batch_write_item() + .set_request_items(Some(reqs)) + .return_consumed_capacity(ReturnConsumedCapacity::None) + .return_item_collection_metrics(ReturnItemCollectionMetrics::None) + .send() + .await + .map_err(|e| { + SinkError::DynamoDb( + anyhow!(e).context("failed to delete item from DynamoDB sink"), + ) + })?; + } + + Ok(()) + } +} + +pub struct DynamoDbSinkWriter { + max_batch_rows: usize, + payload_writer: DynamoDbPayloadWriter, + formatter: DynamoDbFormatter, +} + +impl DynamoDbSinkWriter { + pub async fn new(config: DynamoDbConfig, schema: Schema) -> Result { + let client = config.build_client().await?; + let table_name = &config.table; + let output = client + .describe_table() + .table_name(table_name) + .send() + .await + .map_err(|e| anyhow!(e))?; + let Some(table) = output.table else { + return Err(SinkError::DynamoDb(anyhow!( + "table {} not found", + table_name + ))); + }; + let dynamodb_keys = table + .key_schema + .unwrap_or_default() + .into_iter() + .map(|k| k.attribute_name) + .collect(); + + let payload_writer = DynamoDbPayloadWriter { + request_items: Vec::new(), + client, + table: config.table, + dynamodb_keys, + }; + + Ok(Self { + max_batch_rows: config.max_batch_rows, + payload_writer, + formatter: DynamoDbFormatter { schema }, + }) + } + + async fn write_chunk_inner(&mut self, chunk: StreamChunk) -> Result<()> { + for (op, row) in chunk.rows() { + let items = self.formatter.format_row(row)?; + match op { + Op::Insert | Op::UpdateInsert => { + self.payload_writer.write_one_insert(items); + } + Op::Delete => { + self.payload_writer.write_one_delete(items); + } + Op::UpdateDelete => {} + } + } + if self.payload_writer.request_items.len() >= self.max_batch_rows { + self.payload_writer.write_chunk().await?; + } + Ok(()) + } + + async fn flush(&mut self) -> Result<()> { + self.payload_writer.write_chunk().await + } +} + +impl AsyncTruncateSinkWriter for DynamoDbSinkWriter { + async fn write_chunk<'a>( + &'a mut self, + chunk: StreamChunk, + _add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>, + ) -> Result<()> { + self.write_chunk_inner(chunk).await + } + + async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> { + if is_checkpoint { + self.flush().await?; + } + Ok(()) + } +} + +struct DynamoDbFormatter { + schema: Schema, +} + +impl DynamoDbFormatter { + fn format_row(&self, row: RowRef<'_>) -> Result> { + row.iter() + .zip_eq_debug((self.schema.clone()).into_fields()) + .map(|(scalar, field)| { + map_data_type(scalar, &field.data_type()).map(|attr| (field.name, attr)) + }) + .collect() + } +} + +fn map_data_type( + scalar_ref: Option>, + data_type: &DataType, +) -> Result { + let Some(scalar_ref) = scalar_ref else { + return Ok(AttributeValue::Null(true)); + }; + let attr = match data_type { + DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::Int256 + | DataType::Float32 + | DataType::Float64 + | DataType::Decimal + | DataType::Serial => AttributeValue::N(scalar_ref.to_text_with_type(data_type)), + // TODO: jsonb as dynamic type (https://github.com/risingwavelabs/risingwave/issues/11699) + DataType::Varchar + | DataType::Interval + | DataType::Date + | DataType::Time + | DataType::Timestamp + | DataType::Timestamptz + | DataType::Jsonb => AttributeValue::S(scalar_ref.to_text_with_type(data_type)), + DataType::Boolean => AttributeValue::Bool(scalar_ref.into_bool()), + DataType::Bytea => AttributeValue::B(Blob::new(scalar_ref.into_bytea())), + DataType::List(datatype) => { + let 
list_attr = scalar_ref + .into_list() + .iter() + .map(|x| map_data_type(x, datatype)) + .collect::>>()?; + AttributeValue::L(list_attr) + } + DataType::Struct(st) => { + let mut map = HashMap::with_capacity(st.len()); + for (sub_datum_ref, sub_field) in + scalar_ref.into_struct().iter_fields_ref().zip_eq_debug( + st.iter() + .map(|(name, dt)| Field::with_name(dt.clone(), name)), + ) + { + let attr = map_data_type(sub_datum_ref, &sub_field.data_type())?; + map.insert(sub_field.name.clone(), attr); + } + AttributeValue::M(map) + } + }; + Ok(attr) +} diff --git a/src/connector/src/sink/encoder/mod.rs b/src/connector/src/sink/encoder/mod.rs index a0515bb7bc165..579d90a5b5b26 100644 --- a/src/connector/src/sink/encoder/mod.rs +++ b/src/connector/src/sink/encoder/mod.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, HashMap}; +use std::collections::HashMap; use std::sync::Arc; use risingwave_common::catalog::Schema; @@ -117,7 +117,7 @@ impl TimestamptzHandlingMode { pub const FRONTEND_DEFAULT: &'static str = "utc_string"; pub const OPTION_KEY: &'static str = "timestamptz.handling.mode"; - pub fn from_options(options: &BTreeMap) -> Result { + pub fn from_options(options: &HashMap) -> Result { match options.get(Self::OPTION_KEY).map(std::ops::Deref::deref) { Some(Self::FRONTEND_DEFAULT) => Ok(Self::UtcString), Some("utc_without_suffix") => Ok(Self::UtcWithoutSuffix), diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index 8313ea20dafa7..09fe39a4865c8 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -264,8 +264,6 @@ impl KafkaConfig { pub(crate) fn set_client(&self, c: &mut rdkafka::ClientConfig) { self.rdkafka_properties_common.set_client(c); self.rdkafka_properties_producer.set_client(c); - - tracing::info!("kafka client starts with: {:?}", c); } } diff --git a/src/connector/src/sink/kinesis.rs b/src/connector/src/sink/kinesis.rs index 3b5533a49c76c..6f534c2800a53 100644 --- a/src/connector/src/sink/kinesis.rs +++ b/src/connector/src/sink/kinesis.rs @@ -15,9 +15,11 @@ use std::collections::HashMap; use anyhow::{anyhow, Context}; -use aws_sdk_kinesis::operation::put_record::PutRecordOutput; +use aws_sdk_kinesis::operation::put_records::builders::PutRecordsFluentBuilder; use aws_sdk_kinesis::primitives::Blob; +use aws_sdk_kinesis::types::PutRecordsRequestEntry; use aws_sdk_kinesis::Client as KinesisClient; +use futures::{FutureExt, TryFuture}; use risingwave_common::array::StreamChunk; use risingwave_common::catalog::Schema; use risingwave_common::session_config::sink_decouple::SinkDecouple; @@ -70,6 +72,8 @@ impl TryFrom for KinesisSink { } } +const KINESIS_SINK_MAX_PENDING_CHUNK_NUM: usize = 64; + impl Sink for KinesisSink { type Coordinator = DummySinkCommitCoordinator; type LogSinker = AsyncTruncateLogSinkerOf; @@ -125,7 +129,7 @@ impl Sink for KinesisSink { self.sink_from_name.clone(), ) .await? - .into_log_sinker(usize::MAX)) + .into_log_sinker(KINESIS_SINK_MAX_PENDING_CHUNK_NUM)) } } @@ -148,12 +152,13 @@ impl KinesisSinkConfig { pub struct KinesisSinkWriter { pub config: KinesisSinkConfig, formatter: SinkFormatterImpl, - payload_writer: KinesisSinkPayloadWriter, + client: KinesisClient, } struct KinesisSinkPayloadWriter { - client: KinesisClient, - config: KinesisSinkConfig, + // builder should always be `Some`. 
Making it an option so that we can call + // builder methods that take the builder ownership as input and return with a new builder. + builder: Option, } impl KinesisSinkWriter { @@ -182,29 +187,57 @@ impl KinesisSinkWriter { Ok(Self { config: config.clone(), formatter, - payload_writer: KinesisSinkPayloadWriter { client, config }, + client, }) } + + fn new_payload_writer(&self) -> KinesisSinkPayloadWriter { + let builder = self + .client + .put_records() + .stream_name(&self.config.common.stream_name); + KinesisSinkPayloadWriter { + builder: Some(builder), + } + } } + +pub type KinesisSinkPayloadWriterDeliveryFuture = + impl TryFuture + Unpin + Send + 'static; + impl KinesisSinkPayloadWriter { - async fn put_record(&self, key: &str, payload: Vec) -> Result { - let payload = Blob::new(payload); - // todo: switch to put_records() for batching - Retry::spawn( - ExponentialBackoff::from_millis(100).map(jitter).take(3), - || async { - self.client - .put_record() - .stream_name(&self.config.common.stream_name) + fn put_record(&mut self, key: String, payload: Vec) { + self.builder = Some( + self.builder.take().expect("should not be None").records( + PutRecordsRequestEntry::builder() .partition_key(key) - .data(payload.clone()) - .send() - .await - }, - ) - .await - .with_context(|| format!("failed to put record to {}", self.config.common.stream_name)) - .map_err(SinkError::Kinesis) + .data(Blob::new(payload)) + .build() + .expect("should not fail because we have set `data` and `partition_key`"), + ), + ); + } + + fn finish(self) -> KinesisSinkPayloadWriterDeliveryFuture { + async move { + let builder = self.builder.expect("should not be None"); + let context_fmt = format!( + "failed to put record to {}", + builder + .get_stream_name() + .as_ref() + .expect("should have set stream name") + ); + Retry::spawn( + ExponentialBackoff::from_millis(100).map(jitter).take(3), + || builder.clone().send(), + ) + .await + .with_context(|| context_fmt.clone()) + .map_err(SinkError::Kinesis)?; + Ok(()) + } + .boxed() } } @@ -214,24 +247,46 @@ impl FormattedSink for KinesisSinkPayloadWriter { async fn write_one(&mut self, k: Option, v: Option) -> Result<()> { self.put_record( - &k.ok_or_else(|| SinkError::Kinesis(anyhow!("no key provided")))?, + k.ok_or_else(|| SinkError::Kinesis(anyhow!("no key provided")))?, v.unwrap_or_default(), - ) - .await - .map(|_| ()) + ); + Ok(()) } } impl AsyncTruncateSinkWriter for KinesisSinkWriter { + type DeliveryFuture = KinesisSinkPayloadWriterDeliveryFuture; + async fn write_chunk<'a>( &'a mut self, chunk: StreamChunk, - _add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>, + mut add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>, ) -> Result<()> { + let mut payload_writer = self.new_payload_writer(); dispatch_sink_formatter_str_key_impl!( &self.formatter, formatter, - self.payload_writer.write_chunk(chunk, formatter).await - ) + payload_writer.write_chunk(chunk, formatter).await + )?; + + add_future + .add_future_may_await(payload_writer.finish()) + .await?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use aws_sdk_kinesis::types::PutRecordsRequestEntry; + use aws_smithy_types::Blob; + + #[test] + fn test_kinesis_entry_builder_save_unwrap() { + PutRecordsRequestEntry::builder() + .data(Blob::new(b"data")) + .partition_key("partition-key") + .build() + .unwrap(); } } diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 6993988709795..325e794137ae1 100644 --- a/src/connector/src/sink/mod.rs +++ 
b/src/connector/src/sink/mod.rs @@ -21,6 +21,7 @@ pub mod decouple_checkpoint_log_sink; pub mod deltalake; pub mod doris; pub mod doris_starrocks_connector; +pub mod dynamodb; pub mod elasticsearch; pub mod encoder; pub mod formatter; @@ -98,6 +99,7 @@ macro_rules! for_all_sinks { { Snowflake, $crate::sink::snowflake::SnowflakeSink }, { DeltaLake, $crate::sink::deltalake::DeltaLakeSink }, { BigQuery, $crate::sink::big_query::BigQuerySink }, + { DynamoDb, $crate::sink::dynamodb::DynamoDbSink }, { Test, $crate::sink::test_sink::TestSink }, { Table, $crate::sink::trivial::TableSink } } @@ -569,6 +571,12 @@ pub enum SinkError { #[backtrace] anyhow::Error, ), + #[error("DynamoDB error: {0}")] + DynamoDb( + #[source] + #[backtrace] + anyhow::Error, + ), #[error(transparent)] Connector( #[from] diff --git a/src/connector/src/sink/redis.rs b/src/connector/src/sink/redis.rs index 6baeccdd32ed8..4b3733be22d1f 100644 --- a/src/connector/src/sink/redis.rs +++ b/src/connector/src/sink/redis.rs @@ -375,7 +375,6 @@ impl AsyncTruncateSinkWriter for RedisSinkWriter { #[cfg(test)] mod test { use core::panic; - use std::collections::BTreeMap; use rdkafka::message::FromBytes; use risingwave_common::array::{Array, I32Array, Op, Utf8Array}; @@ -407,7 +406,7 @@ mod test { let format_desc = SinkFormatDesc { format: SinkFormat::AppendOnly, encode: SinkEncode::Json, - options: BTreeMap::default(), + options: HashMap::default(), key_encode: None, }; @@ -475,16 +474,16 @@ mod test { }, ]); - let mut btree_map = BTreeMap::default(); - btree_map.insert(KEY_FORMAT.to_string(), "key-{id}".to_string()); - btree_map.insert( + let mut hash_map = HashMap::default(); + hash_map.insert(KEY_FORMAT.to_string(), "key-{id}".to_string()); + hash_map.insert( VALUE_FORMAT.to_string(), "values:{id:{id},name:{name}}".to_string(), ); let format_desc = SinkFormatDesc { format: SinkFormat::AppendOnly, encode: SinkEncode::Template, - options: btree_map, + options: hash_map, key_encode: None, }; diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index 63fc60ecc4510..ae245d2ef2363 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -666,11 +666,7 @@ impl RemoteCoordinator { .start_sink_coordinator_stream(param.clone()) .await?; - tracing::trace!( - "{:?} RemoteCoordinator started with properties: {:?}", - R::SINK_NAME, - ¶m.properties - ); + tracing::trace!("{:?} RemoteCoordinator started", R::SINK_NAME,); Ok(RemoteCoordinator { stream_handle }) } diff --git a/src/connector/src/source/kafka/mod.rs b/src/connector/src/source/kafka/mod.rs index 4c560d30ecce6..a037dd4463ecb 100644 --- a/src/connector/src/source/kafka/mod.rs +++ b/src/connector/src/source/kafka/mod.rs @@ -163,8 +163,6 @@ impl KafkaProperties { pub fn set_client(&self, c: &mut rdkafka::ClientConfig) { self.rdkafka_properties_common.set_client(c); self.rdkafka_properties_consumer.set_client(c); - - tracing::info!("kafka client starts with: {:?}", c); } } diff --git a/src/connector/src/source/kafka/private_link.rs b/src/connector/src/source/kafka/private_link.rs index 348749ba3f113..1d395bbba4943 100644 --- a/src/connector/src/source/kafka/private_link.rs +++ b/src/connector/src/source/kafka/private_link.rs @@ -91,7 +91,7 @@ impl BrokerAddrRewriter { } #[inline(always)] -fn kafka_props_broker_key(with_properties: &BTreeMap) -> &str { +fn kafka_props_broker_key(with_properties: &HashMap) -> &str { if with_properties.contains_key(KAFKA_PROPS_BROKER_KEY) { KAFKA_PROPS_BROKER_KEY } else { @@ -101,7 +101,7 @@ fn 
kafka_props_broker_key(with_properties: &BTreeMap) -> &str { #[inline(always)] fn get_property_required( - with_properties: &BTreeMap, + with_properties: &HashMap, property: &str, ) -> ConnectorResult { with_properties @@ -112,7 +112,7 @@ fn get_property_required( } pub fn insert_privatelink_broker_rewrite_map( - with_options: &mut BTreeMap, + with_options: &mut HashMap, svc: Option<&PrivateLinkService>, privatelink_endpoint: Option, ) -> ConnectorResult<()> { diff --git a/src/connector/src/source/reader/reader.rs b/src/connector/src/source/reader/reader.rs index eb0ba30d12431..a8c1abb40cd55 100644 --- a/src/connector/src/source/reader/reader.rs +++ b/src/connector/src/source/reader/reader.rs @@ -165,7 +165,7 @@ impl SourceReader { } else { let to_reader_splits = splits.into_iter().map(|split| vec![split]); try_join_all(to_reader_splits.into_iter().map(|splits| { - tracing::debug!(?splits, ?prop, "spawning connector split reader"); + tracing::debug!(?splits, "spawning connector split reader"); let props = prop.clone(); let data_gen_columns = data_gen_columns.clone(); let parser_config = parser_config.clone(); diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index a879cd74f1c43..eb43d51f1a759 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -134,6 +134,60 @@ DorisConfig: - name: r#type field_type: String required: true +DynamoDbConfig: + fields: + - name: table + field_type: String + required: true + alias: + - dynamodb.table + - name: dynamodb.max_batch_rows + field_type: usize + required: false + default: '1024' + - name: aws.region + field_type: String + required: false + alias: + - region + - name: aws.endpoint_url + field_type: String + required: false + alias: + - endpoint_url + - endpoint + - name: aws.credentials.access_key_id + field_type: String + required: false + alias: + - access_key + - name: aws.credentials.secret_access_key + field_type: String + required: false + alias: + - secret_key + - name: aws.credentials.session_token + field_type: String + required: false + alias: + - session_token + - name: aws.credentials.role.arn + field_type: String + comments: IAM role + required: false + alias: + - arn + - name: aws.credentials.role.external_id + field_type: String + comments: external ID in IAM role trust policy + required: false + alias: + - external_id + - name: aws.profile + field_type: String + required: false + alias: + - profile GooglePubSubConfig: fields: - name: pubsub.project_id diff --git a/src/expr/core/Cargo.toml b/src/expr/core/Cargo.toml index 38aacbb303c22..cbff3a5ff2e28 100644 --- a/src/expr/core/Cargo.toml +++ b/src/expr/core/Cargo.toml @@ -30,7 +30,7 @@ chrono = { version = "0.4", default-features = false, features = [ const-currying = "0.0.4" downcast-rs = "1.2" easy-ext = "1" -educe = "0.5" +educe = "0.6" either = "1" enum-as-inner = "0.6" futures = "0.3" diff --git a/src/expr/core/src/table_function/mod.rs b/src/expr/core/src/table_function/mod.rs index b87d6f020b093..c14a50a8f41a4 100644 --- a/src/expr/core/src/table_function/mod.rs +++ b/src/expr/core/src/table_function/mod.rs @@ -12,15 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use either::Either; use futures_async_stream::try_stream; use futures_util::stream::BoxStream; use futures_util::StreamExt; -use risingwave_common::array::{Array, ArrayBuilder, ArrayImpl, ArrayRef, DataChunk}; +use risingwave_common::array::{Array, ArrayBuilder, ArrayImpl, DataChunk}; use risingwave_common::types::{DataType, DatumRef}; -use risingwave_pb::expr::project_set_select_item::SelectItem; use risingwave_pb::expr::table_function::PbType; -use risingwave_pb::expr::{PbProjectSetSelectItem, PbTableFunction}; +use risingwave_pb::expr::PbTableFunction; use super::{ExprError, Result}; use crate::expr::{build_from_prost as expr_build_from_prost, BoxedExpression}; @@ -141,53 +139,6 @@ pub fn build( desc.build_table(return_type, chunk_size, children) } -/// See also [`PbProjectSetSelectItem`] -#[derive(Debug)] -pub enum ProjectSetSelectItem { - TableFunction(BoxedTableFunction), - Expr(BoxedExpression), -} - -impl From for ProjectSetSelectItem { - fn from(table_function: BoxedTableFunction) -> Self { - ProjectSetSelectItem::TableFunction(table_function) - } -} - -impl From for ProjectSetSelectItem { - fn from(expr: BoxedExpression) -> Self { - ProjectSetSelectItem::Expr(expr) - } -} - -impl ProjectSetSelectItem { - pub fn from_prost(prost: &PbProjectSetSelectItem, chunk_size: usize) -> Result { - match prost.select_item.as_ref().unwrap() { - SelectItem::Expr(expr) => expr_build_from_prost(expr).map(Into::into), - SelectItem::TableFunction(tf) => build_from_prost(tf, chunk_size).map(Into::into), - } - } - - pub fn return_type(&self) -> DataType { - match self { - ProjectSetSelectItem::TableFunction(tf) => tf.return_type(), - ProjectSetSelectItem::Expr(expr) => expr.return_type(), - } - } - - pub async fn eval<'a>( - &'a self, - input: &'a DataChunk, - ) -> Result, ArrayRef>> { - match self { - Self::TableFunction(tf) => Ok(Either::Left( - TableFunctionOutputIter::new(tf.eval(input).await).await?, - )), - Self::Expr(expr) => expr.eval(input).await.map(Either::Right), - } - } -} - /// A wrapper over the output of table function that allows iteration by rows. /// /// If the table function returns multiple columns, the output will be struct values. 
diff --git a/src/expr/impl/Cargo.toml b/src/expr/impl/Cargo.toml index 9e94e3395e761..6dfbcc5905750 100644 --- a/src/expr/impl/Cargo.toml +++ b/src/expr/impl/Cargo.toml @@ -40,7 +40,7 @@ chrono = { version = "0.4", default-features = false, features = [ "std", ] } chrono-tz = { version = "0.9", features = ["case-insensitive"] } -educe = "0.5" +educe = "0.6" fancy-regex = "0.13" futures-async-stream = { workspace = true } futures-util = "0.3" diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 4adeaae423ca6..feb4aa596c55f 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -29,7 +29,7 @@ clap = { workspace = true } downcast-rs = "1.2" dyn-clone = "1.0.14" easy-ext = "1" -educe = "0.5" +educe = "0.6" either = "1" enum-as-inner = "0.6" fancy-regex = "0.13.0" diff --git a/src/frontend/planner_test/tests/testdata/output/over_window_function.yaml b/src/frontend/planner_test/tests/testdata/output/over_window_function.yaml index a1c014161988e..14d6d59885b40 100644 --- a/src/frontend/planner_test/tests/testdata/output/over_window_function.yaml +++ b/src/frontend/planner_test/tests/testdata/output/over_window_function.yaml @@ -73,11 +73,11 @@ └─LogicalProject { exprs: [t.x, t._row_id] } └─LogicalScan { table: t, columns: [t.x, t._row_id] } batch_error: |- - Feature is not yet implemented: Window function with empty PARTITION BY is not supported yet - No tracking issue yet. Feel free to submit a feature request at https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Ffeature&template=feature_request.yml + Feature is not yet implemented: Window function with empty PARTITION BY is not supported because of potential bad performance. If you really need this, please workaround with something like `PARTITION BY 1::int`. + Tracking issue: https://github.com/risingwavelabs/risingwave/issues/11505 stream_error: |- - Feature is not yet implemented: Window function with empty PARTITION BY is not supported yet - No tracking issue yet. Feel free to submit a feature request at https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Ffeature&template=feature_request.yml + Feature is not yet implemented: Window function with empty PARTITION BY is not supported because of potential bad performance. If you really need this, please workaround with something like `PARTITION BY 1::int`. + Tracking issue: https://github.com/risingwavelabs/risingwave/issues/11505 - id: lead with offset argument and empty over clause sql: | create table t(x int); @@ -88,11 +88,11 @@ └─LogicalProject { exprs: [t.x, t._row_id, 2:Int32] } └─LogicalScan { table: t, columns: [t.x, t._row_id] } batch_error: |- - Feature is not yet implemented: Window function with empty PARTITION BY is not supported yet - No tracking issue yet. Feel free to submit a feature request at https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Ffeature&template=feature_request.yml + Feature is not yet implemented: Window function with empty PARTITION BY is not supported because of potential bad performance. If you really need this, please workaround with something like `PARTITION BY 1::int`. + Tracking issue: https://github.com/risingwavelabs/risingwave/issues/11505 stream_error: |- - Feature is not yet implemented: Window function with empty PARTITION BY is not supported yet - No tracking issue yet. 
Feel free to submit a feature request at https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Ffeature&template=feature_request.yml + Feature is not yet implemented: Window function with empty PARTITION BY is not supported because of potential bad performance. If you really need this, please workaround with something like `PARTITION BY 1::int`. + Tracking issue: https://github.com/risingwavelabs/risingwave/issues/11505 - id: lead with non-const offset argument and empty over clause sql: | create table t(x int); diff --git a/src/frontend/src/catalog/connection_catalog.rs b/src/frontend/src/catalog/connection_catalog.rs index 54e1210979fe8..ba060f17cb00f 100644 --- a/src/frontend/src/catalog/connection_catalog.rs +++ b/src/frontend/src/catalog/connection_catalog.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeMap; +use std::collections::HashMap; use std::sync::Arc; use anyhow::anyhow; @@ -67,7 +67,7 @@ impl OwnedByUserCatalog for ConnectionCatalog { pub(crate) fn resolve_private_link_connection( connection: &Arc, - properties: &mut BTreeMap, + properties: &mut HashMap, ) -> Result<()> { #[allow(irrefutable_let_patterns)] if let connection::Info::PrivateLinkService(svc) = &connection.info { diff --git a/src/frontend/src/catalog/source_catalog.rs b/src/frontend/src/catalog/source_catalog.rs index 17292b1324ed2..68f543f854cff 100644 --- a/src/frontend/src/catalog/source_catalog.rs +++ b/src/frontend/src/catalog/source_catalog.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeMap; +use std::collections::HashMap; use risingwave_common::catalog::{ColumnCatalog, SourceVersionId}; use risingwave_common::util::epoch::Epoch; @@ -26,7 +26,7 @@ use crate::user::UserId; /// This struct `SourceCatalog` is used in frontend. /// Compared with `PbSource`, it only maintains information used during optimization. -#[derive(Clone, Debug, PartialEq, Eq, Hash)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct SourceCatalog { pub id: SourceId, pub name: String, @@ -36,7 +36,7 @@ pub struct SourceCatalog { pub owner: UserId, pub info: StreamSourceInfo, pub row_id_index: Option, - pub with_properties: BTreeMap, + pub with_properties: HashMap, pub watermark_descs: Vec, pub associated_table_id: Option, pub definition: String, @@ -149,3 +149,9 @@ impl OwnedByUserCatalog for SourceCatalog { self.owner } } + +impl std::hash::Hash for SourceCatalog { + fn hash(&self, state: &mut H) { + self.id.hash(state); + } +} diff --git a/src/frontend/src/catalog/subscription_catalog.rs b/src/frontend/src/catalog/subscription_catalog.rs index 6efb31614a922..36a5a71a0e9be 100644 --- a/src/frontend/src/catalog/subscription_catalog.rs +++ b/src/frontend/src/catalog/subscription_catalog.rs @@ -13,7 +13,6 @@ // limitations under the License. 
use core::str::FromStr; -use std::collections::BTreeMap; use risingwave_common::catalog::{TableId, UserId, OBJECT_ID_PLACEHOLDER}; use risingwave_common::types::Interval; @@ -24,6 +23,7 @@ use thiserror_ext::AsReport; use super::OwnedByUserCatalog; use crate::error::{ErrorCode, Result}; +use crate::WithOptions; #[derive(Clone, Debug, PartialEq, Eq, Hash)] #[cfg_attr(test, derive(Default))] @@ -82,7 +82,7 @@ impl SubscriptionId { } impl SubscriptionCatalog { - pub fn set_retention_seconds(&mut self, properties: BTreeMap) -> Result<()> { + pub fn set_retention_seconds(&mut self, properties: &WithOptions) -> Result<()> { let retention_seconds_str = properties.get("retention").ok_or_else(|| { ErrorCode::InternalError("Subscription retention time not set.".to_string()) })?; diff --git a/src/frontend/src/catalog/system_catalog/pg_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/pg_catalog/mod.rs index de1fe4924642f..3d1c8f2a56f74 100644 --- a/src/frontend/src/catalog/system_catalog/pg_catalog/mod.rs +++ b/src/frontend/src/catalog/system_catalog/pg_catalog/mod.rs @@ -29,7 +29,6 @@ mod pg_extension; mod pg_index; mod pg_indexes; mod pg_inherits; -mod pg_keywords; mod pg_language; mod pg_locks; mod pg_matviews; diff --git a/src/frontend/src/catalog/system_catalog/pg_catalog/pg_keywords.rs b/src/frontend/src/catalog/system_catalog/pg_catalog/pg_keywords.rs deleted file mode 100644 index a859527afa6df..0000000000000 --- a/src/frontend/src/catalog/system_catalog/pg_catalog/pg_keywords.rs +++ /dev/null @@ -1,71 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// The code is same as `expr/impl/src/table_function/pg_get_keywords.rs`. - -use risingwave_common::types::Fields; -use risingwave_frontend_macro::system_catalog; -use risingwave_sqlparser::keywords::{ - ALL_KEYWORDS_INDEX, RESERVED_FOR_COLUMN_ALIAS, RESERVED_FOR_COLUMN_OR_TABLE_NAME, -}; - -use crate::catalog::system_catalog::SysCatalogReaderImpl; - -/// The catalog `pg_keywords` stores keywords. `pg_get_keywords` returns the content of this table. -/// Ref: [`https://www.postgresql.org/docs/15/functions-info.html`] -/// -/// # Example -/// -/// ```slt -/// query TTT -/// select * from pg_keywords where word = 'add'; -/// ---- -/// add U unreserved -/// ``` -#[derive(Fields)] -struct PgKeywords { - #[primary_key] - word: String, - catcode: char, - catdesc: &'static str, -} - -#[system_catalog(table, "pg_catalog.pg_keywords")] -fn read_pg_keywords(_reader: &SysCatalogReaderImpl) -> Vec { - ALL_KEYWORDS_INDEX - .iter() - .map(|keyword| { - // FIXME: The current category is not correct. Many are different from the PostgreSQL. 
- let catcode = if !RESERVED_FOR_COLUMN_OR_TABLE_NAME.contains(keyword) { - 'U' - } else if !RESERVED_FOR_COLUMN_ALIAS.contains(keyword) { - 'C' - } else { - 'R' - }; - let catdesc = match catcode { - 'U' => "unreserved", - 'C' => "unreserved (cannot be function or type name)", - 'T' => "reserved (can be function or type name)", - 'R' => "reserved", - _ => unreachable!(), - }; - PgKeywords { - word: keyword.to_string().to_lowercase(), - catcode, - catdesc, - } - }) - .collect() -} diff --git a/src/frontend/src/handler/alter_source_with_sr.rs b/src/frontend/src/handler/alter_source_with_sr.rs index 9f3c089998fbc..7e509324bf1ce 100644 --- a/src/frontend/src/handler/alter_source_with_sr.rs +++ b/src/frontend/src/handler/alter_source_with_sr.rs @@ -148,11 +148,8 @@ pub async fn refresh_sr_and_get_columns_diff( connector_schema: &ConnectorSchema, session: &Arc, ) -> Result<(StreamSourceInfo, Vec, Vec)> { - let mut with_properties = original_source - .with_properties - .clone() - .into_iter() - .collect(); + let mut with_properties = original_source.with_properties.clone(); + validate_compatibility(connector_schema, &mut with_properties)?; if with_properties.is_cdc_connector() { diff --git a/src/frontend/src/handler/create_connection.rs b/src/frontend/src/handler/create_connection.rs index 22491f9cb0ee3..1a8af68ddb881 100644 --- a/src/frontend/src/handler/create_connection.rs +++ b/src/frontend/src/handler/create_connection.rs @@ -126,7 +126,7 @@ pub async fn handle_create_connection( let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?; let with_properties = handler_args.with_options.clone().into_connector_props(); - let create_connection_payload = resolve_create_connection_payload(&with_properties)?; + let create_connection_payload = resolve_create_connection_payload(with_properties.inner())?; let catalog_writer = session.catalog_writer()?; catalog_writer diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index b0c00a2c862db..ecd61c3aa49d0 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -792,6 +792,7 @@ fn bind_sink_format_desc(value: ConnectorSchema) -> Result { options .entry(TimestamptzHandlingMode::OPTION_KEY.to_owned()) .or_insert(TimestamptzHandlingMode::FRONTEND_DEFAULT.to_owned()); + let options = options.into_iter().collect(); Ok(SinkFormatDesc { format, diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 75e188c086947..9232b45ae228e 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::collections::{BTreeMap, HashMap}; +use std::collections::HashMap; use std::rc::Rc; use std::sync::LazyLock; @@ -90,7 +90,7 @@ pub(crate) const UPSTREAM_SOURCE_KEY: &str = "connector"; async fn extract_json_table_schema( schema_config: &Option<(AstString, bool)>, with_properties: &HashMap, - format_encode_options: &mut BTreeMap, + format_encode_options: &mut HashMap, ) -> Result>> { match schema_config { None => Ok(None), @@ -140,7 +140,7 @@ fn json_schema_infer_use_schema_registry(schema_config: &Option<(AstString, bool async fn extract_avro_table_schema( info: &StreamSourceInfo, with_properties: &HashMap, - format_encode_options: &mut BTreeMap, + format_encode_options: &mut HashMap, is_debezium: bool, ) -> Result> { let parser_config = SpecificParserConfig::new(info, with_properties)?; @@ -186,7 +186,7 @@ async fn extract_debezium_avro_table_pk_columns( async fn extract_protobuf_table_schema( schema: &ProtobufSchema, with_properties: &HashMap, - format_encode_options: &mut BTreeMap, + format_encode_options: &mut HashMap, ) -> Result> { let info = StreamSourceInfo { proto_message_name: schema.message_name.0.clone(), @@ -224,14 +224,14 @@ fn non_generated_sql_columns(columns: &[ColumnDef]) -> Vec { } fn try_consume_string_from_options( - format_encode_options: &mut BTreeMap, + format_encode_options: &mut HashMap, key: &str, ) -> Option { format_encode_options.remove(key).map(AstString) } fn consume_string_from_options( - format_encode_options: &mut BTreeMap, + format_encode_options: &mut HashMap, key: &str, ) -> Result { try_consume_string_from_options(format_encode_options, key).ok_or(RwError::from(ProtocolError( @@ -239,12 +239,12 @@ fn consume_string_from_options( ))) } -fn consume_aws_config_from_options(format_encode_options: &mut BTreeMap) { +fn consume_aws_config_from_options(format_encode_options: &mut HashMap) { format_encode_options.retain(|key, _| !key.starts_with("aws.")) } pub fn get_json_schema_location( - format_encode_options: &mut BTreeMap, + format_encode_options: &mut HashMap, ) -> Result> { let schema_location = try_consume_string_from_options(format_encode_options, "schema.location"); let schema_registry = try_consume_string_from_options(format_encode_options, "schema.registry"); @@ -259,7 +259,7 @@ pub fn get_json_schema_location( } fn get_schema_location( - format_encode_options: &mut BTreeMap, + format_encode_options: &mut HashMap, ) -> Result<(AstString, bool)> { let schema_location = try_consume_string_from_options(format_encode_options, "schema.location"); let schema_registry = try_consume_string_from_options(format_encode_options, "schema.registry"); @@ -300,13 +300,13 @@ pub(crate) async fn bind_columns_from_source( let format_encode_options = WithOptions::try_from(source_schema.row_options())?.into_inner(); let mut format_encode_options_to_consume = format_encode_options.clone(); - fn get_key_message_name(options: &mut BTreeMap) -> Option { + fn get_key_message_name(options: &mut HashMap) -> Option { consume_string_from_options(options, KEY_MESSAGE_NAME_KEY) .map(|ele| Some(ele.0)) .unwrap_or(None) } fn get_sr_name_strategy_check( - options: &mut BTreeMap, + options: &mut HashMap, use_sr: bool, ) -> Result> { let name_strategy = get_name_strategy_or_default(try_consume_string_from_options( @@ -1292,7 +1292,7 @@ pub fn bind_connector_props( handler_args: &HandlerArgs, source_schema: &ConnectorSchema, is_create_source: bool, -) -> Result> { +) -> Result { let mut with_properties = handler_args.with_options.clone().into_connector_props(); 
validate_compatibility(source_schema, &mut with_properties)?; let create_cdc_source_job = with_properties.is_shareable_cdc_connector(); @@ -1319,7 +1319,7 @@ pub async fn bind_create_source( handler_args: HandlerArgs, full_name: ObjectName, source_schema: ConnectorSchema, - with_properties: HashMap, + with_properties: WithOptions, sql_columns_defs: &[ColumnDef], constraints: Vec, wildcard_idx: Option, @@ -1420,7 +1420,7 @@ pub async fn bind_create_source( check_source_schema(&with_properties, row_id_index, &columns).await?; // resolve privatelink connection for Kafka - let mut with_properties = WithOptions::new(with_properties); + let mut with_properties = with_properties; let connection_id = resolve_privatelink_in_with_option(&mut with_properties, &schema_name, session)?; let _secret_ref = resolve_secret_in_with_options(&mut with_properties, session)?; @@ -1485,7 +1485,7 @@ pub async fn handle_create_source( let (columns_from_resolve_source, mut source_info) = if create_cdc_source_job { bind_columns_from_source_for_cdc(&session, &source_schema)? } else { - bind_columns_from_source(&session, &source_schema, &with_properties).await? + bind_columns_from_source(&session, &source_schema, with_properties.inner()).await? }; if is_shared { // Note: this field should be called is_shared. Check field doc for more details. diff --git a/src/frontend/src/handler/create_subscription.rs b/src/frontend/src/handler/create_subscription.rs index 7da1a9d1683ed..8d4ed82cc82ee 100644 --- a/src/frontend/src/handler/create_subscription.rs +++ b/src/frontend/src/handler/create_subscription.rs @@ -64,7 +64,7 @@ pub fn create_subscription_catalog( initialized_at_cluster_version: None, }; - subscription_catalog.set_retention_seconds(context.with_options().clone().into_inner())?; + subscription_catalog.set_retention_seconds(context.with_options())?; Ok(subscription_catalog) } diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 38829a16be11e..99d6d9ed9ddd0 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -481,7 +481,7 @@ pub(crate) async fn gen_create_table_plan_with_source( let with_properties = bind_connector_props(&handler_args, &source_schema, false)?; let (columns_from_resolve_source, source_info) = - bind_columns_from_source(session, &source_schema, &with_properties).await?; + bind_columns_from_source(session, &source_schema, with_properties.inner()).await?; let (source_catalog, database_id, schema_id) = bind_create_source( handler_args.clone(), diff --git a/src/frontend/src/optimizer/plan_node/logical_over_window.rs b/src/frontend/src/optimizer/plan_node/logical_over_window.rs index b5762a224d180..cf130b57bac94 100644 --- a/src/frontend/src/optimizer/plan_node/logical_over_window.rs +++ b/src/frontend/src/optimizer/plan_node/logical_over_window.rs @@ -604,6 +604,16 @@ impl PredicatePushdown for LogicalOverWindow { } } +macro_rules! empty_partition_by_not_implemented { + () => { + bail_not_implemented!( + issue = 11505, + "Window function with empty PARTITION BY is not supported because of potential bad performance. \ + If you really need this, please workaround with something like `PARTITION BY 1::int`." 
+ ) + }; +} + impl ToBatch for LogicalOverWindow { fn to_batch(&self) -> Result { assert!( @@ -619,7 +629,7 @@ impl ToBatch for LogicalOverWindow { .map(|e| e.index()) .collect_vec(); if partition_key_indices.is_empty() { - bail_not_implemented!("Window function with empty PARTITION BY is not supported yet"); + empty_partition_by_not_implemented!(); } let input = self.input().to_batch()?; @@ -670,9 +680,7 @@ impl ToStream for LogicalOverWindow { .map(|e| e.index()) .collect_vec(); if partition_key_indices.is_empty() { - bail_not_implemented!( - "Window function with empty PARTITION BY is not supported yet" - ); + empty_partition_by_not_implemented!(); } let sort_input = @@ -694,9 +702,7 @@ impl ToStream for LogicalOverWindow { .map(|e| e.index()) .collect_vec(); if partition_key_indices.is_empty() { - bail_not_implemented!( - "Window function with empty PARTITION BY is not supported yet" - ); + empty_partition_by_not_implemented!(); } let new_input = diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index 689bf8cba7e63..9c76bfdece25d 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -449,7 +449,7 @@ impl StreamSink { let (user_defined_append_only, user_force_append_only, syntax_legacy) = match format_desc { Some(f) => ( f.format == SinkFormat::AppendOnly, - Self::is_user_force_append_only(&WithOptions::from_inner(f.options.clone()))?, + Self::is_user_force_append_only(&WithOptions::new(f.options.clone()))?, false, ), None => ( diff --git a/src/frontend/src/utils/with_options.rs b/src/frontend/src/utils/with_options.rs index 3ee50276e5d10..d5d380677154b 100644 --- a/src/frontend/src/utils/with_options.rs +++ b/src/frontend/src/utils/with_options.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, HashMap}; +use std::collections::HashMap; use std::num::NonZeroU32; use risingwave_connector::source::kafka::private_link::{ @@ -21,7 +21,7 @@ use risingwave_connector::source::kafka::private_link::{ use risingwave_connector::WithPropertiesExt; use risingwave_sqlparser::ast::{ CreateConnectionStatement, CreateSinkStatement, CreateSourceStatement, - CreateSubscriptionStatement, SqlOption, Statement, Value, + CreateSubscriptionStatement, ObjectName, SqlOption, Statement, Value, }; use super::OverwriteOptions; @@ -36,13 +36,14 @@ mod options { } /// Options or properties extracted from the `WITH` clause of DDLs. -#[derive(Default, Clone, Debug, PartialEq, Eq, Hash)] +#[derive(Default, Clone, Debug, PartialEq, Eq)] pub struct WithOptions { - inner: BTreeMap, + inner: HashMap, + ref_secret: HashMap, } impl std::ops::Deref for WithOptions { - type Target = BTreeMap; + type Target = HashMap; fn deref(&self) -> &Self::Target { &self.inner @@ -60,33 +61,42 @@ impl WithOptions { pub fn new(inner: HashMap) -> Self { Self { inner: inner.into_iter().collect(), + ref_secret: Default::default(), } } - pub fn from_inner(inner: BTreeMap) -> Self { - Self { inner } + pub fn from_inner(inner: HashMap) -> Self { + Self { + inner, + ref_secret: Default::default(), + } } /// Get the reference of the inner map. - pub fn inner(&self) -> &BTreeMap { + pub fn inner(&self) -> &HashMap { &self.inner } - pub fn inner_mut(&mut self) -> &mut BTreeMap { + pub fn inner_mut(&mut self) -> &mut HashMap { &mut self.inner } /// Take the value of the inner map. 
- pub fn into_inner(self) -> BTreeMap { + pub fn into_inner(self) -> HashMap { self.inner } /// Convert to connector props, remove the key-value pairs used in the top-level. - pub fn into_connector_props(self) -> HashMap { - self.inner + pub fn into_connector_props(self) -> Self { + let inner = self + .inner .into_iter() .filter(|(key, _)| key != OverwriteOptions::STREAMING_RATE_LIMIT_KEY) - .collect() + .collect(); + Self { + inner, + ref_secret: self.ref_secret, + } } /// Parse the retention seconds from the options. @@ -107,7 +117,10 @@ impl WithOptions { }) .collect(); - Self { inner } + Self { + inner, + ref_secret: Default::default(), + } } pub fn value_eq_ignore_case(&self, key: &str, val: &str) -> bool { @@ -174,9 +187,19 @@ impl TryFrom<&[SqlOption]> for WithOptions { type Error = RwError; fn try_from(options: &[SqlOption]) -> Result { - let mut inner: BTreeMap = BTreeMap::new(); + let mut inner: HashMap = HashMap::new(); + let mut ref_secret: HashMap = HashMap::new(); for option in options { let key = option.name.real_value(); + if let Value::Ref(r) = &option.value { + if ref_secret.insert(key.clone(), r.clone()).is_some() || inner.contains_key(&key) { + return Err(RwError::from(ErrorCode::InvalidParameterValue(format!( + "Duplicated option: {}", + key + )))); + } + continue; + } let value: String = match option.value.clone() { Value::CstyleEscapedString(s) => s.value, Value::SingleQuotedString(s) => s, @@ -189,7 +212,7 @@ impl TryFrom<&[SqlOption]> for WithOptions { ))) } }; - if inner.insert(key.clone(), value).is_some() { + if inner.insert(key.clone(), value).is_some() || ref_secret.contains_key(&key) { return Err(RwError::from(ErrorCode::InvalidParameterValue(format!( "Duplicated option: {}", key @@ -197,7 +220,7 @@ impl TryFrom<&[SqlOption]> for WithOptions { } } - Ok(Self { inner }) + Ok(Self { inner, ref_secret }) } } diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index bf5bb72ed731b..43e564271702a 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -25,7 +25,7 @@ use risingwave_common::monitor::connection::{RouterExt, TcpConfig}; use risingwave_common::session_config::SessionConfig; use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_common::telemetry::manager::TelemetryManager; -use risingwave_common::telemetry::telemetry_env_enabled; +use risingwave_common::telemetry::{report_scarf_enabled, report_to_scarf, telemetry_env_enabled}; use risingwave_common_service::metrics_manager::MetricsManager; use risingwave_common_service::tracing::TracingExtractLayer; use risingwave_meta::barrier::StreamRpcManager; @@ -754,6 +754,11 @@ pub async fn start_service_as_election_leader( } else { tracing::info!("Telemetry didn't start due to meta backend or config"); } + if report_scarf_enabled() { + tokio::spawn(report_to_scarf()); + } else { + tracing::info!("Scarf reporting is disabled"); + }; if let Some(pair) = env.event_log_manager_ref().take_join_handle() { sub_tasks.push(pair); diff --git a/src/meta/service/src/cloud_service.rs b/src/meta/service/src/cloud_service.rs index b77b751b281e7..2bd2bf256d803 100644 --- a/src/meta/service/src/cloud_service.rs +++ b/src/meta/service/src/cloud_service.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::collections::{BTreeMap, HashMap}; use std::sync::LazyLock; use async_trait::async_trait; @@ -75,7 +74,7 @@ impl CloudService for CloudServiceImpl { "unexpected source type, only kafka source is supported", )); } - let mut source_cfg: BTreeMap = req.source_config.into_iter().collect(); + let mut source_cfg = req.source_config.clone(); // if connection_id provided, check whether endpoint service is available and resolve // broker rewrite map currently only support aws privatelink connection if let Some(connection_id_str) = source_cfg.get("connection.id") { @@ -147,7 +146,6 @@ impl CloudService for CloudServiceImpl { } } // try fetch kafka metadata, return error message on failure - let source_cfg: HashMap = source_cfg.into_iter().collect(); let props = ConnectorProperties::extract(source_cfg, false); if let Err(e) = props { return Ok(new_rwc_validate_fail_response( diff --git a/src/prost/build.rs b/src/prost/build.rs index 67284d844cc3e..5f36e26f4bcbc 100644 --- a/src/prost/build.rs +++ b/src/prost/build.rs @@ -65,7 +65,7 @@ fn main() -> Result<(), Box> { ".monitor_service.StackTraceResponse", ".plan_common.ExternalTableDesc", ".hummock.CompactTask", - ".catalog.StreamSourceInfo", + // ".catalog.StreamSourceInfo", ]; // Build protobuf structs. @@ -99,7 +99,7 @@ fn main() -> Result<(), Box> { // Eq + Hash are for plan nodes to do common sub-plan detection. // The requirement is from Source node -> SourceCatalog -> WatermarkDesc -> expr .type_attribute("catalog.WatermarkDesc", "#[derive(Eq, Hash)]") - .type_attribute("catalog.StreamSourceInfo", "#[derive(Eq, Hash)]") + .type_attribute("catalog.StreamSourceInfo", "#[derive(Eq)]") .type_attribute("expr.ExprNode", "#[derive(Eq, Hash)]") .type_attribute("data.DataType", "#[derive(Eq, Hash)]") .type_attribute("expr.ExprNode.rex_node", "#[derive(Eq, Hash)]") diff --git a/src/sqlparser/Cargo.toml b/src/sqlparser/Cargo.toml index 377b2e676d6a8..bf6307e3ace68 100644 --- a/src/sqlparser/Cargo.toml +++ b/src/sqlparser/Cargo.toml @@ -32,7 +32,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [ ] } tracing = "0.1" tracing-subscriber = "0.3" -winnow = "0.6.9" +winnow = "0.6.11" [target.'cfg(not(madsim))'.dependencies] workspace-hack = { path = "../workspace-hack" } diff --git a/src/sqlparser/src/ast/value.rs b/src/sqlparser/src/ast/value.rs index 2ce52f3c18bf9..79f2a6ebd99ca 100644 --- a/src/sqlparser/src/ast/value.rs +++ b/src/sqlparser/src/ast/value.rs @@ -17,6 +17,8 @@ use core::fmt; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; +use super::ObjectName; + /// Primitive SQL values such as number and string #[derive(Debug, Clone, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] @@ -57,6 +59,8 @@ pub enum Value { }, /// `NULL` value Null, + /// name of the reference to secret + Ref(ObjectName), } impl fmt::Display for Value { @@ -111,6 +115,7 @@ impl fmt::Display for Value { Ok(()) } Value::Null => write!(f, "NULL"), + Value::Ref(v) => write!(f, "ref secret {}", v), } } } diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index c80c3c145dd96..71a6aa1842dbd 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -30,7 +30,7 @@ use winnow::{PResult, Parser as _}; use crate::ast::*; use crate::keywords::{self, Keyword}; use crate::parser_v2; -use crate::parser_v2::{keyword, literal_i64, literal_uint, ParserExt as _}; +use crate::parser_v2::{keyword, literal_i64, literal_uint, single_quoted_string, ParserExt as _}; use crate::tokenizer::*; 
pub(crate) const UPSTREAM_SOURCE_KEY: &str = "connector"; @@ -3558,6 +3558,10 @@ impl Parser<'_> { Some('\'') => Ok(Value::SingleQuotedString(w.value)), _ => self.expected_at(checkpoint, "A value")?, }, + Keyword::REF => { + self.expect_keyword(Keyword::SECRET)?; + Ok(Value::Ref(self.parse_object_name()?)) + } _ => self.expected_at(checkpoint, "a concrete value"), }, Token::Number(ref n) => Ok(Value::Number(n.clone())), @@ -3718,25 +3722,31 @@ impl Parser<'_> { alt(( preceded( (Keyword::SYSTEM_TIME, Keyword::AS, Keyword::OF), - alt(( - ( - Self::parse_identifier.verify(|ident| { - ident.real_value() == "proctime" || ident.real_value() == "now" - }), - Token::LParen, - Token::RParen, - ) - .value(AsOf::ProcessTime), - literal_i64.map(AsOf::VersionNum), - Self::parse_literal_string.map(AsOf::TimestampString), - )), + cut_err( + alt(( + ( + Self::parse_identifier.verify(|ident| { + ident.real_value() == "proctime" || ident.real_value() == "now" + }), + cut_err(Token::LParen), + cut_err(Token::RParen), + ) + .value(AsOf::ProcessTime), + literal_i64.map(AsOf::TimestampNum), + single_quoted_string.map(AsOf::TimestampString), + )) + .expect("proctime(), now(), number or string"), + ), ), preceded( (Keyword::SYSTEM_VERSION, Keyword::AS, Keyword::OF), - alt(( - literal_i64.map(AsOf::VersionNum), - Self::parse_literal_string.map(AsOf::VersionString), - )), + cut_err( + alt(( + literal_i64.map(AsOf::VersionNum), + single_quoted_string.map(AsOf::VersionString), + )) + .expect("number or string"), + ), ), )) .parse_next(self) diff --git a/src/sqlparser/src/parser_v2/mod.rs b/src/sqlparser/src/parser_v2/mod.rs index 366b17fb23bf5..729b56a51253a 100644 --- a/src/sqlparser/src/parser_v2/mod.rs +++ b/src/sqlparser/src/parser_v2/mod.rs @@ -116,6 +116,19 @@ where .parse_next(input) } +/// Consume an 'single-quoted string'. +pub fn single_quoted_string(input: &mut S) -> PResult +where + S: TokenStream, +{ + token + .verify_map(|t| match &t.token { + Token::SingleQuotedString(s) => Some(s.clone()), + _ => None, + }) + .parse_next(input) +} + /// Consume an object name. /// /// FIXME: Object name is extremely complex, we only handle a subset here. diff --git a/src/sqlparser/tests/testdata/as_of.yaml b/src/sqlparser/tests/testdata/as_of.yaml new file mode 100644 index 0000000000000..2d7c81881988b --- /dev/null +++ b/src/sqlparser/tests/testdata/as_of.yaml @@ -0,0 +1,23 @@ +# This file is automatically generated by `src/sqlparser/tests/parser_test.rs`. 
+- input: select * from t1 left join t2 FOR SYSTEM_TIME AS OF PROCTIME() on a1 = a2; + formatted_sql: SELECT * FROM t1 LEFT JOIN t2 FOR SYSTEM_TIME AS OF PROCTIME() ON a1 = a2 +- input: select * from t1 left join t2 FOR SYSTEM_TIME AS OF NOW() on a1 = a2; + formatted_sql: SELECT * FROM t1 LEFT JOIN t2 FOR SYSTEM_TIME AS OF PROCTIME() ON a1 = a2 +- input: select * from t1 left join t2 FOR SYSTEM_TIME AS OF 1 on a1 = a2; + formatted_sql: SELECT * FROM t1 LEFT JOIN t2 FOR SYSTEM_TIME AS OF 1 ON a1 = a2 +- input: select * from t1 left join t2 FOR SYSTEM_TIME AS OF 'string' on a1 = a2; + formatted_sql: SELECT * FROM t1 LEFT JOIN t2 FOR SYSTEM_TIME AS OF 'string' ON a1 = a2 +- input: select * from t1 left join t2 FOR SYSTEM_TIME AS OF PROCTIME on a1 = a2; + error_msg: |- + sql parser error: expected proctime(), now(), number or string + LINE 1: select * from t1 left join t2 FOR SYSTEM_TIME AS OF PROCTIME on a1 = a2; + ^ +- input: select * from t1 left join t2 FOR SYSTEM_VERSION AS OF 1 on a1 = a2; + formatted_sql: SELECT * FROM t1 LEFT JOIN t2 FOR SYSTEM_VERSION AS OF 1 ON a1 = a2 +- input: select * from t1 left join t2 FOR SYSTEM_VERSION AS OF 'string' on a1 = a2; + formatted_sql: SELECT * FROM t1 LEFT JOIN t2 FOR SYSTEM_VERSION AS OF 'string' ON a1 = a2 +- input: select * from t1 left join t2 FOR SYSTEM_VERSION AS OF PROCTIME() on a1 = a2; + error_msg: |- + sql parser error: expected number or string + LINE 1: select * from t1 left join t2 FOR SYSTEM_VERSION AS OF PROCTIME() on a1 = a2; + ^ diff --git a/src/storage/src/hummock/iterator/backward_user.rs b/src/storage/src/hummock/iterator/backward_user.rs index 1179c6a26ac02..ab348571a4128 100644 --- a/src/storage/src/hummock/iterator/backward_user.rs +++ b/src/storage/src/hummock/iterator/backward_user.rs @@ -15,6 +15,7 @@ use std::ops::Bound::*; use bytes::Bytes; +use more_asserts::debug_assert_le; use risingwave_hummock_sdk::key::{FullKey, UserKey, UserKeyRange}; use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch}; @@ -212,28 +213,34 @@ impl> BackwardUserIterator { pub async fn rewind(&mut self) -> HummockResult<()> { // Handle range scan match &self.key_range.1 { - Included(end_key) => { + Included(end_key) | Excluded(end_key) => { let full_key = FullKey { - user_key: end_key.clone(), + user_key: end_key.as_ref(), epoch_with_gap: EpochWithGap::new_min_epoch(), }; - self.iterator.seek(full_key.to_ref()).await?; + self.iterator.seek(full_key).await?; } - Excluded(_) => unimplemented!("excluded begin key is not supported"), Unbounded => self.iterator.rewind().await?, }; // Handle multi-version self.reset(); // Handle range scan when key < begin_key - self.next().await + self.next().await?; + if let Excluded(end_key) = &self.key_range.1 + && self.is_valid() + && self.key().user_key == end_key.as_ref() + { + self.next().await?; + } + Ok(()) } /// Resets the iterating position to the first position where the key >= provided key. 
pub async fn seek(&mut self, user_key: UserKey<&[u8]>) -> HummockResult<()> { // Handle range scan when key > end_key - let user_key = match &self.key_range.1 { - Included(end_key) => { + let seek_key = match &self.key_range.1 { + Included(end_key) | Excluded(end_key) => { let end_key = end_key.as_ref(); if end_key < user_key { end_key @@ -241,11 +248,10 @@ impl> BackwardUserIterator { user_key } } - Excluded(_) => unimplemented!("excluded begin key is not supported"), Unbounded => user_key, }; let full_key = FullKey { - user_key, + user_key: seek_key, epoch_with_gap: EpochWithGap::new_min_epoch(), }; self.iterator.seek(full_key).await?; @@ -253,7 +259,15 @@ impl> BackwardUserIterator { // Handle multi-version self.reset(); // Handle range scan when key < begin_key - self.next().await + self.next().await?; + if let Excluded(end_key) = &self.key_range.1 + && self.is_valid() + && self.key().user_key == end_key.as_ref() + { + debug_assert_le!(end_key.as_ref(), user_key); + self.next().await?; + } + Ok(()) } /// Indicates whether the iterator can be used. @@ -579,7 +593,10 @@ mod tests { ]; let sstable = gen_iterator_test_sstable_from_kv_pair(0, kv_pairs, sstable_store.clone()).await; - let backward_iters = vec![BackwardSstableIterator::new(sstable, sstable_store)]; + let backward_iters = vec![BackwardSstableIterator::new( + sstable.clone(), + sstable_store.clone(), + )]; let bmi = MergeIterator::new(backward_iters); let begin_key = Excluded(iterator_test_bytes_user_key_of(2)); @@ -632,6 +649,26 @@ mod tests { .await .unwrap(); assert!(!bui.is_valid()); + + let backward_iters = vec![BackwardSstableIterator::new(sstable, sstable_store)]; + let bmi = MergeIterator::new(backward_iters); + + let begin_key = Excluded(iterator_test_bytes_user_key_of(2)); + let end_key = Excluded(iterator_test_bytes_user_key_of(7)); + + let mut bui = BackwardUserIterator::for_test(bmi, (begin_key, end_key)); + bui.rewind().await.unwrap(); + assert!(bui.is_valid()); + assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref()); + // ----- end-range iterate ----- + bui.seek(iterator_test_user_key_of(7).as_ref()) + .await + .unwrap(); + assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref()); + bui.seek(iterator_test_user_key_of(5).as_ref()) + .await + .unwrap(); + assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref()); } // ..=right @@ -657,11 +694,17 @@ mod tests { ]; let sstable = gen_iterator_test_sstable_from_kv_pair(0, kv_pairs, sstable_store.clone()).await; - let backward_iters = vec![BackwardSstableIterator::new(sstable, sstable_store)]; + let backward_iters = vec![BackwardSstableIterator::new( + sstable.clone(), + sstable_store.clone(), + )]; let bmi = MergeIterator::new(backward_iters); let end_key = Included(iterator_test_bytes_user_key_of(7)); - let mut bui = BackwardUserIterator::for_test(bmi, (Unbounded, end_key)); + let mut bui = BackwardUserIterator::for_test( + bmi, + (Included(iterator_test_bytes_user_key_of(2)), end_key), + ); // ----- basic iterate ----- bui.rewind().await.unwrap(); @@ -671,8 +714,6 @@ mod tests { bui.next().await.unwrap(); assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref()); bui.next().await.unwrap(); - assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(1, 200).to_ref()); - bui.next().await.unwrap(); assert!(!bui.is_valid()); // ----- end-range iterate ----- @@ -685,8 +726,6 @@ mod tests { bui.next().await.unwrap(); assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref()); 
bui.next().await.unwrap(); - assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(1, 200).to_ref()); - bui.next().await.unwrap(); assert!(!bui.is_valid()); // ----- in-range iterate ----- @@ -699,8 +738,6 @@ mod tests { bui.next().await.unwrap(); assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref()); bui.next().await.unwrap(); - assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(1, 200).to_ref()); - bui.next().await.unwrap(); assert!(!bui.is_valid()); // ----- begin-range iterate ----- @@ -708,6 +745,27 @@ mod tests { .await .unwrap(); assert!(!bui.is_valid()); + + let end_key = Excluded(iterator_test_bytes_user_key_of(6)); + let backward_iters = vec![BackwardSstableIterator::new(sstable, sstable_store)]; + let bmi = MergeIterator::new(backward_iters); + let mut bui = BackwardUserIterator::for_test( + bmi, + (Excluded(iterator_test_bytes_user_key_of(2)), end_key), + ); + // ----- basic iterate ----- + bui.seek(iterator_test_user_key_of(6).as_ref()) + .await + .unwrap(); + assert!(bui.is_valid()); + assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref()); + bui.next().await.unwrap(); + assert!(!bui.is_valid()); + bui.seek(iterator_test_user_key_of(7).as_ref()) + .await + .unwrap(); + assert!(bui.is_valid()); + assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref()); } // left.. diff --git a/src/storage/src/hummock/iterator/forward_user.rs b/src/storage/src/hummock/iterator/forward_user.rs index 32456eaa27ca5..768aa01af686c 100644 --- a/src/storage/src/hummock/iterator/forward_user.rs +++ b/src/storage/src/hummock/iterator/forward_user.rs @@ -14,6 +14,7 @@ use std::ops::Bound::*; +use more_asserts::debug_assert_ge; use risingwave_common::must_match; use risingwave_common::util::epoch::MAX_SPILL_TIMES; use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker, UserKey, UserKeyRange}; @@ -124,20 +125,26 @@ impl> UserIterator { // Handle range scan match &self.key_range.0 { - Included(begin_key) => { + Included(begin_key) | Excluded(begin_key) => { let full_key = FullKey { - user_key: begin_key.clone(), + user_key: begin_key.as_ref(), epoch_with_gap: EpochWithGap::new(self.read_epoch, MAX_SPILL_TIMES), }; - self.iterator.seek(full_key.to_ref()).await?; + self.iterator.seek(full_key).await?; } - Excluded(_) => unimplemented!("excluded begin key is not supported"), Unbounded => { self.iterator.rewind().await?; } }; - self.try_advance_to_next_valid().await + self.try_advance_to_next_valid().await?; + if let Excluded(begin_key) = &self.key_range.0 + && self.is_valid() + && self.key().user_key == begin_key.as_ref() + { + self.next().await?; + } + Ok(()) } /// Resets the iterating position to the first position where the key >= provided key. 
@@ -147,8 +154,8 @@ impl> UserIterator { self.full_key_tracker = FullKeyTracker::new(FullKey::default()); // Handle range scan when key < begin_key - let user_key = match &self.key_range.0 { - Included(begin_key) => { + let seek_key = match &self.key_range.0 { + Included(begin_key) | Excluded(begin_key) => { let begin_key = begin_key.as_ref(); if begin_key > user_key { begin_key @@ -156,17 +163,24 @@ impl> UserIterator { user_key } } - Excluded(_) => unimplemented!("excluded begin key is not supported"), Unbounded => user_key, }; let full_key = FullKey { - user_key, + user_key: seek_key, epoch_with_gap: EpochWithGap::new(self.read_epoch, MAX_SPILL_TIMES), }; self.iterator.seek(full_key).await?; - self.try_advance_to_next_valid().await + self.try_advance_to_next_valid().await?; + if let Excluded(begin_key) = &self.key_range.0 + && self.is_valid() + && self.key().user_key == begin_key.as_ref() + { + debug_assert_ge!(begin_key.as_ref(), user_key); + self.next().await?; + } + Ok(()) } /// Indicates whether the iterator can be used. @@ -556,7 +570,11 @@ mod tests { let table = gen_iterator_test_sstable_from_kv_pair(0, kv_pairs, sstable_store.clone()).await; let read_options = Arc::new(SstableIteratorReadOptions::default()); - let iters = vec![SstableIterator::create(table, sstable_store, read_options)]; + let iters = vec![SstableIterator::create( + table.clone(), + sstable_store.clone(), + read_options.clone(), + )]; let mi = MergeIterator::new(iters); let begin_key = Included(iterator_test_bytes_user_key_of(2)); @@ -609,6 +627,35 @@ mod tests { .await .unwrap(); assert!(!ui.is_valid()); + + let iters = vec![SstableIterator::create(table, sstable_store, read_options)]; + let mi = MergeIterator::new(iters); + + let begin_key = Excluded(iterator_test_bytes_user_key_of(2)); + let end_key = Excluded(iterator_test_bytes_user_key_of(7)); + + let mut ui = UserIterator::for_test(mi, (begin_key, end_key)); + // ----- after-end-range iterate ----- + ui.seek(iterator_test_bytes_user_key_of(1).as_ref()) + .await + .unwrap(); + assert!(ui.is_valid()); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref()); + ui.seek(iterator_test_bytes_user_key_of(2).as_ref()) + .await + .unwrap(); + assert!(ui.is_valid()); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref()); + ui.seek(iterator_test_bytes_user_key_of(3).as_ref()) + .await + .unwrap(); + assert!(ui.is_valid()); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref()); + ui.seek(iterator_test_bytes_user_key_of(4).as_ref()) + .await + .unwrap(); + assert!(ui.is_valid()); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref()); } // ..=right diff --git a/src/stream/Cargo.toml b/src/stream/Cargo.toml index 2b67865d1c8a3..89f2b61fd3449 100644 --- a/src/stream/Cargo.toml +++ b/src/stream/Cargo.toml @@ -24,7 +24,7 @@ await-tree = { workspace = true } bytes = "1" cfg-if = "1" delta_btree_map = { path = "../utils/delta_btree_map" } -educe = "0.5" +educe = "0.6" either = "1" enum-as-inner = "0.6" fail = "0.5" diff --git a/src/stream/src/executor/project_set.rs b/src/stream/src/executor/project_set.rs index 43a2d65cbfb11..12f9d431c7c22 100644 --- a/src/stream/src/executor/project_set.rs +++ b/src/stream/src/executor/project_set.rs @@ -14,13 +14,16 @@ use either::Either; use multimap::MultiMap; -use risingwave_common::array::Op; +use risingwave_common::array::{ArrayRef, DataChunk, Op}; use risingwave_common::bail; use risingwave_common::row::RowExt; use risingwave_common::types::ToOwnedDatum; 
use risingwave_common::util::iter_util::ZipEqFast; -use risingwave_expr::expr::{LogReport, NonStrictExpression}; -use risingwave_expr::table_function::ProjectSetSelectItem; +use risingwave_expr::expr::{self, EvalErrorReport, NonStrictExpression}; +use risingwave_expr::table_function::{self, BoxedTableFunction, TableFunctionOutputIter}; +use risingwave_expr::ExprError; +use risingwave_pb::expr::project_set_select_item::PbSelectItem; +use risingwave_pb::expr::PbProjectSetSelectItem; use crate::executor::prelude::*; @@ -226,17 +229,13 @@ impl Inner { for expr_idx in expr_indices { let expr_idx = *expr_idx; let derived_watermark = match &self.select_list[expr_idx] { - ProjectSetSelectItem::Expr(expr) => { + ProjectSetSelectItem::Scalar(expr) => { watermark .clone() - .transform_with_expr( - // TODO: should we build `expr` in non-strict mode? - &NonStrictExpression::new_topmost(expr, LogReport), - expr_idx + PROJ_ROW_ID_OFFSET, - ) + .transform_with_expr(expr, expr_idx + PROJ_ROW_ID_OFFSET) .await } - ProjectSetSelectItem::TableFunction(_) => { + ProjectSetSelectItem::Set(_) => { bail!("Watermark should not be produced by a table function"); } }; @@ -253,3 +252,64 @@ impl Inner { Ok(ret) } } + +/// Either a scalar expression or a set-returning function. +/// +/// See also [`PbProjectSetSelectItem`]. +/// +/// A similar enum is defined in the `batch` module. The difference is that +/// we use `NonStrictExpression` instead of `BoxedExpression` here. +#[derive(Debug)] +pub enum ProjectSetSelectItem { + Scalar(NonStrictExpression), + Set(BoxedTableFunction), +} + +impl From for ProjectSetSelectItem { + fn from(table_function: BoxedTableFunction) -> Self { + ProjectSetSelectItem::Set(table_function) + } +} + +impl From for ProjectSetSelectItem { + fn from(expr: NonStrictExpression) -> Self { + ProjectSetSelectItem::Scalar(expr) + } +} + +impl ProjectSetSelectItem { + pub fn from_prost( + prost: &PbProjectSetSelectItem, + error_report: impl EvalErrorReport + 'static, + chunk_size: usize, + ) -> Result { + match prost.select_item.as_ref().unwrap() { + PbSelectItem::Expr(expr) => { + expr::build_non_strict_from_prost(expr, error_report).map(Self::Scalar) + } + PbSelectItem::TableFunction(tf) => { + table_function::build_from_prost(tf, chunk_size).map(Self::Set) + } + } + } + + pub fn return_type(&self) -> DataType { + match self { + ProjectSetSelectItem::Scalar(expr) => expr.return_type(), + ProjectSetSelectItem::Set(tf) => tf.return_type(), + } + } + + pub async fn eval<'a>( + &'a self, + input: &'a DataChunk, + ) -> Result, ArrayRef>, ExprError> { + match self { + Self::Scalar(expr) => Ok(Either::Right(expr.eval_infallible(input).await)), + // FIXME(runji): table function should also be evaluated non strictly + Self::Set(tf) => Ok(Either::Left( + TableFunctionOutputIter::new(tf.eval(input).await).await?, + )), + } + } +} diff --git a/src/stream/src/from_proto/project_set.rs b/src/stream/src/from_proto/project_set.rs index c2338394b33ef..155cc20a203a2 100644 --- a/src/stream/src/from_proto/project_set.rs +++ b/src/stream/src/from_proto/project_set.rs @@ -14,11 +14,10 @@ use multimap::MultiMap; use risingwave_common::util::iter_util::ZipEqFast; -use risingwave_expr::table_function::ProjectSetSelectItem; use risingwave_pb::stream_plan::ProjectSetNode; use super::*; -use crate::executor::ProjectSetExecutor; +use crate::executor::{ProjectSetExecutor, ProjectSetSelectItem}; pub struct ProjectSetExecutorBuilder; @@ -35,7 +34,11 @@ impl ExecutorBuilder for ProjectSetExecutorBuilder { .get_select_list() 
.iter() .map(|proto| { - ProjectSetSelectItem::from_prost(proto, params.env.config().developer.chunk_size) + ProjectSetSelectItem::from_prost( + proto, + params.eval_error_report.clone(), + params.env.config().developer.chunk_size, + ) }) .try_collect()?; let watermark_derivations = MultiMap::from_iter( diff --git a/src/stream/tests/integration_tests/project_set.rs b/src/stream/tests/integration_tests/project_set.rs index 5cff03284f4b6..3f54056b9487c 100644 --- a/src/stream/tests/integration_tests/project_set.rs +++ b/src/stream/tests/integration_tests/project_set.rs @@ -30,8 +30,8 @@ fn create_executor() -> (MessageSender, BoxedMessageStream) { let (tx, source) = MockSource::channel(); let source = source.into_executor(schema, PkIndices::new()); - let test_expr = build_from_pretty("(add:int8 $0:int8 $1:int8)").into_inner(); - let test_expr_watermark = build_from_pretty("(add:int8 $0:int8 1:int8)").into_inner(); + let test_expr = build_from_pretty("(add:int8 $0:int8 $1:int8)"); + let test_expr_watermark = build_from_pretty("(add:int8 $0:int8 1:int8)"); let tf1 = repeat(build_from_pretty("1:int4").into_inner(), 1); let tf2 = repeat(build_from_pretty("2:int4").into_inner(), 2); diff --git a/src/utils/delta_btree_map/Cargo.toml b/src/utils/delta_btree_map/Cargo.toml index 48534ffad1867..274a028489395 100644 --- a/src/utils/delta_btree_map/Cargo.toml +++ b/src/utils/delta_btree_map/Cargo.toml @@ -15,7 +15,7 @@ ignored = ["workspace-hack"] normal = ["workspace-hack"] [dependencies] -educe = "0.5" +educe = "0.6" enum-as-inner = "0.6" [lints]
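Usage sketch (illustrative only, not part of the patch): the statement below shows how the DynamoDbConfig options declared in with_options_sink.yaml and the new `REF SECRET` value syntax added to the sqlparser might be combined in a CREATE SINK. The sink, table, and secret names are hypothetical, the connector is assumed to be registered under the name 'dynamodb', and whether the DynamoDB sink resolves secret references at this stage is not established by this patch.

CREATE SINK my_dynamodb_sink FROM my_mv
WITH (
    connector = 'dynamodb',                                    -- assumed registration name for DynamoDbSink in for_all_sinks!
    table = 'my_ddb_table',                                    -- required; alias: dynamodb.table
    dynamodb.max_batch_rows = '256',                           -- optional; default '1024'
    aws.region = 'us-east-1',                                  -- optional; alias: region
    aws.credentials.access_key_id = 'my-access-key-id',        -- optional; alias: access_key
    aws.credentials.secret_access_key = REF SECRET my_secret   -- hypothetical: Value::Ref parsed as `REF SECRET <name>`
);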