diff --git a/Cargo.lock b/Cargo.lock index 10183bf485b6e..d7812e54575b2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1303,6 +1303,29 @@ dependencies = [ "uuid", ] +[[package]] +name = "aws-sdk-dynamodb" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23c4ed3708df2778c0c49b16e8235e52eb8f2133ae6752c40eea1376e2563fec" +dependencies = [ + "aws-credential-types", + "aws-http", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand 2.0.1", + "http 0.2.9", + "regex", + "tracing", +] + [[package]] name = "aws-sdk-kinesis" version = "1.3.0" @@ -2021,22 +2044,22 @@ dependencies = [ [[package]] name = "borsh" -version = "1.2.0" +version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf617fabf5cdbdc92f774bfe5062d870f228b80056d41180797abf48bed4056e" +checksum = "a6362ed55def622cddc70a4746a68554d7b687713770de539e59a739b249f8ed" dependencies = [ "borsh-derive", - "cfg_aliases 0.1.1", + "cfg_aliases", ] [[package]] name = "borsh-derive" -version = "1.2.0" +version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f404657a7ea7b5249e36808dff544bc88a28f26e0ac40009f674b7a009d14be3" +checksum = "c3ef8005764f53cd4dca619f5bf64cafd4664dada50ece25e4d81de54c80cc0b" dependencies = [ "once_cell", - "proc-macro-crate 2.0.0", + "proc-macro-crate 3.1.0", "proc-macro2", "quote", "syn 2.0.57", @@ -2363,12 +2386,6 @@ dependencies = [ "syn 2.0.57", ] -[[package]] -name = "cfg_aliases" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e" - [[package]] name = "cfg_aliases" version = "0.2.1" @@ -3678,7 +3695,7 @@ dependencies = [ name = "delta_btree_map" version = "1.9.0-alpha" dependencies = [ - "educe 0.5.11", + "educe 0.6.0", "enum-as-inner 0.6.0", ] @@ -4490,9 +4507,9 @@ dependencies = [ [[package]] name = "educe" -version = "0.5.11" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4bd92664bf78c4d3dba9b7cdafce6fa15b13ed3ed16175218196942e99168a8" +checksum = "1d7bc049e1bd8cdeb31b68bbd586a9464ecf9f3944af3958a7a9d0f8b9799417" dependencies = [ "enum-ordinalize 4.3.0", "proc-macro2", @@ -5163,9 +5180,9 @@ dependencies = [ [[package]] name = "foyer-storage" -version = "0.8.2" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa39f2ef3dae07aa552d859def785036865222de1c8d3867bb8607e0c3bf8192" +checksum = "412bc36c780928f233665ba1863bbe81f48362b736ec4ab6112af5e8defa2be3" dependencies = [ "ahash 0.8.11", "allocator-api2", @@ -5181,6 +5198,7 @@ dependencies = [ "futures", "itertools 0.13.0", "lazy_static", + "libc", "lz4", "madsim-tokio", "parking_lot 0.12.1", @@ -7717,7 +7735,7 @@ checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" dependencies = [ "bitflags 2.5.0", "cfg-if", - "cfg_aliases 0.2.1", + "cfg_aliases", "libc", ] @@ -9117,15 +9135,6 @@ dependencies = [ "toml_edit 0.19.15", ] -[[package]] -name = "proc-macro-crate" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e8366a6159044a37876a2b9817124296703c586a5c92e2c53751fa06d8d43e8" -dependencies = [ - "toml_edit 0.20.2", -] - [[package]] name = "proc-macro-crate" version = "3.1.0" @@ -10382,7 +10391,7 @@ dependencies = [ "criterion", 
"darwin-libproc", "easy-ext", - "educe 0.5.11", + "educe 0.6.0", "either", "enum-as-inner 0.6.0", "enumflags2", @@ -10470,7 +10479,7 @@ name = "risingwave_common_estimate_size" version = "1.9.0-alpha" dependencies = [ "bytes", - "educe 0.5.11", + "educe 0.6.0", "ethnum", "fixedbitset 0.5.0", "jsonbb", @@ -10676,6 +10685,7 @@ dependencies = [ "aws-config", "aws-credential-types", "aws-msk-iam-sasl-signer", + "aws-sdk-dynamodb", "aws-sdk-kinesis", "aws-sdk-s3", "aws-smithy-http", @@ -10895,7 +10905,7 @@ dependencies = [ "const-currying", "downcast-rs", "easy-ext", - "educe 0.5.11", + "educe 0.6.0", "either", "enum-as-inner 0.6.0", "expect-test", @@ -10940,7 +10950,7 @@ dependencies = [ "chrono", "chrono-tz 0.9.0", "criterion", - "educe 0.5.11", + "educe 0.6.0", "expect-test", "fancy-regex", "futures-async-stream", @@ -11007,7 +11017,7 @@ dependencies = [ "downcast-rs", "dyn-clone", "easy-ext", - "educe 0.5.11", + "educe 0.6.0", "either", "enum-as-inner 0.6.0", "expect-test", @@ -11756,7 +11766,7 @@ dependencies = [ "cfg-if", "criterion", "delta_btree_map", - "educe 0.5.11", + "educe 0.6.0", "either", "enum-as-inner 0.6.0", "expect-test", @@ -14741,17 +14751,6 @@ dependencies = [ "winnow 0.5.15", ] -[[package]] -name = "toml_edit" -version = "0.20.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "396e4d48bbb2b7554c944bde63101b5ae446cff6ec4a24227428f15eb72ef338" -dependencies = [ - "indexmap 2.2.6", - "toml_datetime", - "winnow 0.5.15", -] - [[package]] name = "toml_edit" version = "0.21.1" diff --git a/Makefile.toml b/Makefile.toml index 8f3c3e48998ad..a154f0ba60dba 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -1,120 +1,121 @@ extend = [ - { path = "src/risedevtool/grafana.toml" }, - { path = "src/risedevtool/prometheus.toml" }, - { path = "src/risedevtool/minio.toml" }, - { path = "src/risedevtool/etcd.toml" }, - { path = "src/risedevtool/tempo.toml" }, - { path = "src/risedevtool/gcloud-pubsub.toml" }, - { path = "src/risedevtool/redis.toml" }, - { path = "src/risedevtool/connector.toml" }, - { path = "src/risedevtool/risedev-components.toml" }, - { path = "src/sqlparser/sqlparser_test.toml" }, - { path = "src/frontend/planner_test/planner_test.toml" }, - { path = "src/tests/compaction_test/Makefile.toml" }, - { path = "src/storage/backup/integration_tests/Makefile.toml" }, - { path = "src/java_binding/make-java-binding.toml" }, - { path = "src/stream/tests/integration_tests/integration_test.toml" }, - { path = "e2e_test/source_inline/commands.toml" }, + { path = "src/risedevtool/grafana.toml" }, + { path = "src/risedevtool/prometheus.toml" }, + { path = "src/risedevtool/minio.toml" }, + { path = "src/risedevtool/etcd.toml" }, + { path = "src/risedevtool/tempo.toml" }, + { path = "src/risedevtool/gcloud-pubsub.toml" }, + { path = "src/risedevtool/redis.toml" }, + { path = "src/risedevtool/connector.toml" }, + { path = "src/risedevtool/risedev-components.toml" }, + { path = "src/sqlparser/sqlparser_test.toml" }, + { path = "src/frontend/planner_test/planner_test.toml" }, + { path = "src/tests/compaction_test/Makefile.toml" }, + { path = "src/storage/backup/integration_tests/Makefile.toml" }, + { path = "src/java_binding/make-java-binding.toml" }, + { path = "src/stream/tests/integration_tests/integration_test.toml" }, + { path = "e2e_test/source_inline/commands.toml" }, ] env_files = ["./risedev-components.user.env"] env_scripts = [ - ''' -#!@duckscript + ''' + #!@duckscript + + # only duckscript can modify env variables in cargo-make + # duckscript doc: 
https://github.com/sagiegurari/duckscript/blob/master/docs/sdk.md + + set_env ENABLE_TELEMETRY "false" + set_env RW_TELEMETRY_TYPE "test" + + is_sanitizer_enabled = get_env ENABLE_SANITIZER + is_hdfs_backend = get_env ENABLE_HDFS + is_release = get_env ENABLE_RELEASE_PROFILE + is_not_release = not ${is_release} + is_dynamic_linking = get_env ENABLE_DYNAMIC_LINKING + is_hummock_trace = get_env ENABLE_HUMMOCK_TRACE + is_external_udf_enabled = get_env ENABLE_EXTERNAL_UDF + is_wasm_udf_enabled = get_env ENABLE_WASM_UDF + is_js_udf_enabled = get_env ENABLE_JS_UDF + is_deno_udf_enabled = get_env ENABLE_DENO_UDF + is_python_udf_enabled = get_env ENABLE_PYTHON_UDF + + if ${is_sanitizer_enabled} + set_env RISEDEV_CARGO_BUILD_EXTRA_ARGS "--timings -Zbuild-std --target ${CARGO_MAKE_RUST_TARGET_TRIPLE}" + set_env RISEDEV_BUILD_TARGET_DIR "${CARGO_MAKE_RUST_TARGET_TRIPLE}/" + set_env RISEDEV_RUSTFLAGS "-Ctarget-cpu=native --cfg tokio_unstable -Zsanitizer=thread" + else + set_env RISEDEV_CARGO_BUILD_EXTRA_ARGS "--timings" + set_env RISEDEV_BUILD_TARGET_DIR "" + end -# only duckscript can modify env variables in cargo-make -# duckscript doc: https://github.com/sagiegurari/duckscript/blob/master/docs/sdk.md - -set_env ENABLE_TELEMETRY "false" - -is_sanitizer_enabled = get_env ENABLE_SANITIZER -is_hdfs_backend = get_env ENABLE_HDFS -is_release = get_env ENABLE_RELEASE_PROFILE -is_not_release = not ${is_release} -is_dynamic_linking = get_env ENABLE_DYNAMIC_LINKING -is_hummock_trace = get_env ENABLE_HUMMOCK_TRACE -is_external_udf_enabled = get_env ENABLE_EXTERNAL_UDF -is_wasm_udf_enabled = get_env ENABLE_WASM_UDF -is_js_udf_enabled = get_env ENABLE_JS_UDF -is_deno_udf_enabled = get_env ENABLE_DENO_UDF -is_python_udf_enabled = get_env ENABLE_PYTHON_UDF - -if ${is_sanitizer_enabled} - set_env RISEDEV_CARGO_BUILD_EXTRA_ARGS "--timings -Zbuild-std --target ${CARGO_MAKE_RUST_TARGET_TRIPLE}" - set_env RISEDEV_BUILD_TARGET_DIR "${CARGO_MAKE_RUST_TARGET_TRIPLE}/" - set_env RISEDEV_RUSTFLAGS "-Ctarget-cpu=native --cfg tokio_unstable -Zsanitizer=thread" -else - set_env RISEDEV_CARGO_BUILD_EXTRA_ARGS "--timings" - set_env RISEDEV_BUILD_TARGET_DIR "" -end + if ${is_hdfs_backend} + set_env BUILD_HDFS_BACKEND_CMD "-p risingwave_object_store --features hdfs-backend" + else + set_env BUILD_HDFS_BACKEND_CMD "" + end -if ${is_hdfs_backend} - set_env BUILD_HDFS_BACKEND_CMD "-p risingwave_object_store --features hdfs-backend" -else - set_env BUILD_HDFS_BACKEND_CMD "" -end + if ${is_not_release} and ${is_dynamic_linking} + set_env RISINGWAVE_FEATURE_FLAGS "--features rw-dynamic-link --no-default-features" + else + set_env RISINGWAVE_FEATURE_FLAGS "--features rw-static-link" + end -if ${is_not_release} and ${is_dynamic_linking} - set_env RISINGWAVE_FEATURE_FLAGS "--features rw-dynamic-link --no-default-features" -else - set_env RISINGWAVE_FEATURE_FLAGS "--features rw-static-link" -end - -if ${is_external_udf_enabled} - flags = get_env RISINGWAVE_FEATURE_FLAGS - set_env RISINGWAVE_FEATURE_FLAGS "${flags} --features external-udf" -end - -if ${is_wasm_udf_enabled} - flags = get_env RISINGWAVE_FEATURE_FLAGS - set_env RISINGWAVE_FEATURE_FLAGS "${flags} --features wasm-udf" -end - -if ${is_js_udf_enabled} - flags = get_env RISINGWAVE_FEATURE_FLAGS - set_env RISINGWAVE_FEATURE_FLAGS "${flags} --features js-udf" -end - -if ${is_deno_udf_enabled} - flags = get_env RISINGWAVE_FEATURE_FLAGS - set_env RISINGWAVE_FEATURE_FLAGS "${flags} --features deno-udf" -end - -if ${is_python_udf_enabled} - flags = get_env RISINGWAVE_FEATURE_FLAGS - 
set_env RISINGWAVE_FEATURE_FLAGS "${flags} --features python-udf" -end - -if ${is_hummock_trace} - set_env BUILD_HUMMOCK_TRACE_CMD "-p risingwave_storage --features hm-trace" -else - set_env BUILD_HUMMOCK_TRACE_CMD "" -end + if ${is_external_udf_enabled} + flags = get_env RISINGWAVE_FEATURE_FLAGS + set_env RISINGWAVE_FEATURE_FLAGS "${flags} --features external-udf" + end -is_ci = get_env RISINGWAVE_CI -is_not_ci = not ${is_ci} + if ${is_wasm_udf_enabled} + flags = get_env RISINGWAVE_FEATURE_FLAGS + set_env RISINGWAVE_FEATURE_FLAGS "${flags} --features wasm-udf" + end -if ${is_not_ci} - query_log_path = get_env RW_QUERY_LOG_PATH - no_query_log_path = not ${query_log_path} + if ${is_js_udf_enabled} + flags = get_env RISINGWAVE_FEATURE_FLAGS + set_env RISINGWAVE_FEATURE_FLAGS "${flags} --features js-udf" + end - if ${no_query_log_path} - set_env RW_QUERY_LOG_PATH "${PREFIX_LOG}" - fi + if ${is_deno_udf_enabled} + flags = get_env RISINGWAVE_FEATURE_FLAGS + set_env RISINGWAVE_FEATURE_FLAGS "${flags} --features deno-udf" + end - rust_log = get_env RUST_LOG - no_rust_log = not ${rust_log} + if ${is_python_udf_enabled} + flags = get_env RISINGWAVE_FEATURE_FLAGS + set_env RISINGWAVE_FEATURE_FLAGS "${flags} --features python-udf" + end - if ${no_rust_log} - set_env RUST_LOG "pgwire_query_log=info" + if ${is_hummock_trace} + set_env BUILD_HUMMOCK_TRACE_CMD "-p risingwave_storage --features hm-trace" else - set_env RUST_LOG "pgwire_query_log=info,${rust_log}" + set_env BUILD_HUMMOCK_TRACE_CMD "" + end + + is_ci = get_env RISINGWAVE_CI + is_not_ci = not ${is_ci} + + if ${is_not_ci} + query_log_path = get_env RW_QUERY_LOG_PATH + no_query_log_path = not ${query_log_path} + + if ${no_query_log_path} + set_env RW_QUERY_LOG_PATH "${PREFIX_LOG}" + fi + + rust_log = get_env RUST_LOG + no_rust_log = not ${rust_log} + + if ${no_rust_log} + set_env RUST_LOG "pgwire_query_log=info" + else + set_env RUST_LOG "pgwire_query_log=info,${rust_log}" + end end -end -set_env TMUX "tmux -L risedev" -''', + set_env TMUX "tmux -L risedev" + ''', ] @@ -322,7 +323,7 @@ description = "Codesign playground binary to support coredump" # codesign the binary before running. # https://developer.apple.com/forums/thread/694233?answerId=695943022#695943022 condition = { env_set = [ - "ENABLE_COREDUMP", + "ENABLE_COREDUMP", ], env = { "SYSTEM" = "darwin-arm64" } } script = ''' #!/usr/bin/env bash @@ -339,7 +340,7 @@ description = "Codesign all binaries to support coredump" # codesign the binary before running. 
# https://developer.apple.com/forums/thread/694233?answerId=695943022#695943022 condition = { env_set = [ - "ENABLE_COREDUMP", + "ENABLE_COREDUMP", ], env = { "SYSTEM" = "darwin-arm64" } } script = ''' #!/usr/bin/env bash @@ -374,9 +375,9 @@ category = "RiseDev - Build" description = "Copy RisngWave binaries to bin" condition = { env_set = ["ENABLE_BUILD_RUST"] } dependencies = [ - "link-all-in-one-binaries", - "link-user-bin", - "codesign-binaries", + "link-all-in-one-binaries", + "link-user-bin", + "codesign-binaries", ] [tasks.b] @@ -493,15 +494,15 @@ private = true category = "Misc" description = "Download all available components at once" dependencies = [ - "download-maven", - "download-etcd", - "download-grafana", - "download-tempo", - "download-mcli", - "download-minio", - "download-prometheus", - "download-pubsub", - "download-redis", + "download-maven", + "download-etcd", + "download-grafana", + "download-tempo", + "download-mcli", + "download-minio", + "download-prometheus", + "download-pubsub", + "download-redis", ] [tasks.create-user-profiles-file] @@ -509,7 +510,7 @@ private = true category = "RiseDev - Prepare" description = "Create user profiles file if not exists" condition = { files_not_exist = [ - "${CARGO_MAKE_WORKSPACE_WORKING_DIRECTORY}/risedev-profiles.user.yml", + "${CARGO_MAKE_WORKSPACE_WORKING_DIRECTORY}/risedev-profiles.user.yml", ] } script = ''' #!/usr/bin/env bash @@ -536,34 +537,34 @@ EOF category = "RiseDev - Prepare" description = "Prepare dev cluster by downloading necessary tools and build required components" dependencies = [ - "create-user-profiles-file", - "download-all", - "build-risingwave", - "build-connector-node", - "post-build-risingwave", - "prepare-config", + "create-user-profiles-file", + "download-all", + "build-risingwave", + "build-connector-node", + "post-build-risingwave", + "prepare-config", ] [tasks.pre-start-benchmark] category = "RiseDev - Prepare" description = "Download necessary tools to deploy a benchmark env" dependencies = [ - "download-minio", - "download-mcli", - "download-etcd", - "download-grafana", - "download-prometheus", - "download-tempo", - "download-redis", + "download-minio", + "download-mcli", + "download-etcd", + "download-grafana", + "download-prometheus", + "download-tempo", + "download-redis", ] [tasks.pre-start-playground] category = "RiseDev - Prepare" description = "Preparation steps for playground" dependencies = [ - "build-risingwave-playground", - "codesign-playground", - "build-connector-node", + "build-risingwave-playground", + "codesign-playground", + "build-connector-node", ] [tasks.check-risedev-env-file] @@ -740,16 +741,16 @@ dependencies = ["k", "clean-data"] private = true category = "RiseDev - Check" install_crate = { min_version = "0.9.51", crate_name = "cargo-nextest", binary = "cargo", test_arg = [ - "nextest", - "--help", + "nextest", + "--help", ], install_command = "binstall" } [tasks.install-llvm-cov] private = true category = "RiseDev - Check" install_crate = { min_version = "0.5.17", crate_name = "cargo-llvm-cov", binary = "cargo", test_arg = [ - "llvm-cov", - "--help", + "llvm-cov", + "--help", ], install_command = "binstall" } [tasks.install-tools] @@ -830,11 +831,11 @@ description = "🌟 Run unit tests" category = "RiseDev - Build" dependencies = ["prepare"] condition = { env_set = [ - "ENABLE_BUILD_RW_CONNECTOR", + "ENABLE_BUILD_RW_CONNECTOR", ], files_modified = { input = [ - "./java/connector-node/**/*", + "./java/connector-node/**/*", ], output = [ - 
"./java/connector-node/assembly/target/**/*", + "./java/connector-node/assembly/target/**/*", ] } } description = "Build RisingWave Connector from source" script = ''' @@ -1054,8 +1055,8 @@ private = true category = "RiseDev - Check" description = "Run cargo hakari check and attempt to fix" install_crate = { min_version = "0.9.24", crate_name = "cargo-hakari", binary = "cargo", test_arg = [ - "hakari", - "--help", + "hakari", + "--help", ], install_command = "binstall" } script = """ #!/usr/bin/env bash @@ -1072,8 +1073,8 @@ private = true category = "RiseDev - Check" description = "Run cargo sort check and attempt to fix" install_crate = { min_version = "1.0.9", crate_name = "cargo-sort", binary = "cargo", test_arg = [ - "sort", - "--help", + "sort", + "--help", ], install_command = "binstall" } script = """ #!/usr/bin/env bash @@ -1135,7 +1136,7 @@ private = true category = "RiseDev - Check" description = "Run cargo typos-cli check" install_crate = { min_version = "1.20.4", crate_name = "typos-cli", binary = "typos", test_arg = [ - "--help", + "--help", ], install_command = "binstall" } script = """ #!/usr/bin/env bash @@ -1151,8 +1152,8 @@ category = "RiseDev - Check" description = "Check unused dependencies" env = { RUSTFLAGS = "--cfg tokio_unstable" } install_crate = { min_version = "0.1.35", crate_name = "cargo-udeps", binary = "cargo", test_arg = [ - "udeps", - "--help", + "udeps", + "--help", ], install_command = "binstall" } script = """ #!/usr/bin/env bash @@ -1179,13 +1180,13 @@ scripts/check/check-trailing-spaces.sh --fix [tasks.check-fast] category = "RiseDev - Check" dependencies = [ - "warn-on-missing-tools", - # Disable hakari until we make sure it's useful - # "check-hakari", - "check-dep-sort", - "check-fmt", - "check-trailing-spaces", - "check-typos", + "warn-on-missing-tools", + # Disable hakari until we make sure it's useful + # "check-hakari", + "check-dep-sort", + "check-fmt", + "check-trailing-spaces", + "check-typos", ] description = "Perform part of the pre-CI checks that are fast to run" @@ -1206,10 +1207,10 @@ alias = "check" [tasks.check-fix] category = "RiseDev - Check" dependencies = [ - "warn-on-missing-tools", - "check-fast", - "check-clippy-fix", - "check-java-fix", + "warn-on-missing-tools", + "check-fast", + "check-clippy-fix", + "check-java-fix", ] script = """ #!/usr/bin/env bash @@ -1294,7 +1295,7 @@ echo "All processes has exited." 
[tasks.slt] category = "RiseDev - Test - SQLLogicTest" install_crate = { version = "0.20.1", crate_name = "sqllogictest-bin", binary = "sqllogictest", test_arg = [ - "--help", + "--help", ], install_command = "binstall" } dependencies = ["check-and-load-risedev-env-file"] command = "sqllogictest" diff --git a/ci/risedev-components.ci.benchmark.env b/ci/risedev-components.ci.benchmark.env index 7761d6ce969d0..5bb6c4abddb56 100644 --- a/ci/risedev-components.ci.benchmark.env +++ b/ci/risedev-components.ci.benchmark.env @@ -4,3 +4,4 @@ ENABLE_ETCD=true ENABLE_BUILD_RUST=true ENABLE_RELEASE_PROFILE=true ENABLE_PROMETHEUS_GRAFANA=true +RW_TELEMETRY_TYPE=test diff --git a/ci/risedev-components.ci.env b/ci/risedev-components.ci.env index 7157083eb871a..0430207739145 100644 --- a/ci/risedev-components.ci.env +++ b/ci/risedev-components.ci.env @@ -1,3 +1,4 @@ RISEDEV_CONFIGURED=true ENABLE_MINIO=true ENABLE_ETCD=true +RW_TELEMETRY_TYPE=test diff --git a/ci/risedev-components.ci.source.env b/ci/risedev-components.ci.source.env index 22755d4c6f3d8..88d76b3df8e98 100644 --- a/ci/risedev-components.ci.source.env +++ b/ci/risedev-components.ci.source.env @@ -2,3 +2,4 @@ RISEDEV_CONFIGURED=true ENABLE_MINIO=true ENABLE_ETCD=true ENABLE_PUBSUB=true +RW_TELEMETRY_TYPE=test diff --git a/ci/scripts/common.sh b/ci/scripts/common.sh index f1177ecf8320c..3b31afeef253d 100755 --- a/ci/scripts/common.sh +++ b/ci/scripts/common.sh @@ -14,6 +14,7 @@ export MINIO_DOWNLOAD_BIN=https://rw-ci-deps-dist.s3.amazonaws.com/minio export MCLI_DOWNLOAD_BIN=https://rw-ci-deps-dist.s3.amazonaws.com/mc export GCLOUD_DOWNLOAD_TGZ=https://rw-ci-deps-dist.s3.amazonaws.com/google-cloud-cli-475.0.0-linux-x86_64.tar.gz export NEXTEST_HIDE_PROGRESS_BAR=true +export RW_TELEMETRY_TYPE=test unset LANG if [ -n "${BUILDKITE_COMMIT:-}" ]; then export GIT_SHA=$BUILDKITE_COMMIT diff --git a/e2e_test/batch/catalog/pg_class.slt.part b/e2e_test/batch/catalog/pg_class.slt.part index ffb53e32d66b5..a6fe4a1257122 100644 --- a/e2e_test/batch/catalog/pg_class.slt.part +++ b/e2e_test/batch/catalog/pg_class.slt.part @@ -20,4 +20,4 @@ SELECT oid,relname,relowner,relkind FROM pg_catalog.pg_class ORDER BY oid limit query ITIT SELECT oid,relname,relowner,relkind FROM pg_catalog.pg_class WHERE oid = 'pg_namespace'::regclass; ---- -2147478671 pg_namespace 1 v +2147478670 pg_namespace 1 v diff --git a/e2e_test/error_ui/simple/recovery.slt b/e2e_test/error_ui/simple/recovery.slt index b47afc94c47ad..152854c278483 100644 --- a/e2e_test/error_ui/simple/recovery.slt +++ b/e2e_test/error_ui/simple/recovery.slt @@ -1,15 +1,15 @@ -# TODO: the test triggers a recovery caused by a known issue: https://github.com/risingwavelabs/risingwave/issues/12474. +# TODO: the test triggers a recovery caused by a known issue: https://github.com/risingwavelabs/risingwave/issues/11915. # We should consider using a mechanism designed for testing recovery instead of depending on a bug. statement ok -create table t (v int); +create table t (v decimal); statement ok -create materialized view mv as select generate_series(1, 10), coalesce(pg_sleep(2), v) / 0 bomb from t; +create materialized view mv as select sum(coalesce(pg_sleep(1), v)) from t; -# The bomb will be triggered after 2 seconds of sleep, so the insertion should return successfully. +# The bomb will be triggered after 1 seconds of sleep, so the insertion should return successfully. statement ok -insert into t values (1); +insert into t values (4e28), (4e28); # Wait for recovery to complete. 
sleep 15s @@ -25,7 +25,7 @@ with error as ( limit 1 ) select -case when error like '%Actor % exited unexpectedly: Executor error: Chunk operation error: Division by zero%' then 'ok' +case when error like '%Actor % exited unexpectedly: Executor error: Chunk operation error: Numeric out of range%' then 'ok' else error end as result from error; diff --git a/e2e_test/streaming/bug_fixes/issue_12474.slt b/e2e_test/streaming/bug_fixes/issue_12474.slt new file mode 100644 index 0000000000000..245ca499a255d --- /dev/null +++ b/e2e_test/streaming/bug_fixes/issue_12474.slt @@ -0,0 +1,26 @@ +# https://github.com/risingwavelabs/risingwave/issues/12474 + +statement ok +create table t(x int[]); + +statement ok +create materialized view mv as select 1 / x[1] as bomb, unnest(x) as unnest from t; + +statement ok +insert into t values (array[0, 1]), (array[1]); + +statement ok +flush; + +query II rowsort +select * from mv; +---- +1 1 +NULL 0 +NULL 1 + +statement ok +drop materialized view mv; + +statement ok +drop table t; diff --git a/src/batch/src/executor/project_set.rs b/src/batch/src/executor/project_set.rs index 2d50c1039743c..7784159eb6c70 100644 --- a/src/batch/src/executor/project_set.rs +++ b/src/batch/src/executor/project_set.rs @@ -15,13 +15,16 @@ use either::Either; use futures_async_stream::try_stream; use itertools::Itertools; -use risingwave_common::array::DataChunk; +use risingwave_common::array::{ArrayRef, DataChunk}; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::types::{DataType, DatumRef}; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; use risingwave_common::util::iter_util::ZipEqFast; -use risingwave_expr::table_function::ProjectSetSelectItem; +use risingwave_expr::expr::{self, BoxedExpression}; +use risingwave_expr::table_function::{self, BoxedTableFunction, TableFunctionOutputIter}; use risingwave_pb::batch_plan::plan_node::NodeBody; +use risingwave_pb::expr::project_set_select_item::PbSelectItem; +use risingwave_pb::expr::PbProjectSetSelectItem; use crate::error::{BatchError, Result}; use crate::executor::{ @@ -170,6 +173,57 @@ impl BoxedExecutorBuilder for ProjectSetExecutor { } } +/// Either a scalar expression or a set-returning function. +/// +/// See also [`PbProjectSetSelectItem`] +#[derive(Debug)] +pub enum ProjectSetSelectItem { + Scalar(BoxedExpression), + Set(BoxedTableFunction), +} + +impl From for ProjectSetSelectItem { + fn from(table_function: BoxedTableFunction) -> Self { + ProjectSetSelectItem::Set(table_function) + } +} + +impl From for ProjectSetSelectItem { + fn from(expr: BoxedExpression) -> Self { + ProjectSetSelectItem::Scalar(expr) + } +} + +impl ProjectSetSelectItem { + pub fn from_prost(prost: &PbProjectSetSelectItem, chunk_size: usize) -> Result { + Ok(match prost.select_item.as_ref().unwrap() { + PbSelectItem::Expr(expr) => Self::Scalar(expr::build_from_prost(expr)?), + PbSelectItem::TableFunction(tf) => { + Self::Set(table_function::build_from_prost(tf, chunk_size)?) 
+ } + }) + } + + pub fn return_type(&self) -> DataType { + match self { + ProjectSetSelectItem::Scalar(expr) => expr.return_type(), + ProjectSetSelectItem::Set(tf) => tf.return_type(), + } + } + + pub async fn eval<'a>( + &'a self, + input: &'a DataChunk, + ) -> Result, ArrayRef>> { + match self { + Self::Set(tf) => Ok(Either::Left( + TableFunctionOutputIter::new(tf.eval(input).await).await?, + )), + Self::Scalar(expr) => Ok(Either::Right(expr.eval(input).await?)), + } + } +} + #[cfg(test)] mod tests { use futures::stream::StreamExt; diff --git a/src/common/Cargo.toml b/src/common/Cargo.toml index e2af368c24a85..e5b4a2feaa2a0 100644 --- a/src/common/Cargo.toml +++ b/src/common/Cargo.toml @@ -40,7 +40,7 @@ clap = { workspace = true } comfy-table = "7" crc32fast = "1" easy-ext = "1" -educe = "0.5" +educe = "0.6" either = "1" enum-as-inner = "0.6" enumflags2 = { version = "0.7.8" } diff --git a/src/common/estimate_size/Cargo.toml b/src/common/estimate_size/Cargo.toml index 24cf19b3809e9..77e4203f9c7cb 100644 --- a/src/common/estimate_size/Cargo.toml +++ b/src/common/estimate_size/Cargo.toml @@ -16,7 +16,7 @@ normal = ["workspace-hack"] [dependencies] bytes = "1" -educe = "0.5" +educe = "0.6" ethnum = { version = "1", features = ["serde"] } fixedbitset = "0.5" jsonbb = { workspace = true } diff --git a/src/common/src/telemetry/mod.rs b/src/common/src/telemetry/mod.rs index 54a88789a171f..0fbd526692da0 100644 --- a/src/common/src/telemetry/mod.rs +++ b/src/common/src/telemetry/mod.rs @@ -16,6 +16,7 @@ pub mod manager; pub mod pb_compatible; pub mod report; +use std::env; use std::time::SystemTime; use serde::{Deserialize, Serialize}; @@ -25,6 +26,11 @@ use thiserror_ext::AsReport; use crate::util::env_var::env_var_is_true_or; use crate::util::resource_util::cpu::total_cpu_available; use crate::util::resource_util::memory::{system_memory_available_bytes, total_memory_used_bytes}; +use crate::RW_VERSION; + +pub const TELEMETRY_CLUSTER_TYPE: &str = "RW_TELEMETRY_TYPE"; +const TELEMETRY_CLUSTER_TYPE_HOSTED: &str = "hosted"; // hosted on RisingWave Cloud +const TELEMETRY_CLUSTER_TYPE_TEST: &str = "test"; // test environment, eg. 
CI & Risedev /// Url of telemetry backend pub const TELEMETRY_REPORT_URL: &str = "https://telemetry.risingwave.dev/api/v2/report"; @@ -159,6 +165,34 @@ pub fn current_timestamp() -> u64 { .as_secs() } +pub fn report_scarf_enabled() -> bool { + env::var(TELEMETRY_CLUSTER_TYPE) + .map(|deploy_type| { + !(deploy_type.eq_ignore_ascii_case(TELEMETRY_CLUSTER_TYPE_HOSTED) + || deploy_type.eq_ignore_ascii_case(TELEMETRY_CLUSTER_TYPE_TEST)) + }) + .unwrap_or(true) +} + +// impl logic to report to Scarf service, containing RW version and deployment platform +pub async fn report_to_scarf() { + let request_url = format!( + "https://risingwave.gateway.scarf.sh/telemetry/{}/{}", + RW_VERSION, + System::name().unwrap_or_default() + ); + // keep trying every 1h until success + loop { + let res = reqwest::get(&request_url).await; + if let Ok(res) = res { + if res.status().is_success() { + break; + } + } + tokio::time::sleep(tokio::time::Duration::from_secs(3600)).await; + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 876247560482d..0b30dc50510ad 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -34,6 +34,7 @@ await-tree = { workspace = true } aws-config = { workspace = true } aws-credential-types = { workspace = true } aws-msk-iam-sasl-signer = "1.0.0" +aws-sdk-dynamodb = "1" aws-sdk-kinesis = { workspace = true } aws-sdk-s3 = { workspace = true } aws-smithy-http = { workspace = true } diff --git a/src/connector/src/sink/dynamodb.rs b/src/connector/src/sink/dynamodb.rs new file mode 100644 index 0000000000000..edf2e7c08cc9f --- /dev/null +++ b/src/connector/src/sink/dynamodb.rs @@ -0,0 +1,401 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::collections::{HashMap, HashSet}; +use std::usize; + +use anyhow::{anyhow, Context}; +use aws_sdk_dynamodb as dynamodb; +use aws_sdk_dynamodb::client::Client; +use aws_smithy_types::Blob; +use dynamodb::types::{ + AttributeValue, DeleteRequest, PutRequest, ReturnConsumedCapacity, ReturnItemCollectionMetrics, + TableStatus, WriteRequest, +}; +use maplit::hashmap; +use risingwave_common::array::{Op, RowRef, StreamChunk}; +use risingwave_common::catalog::{Field, Schema}; +use risingwave_common::row::Row as _; +use risingwave_common::types::{DataType, ScalarRefImpl, ToText}; +use risingwave_common::util::iter_util::ZipEqDebug; +use serde_derive::Deserialize; +use serde_with::{serde_as, DisplayFromStr}; +use with_options::WithOptions; + +use super::log_store::DeliveryFutureManagerAddFuture; +use super::writer::{ + AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, +}; +use super::{DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, SinkWriterParam}; +use crate::connector_common::AwsAuthProps; +use crate::error::ConnectorResult; + +pub const DYNAMO_DB_SINK: &str = "dynamodb"; + +#[serde_as] +#[derive(Deserialize, Debug, Clone, WithOptions)] +pub struct DynamoDbConfig { + #[serde(rename = "table", alias = "dynamodb.table")] + pub table: String, + + #[serde(rename = "dynamodb.max_batch_rows", default = "default_max_batch_rows")] + #[serde_as(as = "DisplayFromStr")] + pub max_batch_rows: usize, + + #[serde(flatten)] + pub aws_auth_props: AwsAuthProps, +} + +fn default_max_batch_rows() -> usize { + 1024 +} + +impl DynamoDbConfig { + pub async fn build_client(&self) -> ConnectorResult { + let config = &self.aws_auth_props; + let aws_config = config.build_config().await?; + + Ok(Client::new(&aws_config)) + } + + fn from_hashmap(values: HashMap) -> Result { + serde_json::from_value::(serde_json::to_value(values).unwrap()) + .map_err(|e| SinkError::Config(anyhow!(e))) + } +} + +#[derive(Clone, Debug)] +pub struct DynamoDbSink { + pub config: DynamoDbConfig, + schema: Schema, + pk_indices: Vec, +} + +impl Sink for DynamoDbSink { + type Coordinator = DummySinkCommitCoordinator; + type LogSinker = AsyncTruncateLogSinkerOf; + + const SINK_NAME: &'static str = DYNAMO_DB_SINK; + + async fn validate(&self) -> Result<()> { + let client = (self.config.build_client().await) + .context("validate DynamoDB sink error") + .map_err(SinkError::DynamoDb)?; + + let table_name = &self.config.table; + let output = client + .describe_table() + .table_name(table_name) + .send() + .await + .map_err(|e| anyhow!(e))?; + let Some(table) = output.table else { + return Err(SinkError::DynamoDb(anyhow!( + "table {} not found", + table_name + ))); + }; + if !matches!(table.table_status(), Some(TableStatus::Active)) { + return Err(SinkError::DynamoDb(anyhow!( + "table {} is not active", + table_name + ))); + } + let pk_set: HashSet = self + .schema + .fields() + .iter() + .enumerate() + .filter(|(k, _)| self.pk_indices.contains(k)) + .map(|(_, v)| v.name.clone()) + .collect(); + let key_schema = table.key_schema(); + + for key_element in key_schema.iter().map(|x| x.attribute_name()) { + if !pk_set.contains(key_element) { + return Err(SinkError::DynamoDb(anyhow!( + "table {} key field {} not found in schema or not primary key", + table_name, + key_element + ))); + } + } + + Ok(()) + } + + async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result { + Ok( + DynamoDbSinkWriter::new(self.config.clone(), self.schema.clone()) + .await? 
+ .into_log_sinker(usize::MAX), + ) + } +} + +impl TryFrom for DynamoDbSink { + type Error = SinkError; + + fn try_from(param: SinkParam) -> std::result::Result { + let schema = param.schema(); + let config = DynamoDbConfig::from_hashmap(param.properties)?; + + Ok(Self { + config, + schema, + pk_indices: param.downstream_pk, + }) + } +} + +#[derive(Debug)] +struct DynamoDbRequest { + inner: WriteRequest, + key_items: Vec, +} + +impl DynamoDbRequest { + fn extract_pk_values(&self) -> Option> { + let key = match (&self.inner.put_request(), &self.inner.delete_request()) { + (Some(put_req), None) => &put_req.item, + (None, Some(del_req)) => &del_req.key, + _ => return None, + }; + let vs = key + .iter() + .filter(|(k, _)| self.key_items.contains(k)) + .map(|(_, v)| v.clone()) + .collect(); + Some(vs) + } +} + +struct DynamoDbPayloadWriter { + request_items: Vec, + client: Client, + table: String, + dynamodb_keys: Vec, +} + +impl DynamoDbPayloadWriter { + fn write_one_insert(&mut self, item: HashMap) { + let put_req = PutRequest::builder().set_item(Some(item)).build().unwrap(); + let req = WriteRequest::builder().put_request(put_req).build(); + self.write_one_req(req); + } + + fn write_one_delete(&mut self, key: HashMap) { + let key = key + .into_iter() + .filter(|(k, _)| self.dynamodb_keys.contains(k)) + .collect(); + let del_req = DeleteRequest::builder().set_key(Some(key)).build().unwrap(); + let req = WriteRequest::builder().delete_request(del_req).build(); + self.write_one_req(req); + } + + fn write_one_req(&mut self, req: WriteRequest) { + let r_req = DynamoDbRequest { + inner: req, + key_items: self.dynamodb_keys.clone(), + }; + if let Some(v) = r_req.extract_pk_values() { + self.request_items.retain(|item| { + !item + .extract_pk_values() + .unwrap_or_default() + .iter() + .all(|x| v.contains(x)) + }); + } + self.request_items.push(r_req); + } + + async fn write_chunk(&mut self) -> Result<()> { + if !self.request_items.is_empty() { + let table = self.table.clone(); + let req_items = std::mem::take(&mut self.request_items) + .into_iter() + .map(|r| r.inner) + .collect(); + let reqs = hashmap! 
{ + table => req_items, + }; + self.client + .batch_write_item() + .set_request_items(Some(reqs)) + .return_consumed_capacity(ReturnConsumedCapacity::None) + .return_item_collection_metrics(ReturnItemCollectionMetrics::None) + .send() + .await + .map_err(|e| { + SinkError::DynamoDb( + anyhow!(e).context("failed to delete item from DynamoDB sink"), + ) + })?; + } + + Ok(()) + } +} + +pub struct DynamoDbSinkWriter { + max_batch_rows: usize, + payload_writer: DynamoDbPayloadWriter, + formatter: DynamoDbFormatter, +} + +impl DynamoDbSinkWriter { + pub async fn new(config: DynamoDbConfig, schema: Schema) -> Result { + let client = config.build_client().await?; + let table_name = &config.table; + let output = client + .describe_table() + .table_name(table_name) + .send() + .await + .map_err(|e| anyhow!(e))?; + let Some(table) = output.table else { + return Err(SinkError::DynamoDb(anyhow!( + "table {} not found", + table_name + ))); + }; + let dynamodb_keys = table + .key_schema + .unwrap_or_default() + .into_iter() + .map(|k| k.attribute_name) + .collect(); + + let payload_writer = DynamoDbPayloadWriter { + request_items: Vec::new(), + client, + table: config.table, + dynamodb_keys, + }; + + Ok(Self { + max_batch_rows: config.max_batch_rows, + payload_writer, + formatter: DynamoDbFormatter { schema }, + }) + } + + async fn write_chunk_inner(&mut self, chunk: StreamChunk) -> Result<()> { + for (op, row) in chunk.rows() { + let items = self.formatter.format_row(row)?; + match op { + Op::Insert | Op::UpdateInsert => { + self.payload_writer.write_one_insert(items); + } + Op::Delete => { + self.payload_writer.write_one_delete(items); + } + Op::UpdateDelete => {} + } + } + if self.payload_writer.request_items.len() >= self.max_batch_rows { + self.payload_writer.write_chunk().await?; + } + Ok(()) + } + + async fn flush(&mut self) -> Result<()> { + self.payload_writer.write_chunk().await + } +} + +impl AsyncTruncateSinkWriter for DynamoDbSinkWriter { + async fn write_chunk<'a>( + &'a mut self, + chunk: StreamChunk, + _add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>, + ) -> Result<()> { + self.write_chunk_inner(chunk).await + } + + async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> { + if is_checkpoint { + self.flush().await?; + } + Ok(()) + } +} + +struct DynamoDbFormatter { + schema: Schema, +} + +impl DynamoDbFormatter { + fn format_row(&self, row: RowRef<'_>) -> Result> { + row.iter() + .zip_eq_debug((self.schema.clone()).into_fields()) + .map(|(scalar, field)| { + map_data_type(scalar, &field.data_type()).map(|attr| (field.name, attr)) + }) + .collect() + } +} + +fn map_data_type( + scalar_ref: Option>, + data_type: &DataType, +) -> Result { + let Some(scalar_ref) = scalar_ref else { + return Ok(AttributeValue::Null(true)); + }; + let attr = match data_type { + DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::Int256 + | DataType::Float32 + | DataType::Float64 + | DataType::Decimal + | DataType::Serial => AttributeValue::N(scalar_ref.to_text_with_type(data_type)), + // TODO: jsonb as dynamic type (https://github.com/risingwavelabs/risingwave/issues/11699) + DataType::Varchar + | DataType::Interval + | DataType::Date + | DataType::Time + | DataType::Timestamp + | DataType::Timestamptz + | DataType::Jsonb => AttributeValue::S(scalar_ref.to_text_with_type(data_type)), + DataType::Boolean => AttributeValue::Bool(scalar_ref.into_bool()), + DataType::Bytea => AttributeValue::B(Blob::new(scalar_ref.into_bytea())), + DataType::List(datatype) => { + let 
list_attr = scalar_ref + .into_list() + .iter() + .map(|x| map_data_type(x, datatype)) + .collect::>>()?; + AttributeValue::L(list_attr) + } + DataType::Struct(st) => { + let mut map = HashMap::with_capacity(st.len()); + for (sub_datum_ref, sub_field) in + scalar_ref.into_struct().iter_fields_ref().zip_eq_debug( + st.iter() + .map(|(name, dt)| Field::with_name(dt.clone(), name)), + ) + { + let attr = map_data_type(sub_datum_ref, &sub_field.data_type())?; + map.insert(sub_field.name.clone(), attr); + } + AttributeValue::M(map) + } + }; + Ok(attr) +} diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index 8313ea20dafa7..09fe39a4865c8 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -264,8 +264,6 @@ impl KafkaConfig { pub(crate) fn set_client(&self, c: &mut rdkafka::ClientConfig) { self.rdkafka_properties_common.set_client(c); self.rdkafka_properties_producer.set_client(c); - - tracing::info!("kafka client starts with: {:?}", c); } } diff --git a/src/connector/src/sink/kinesis.rs b/src/connector/src/sink/kinesis.rs index 3b5533a49c76c..6f534c2800a53 100644 --- a/src/connector/src/sink/kinesis.rs +++ b/src/connector/src/sink/kinesis.rs @@ -15,9 +15,11 @@ use std::collections::HashMap; use anyhow::{anyhow, Context}; -use aws_sdk_kinesis::operation::put_record::PutRecordOutput; +use aws_sdk_kinesis::operation::put_records::builders::PutRecordsFluentBuilder; use aws_sdk_kinesis::primitives::Blob; +use aws_sdk_kinesis::types::PutRecordsRequestEntry; use aws_sdk_kinesis::Client as KinesisClient; +use futures::{FutureExt, TryFuture}; use risingwave_common::array::StreamChunk; use risingwave_common::catalog::Schema; use risingwave_common::session_config::sink_decouple::SinkDecouple; @@ -70,6 +72,8 @@ impl TryFrom for KinesisSink { } } +const KINESIS_SINK_MAX_PENDING_CHUNK_NUM: usize = 64; + impl Sink for KinesisSink { type Coordinator = DummySinkCommitCoordinator; type LogSinker = AsyncTruncateLogSinkerOf; @@ -125,7 +129,7 @@ impl Sink for KinesisSink { self.sink_from_name.clone(), ) .await? - .into_log_sinker(usize::MAX)) + .into_log_sinker(KINESIS_SINK_MAX_PENDING_CHUNK_NUM)) } } @@ -148,12 +152,13 @@ impl KinesisSinkConfig { pub struct KinesisSinkWriter { pub config: KinesisSinkConfig, formatter: SinkFormatterImpl, - payload_writer: KinesisSinkPayloadWriter, + client: KinesisClient, } struct KinesisSinkPayloadWriter { - client: KinesisClient, - config: KinesisSinkConfig, + // builder should always be `Some`. Making it an option so that we can call + // builder methods that take the builder ownership as input and return with a new builder. 
+ builder: Option, } impl KinesisSinkWriter { @@ -182,29 +187,57 @@ impl KinesisSinkWriter { Ok(Self { config: config.clone(), formatter, - payload_writer: KinesisSinkPayloadWriter { client, config }, + client, }) } + + fn new_payload_writer(&self) -> KinesisSinkPayloadWriter { + let builder = self + .client + .put_records() + .stream_name(&self.config.common.stream_name); + KinesisSinkPayloadWriter { + builder: Some(builder), + } + } } + +pub type KinesisSinkPayloadWriterDeliveryFuture = + impl TryFuture + Unpin + Send + 'static; + impl KinesisSinkPayloadWriter { - async fn put_record(&self, key: &str, payload: Vec) -> Result { - let payload = Blob::new(payload); - // todo: switch to put_records() for batching - Retry::spawn( - ExponentialBackoff::from_millis(100).map(jitter).take(3), - || async { - self.client - .put_record() - .stream_name(&self.config.common.stream_name) + fn put_record(&mut self, key: String, payload: Vec) { + self.builder = Some( + self.builder.take().expect("should not be None").records( + PutRecordsRequestEntry::builder() .partition_key(key) - .data(payload.clone()) - .send() - .await - }, - ) - .await - .with_context(|| format!("failed to put record to {}", self.config.common.stream_name)) - .map_err(SinkError::Kinesis) + .data(Blob::new(payload)) + .build() + .expect("should not fail because we have set `data` and `partition_key`"), + ), + ); + } + + fn finish(self) -> KinesisSinkPayloadWriterDeliveryFuture { + async move { + let builder = self.builder.expect("should not be None"); + let context_fmt = format!( + "failed to put record to {}", + builder + .get_stream_name() + .as_ref() + .expect("should have set stream name") + ); + Retry::spawn( + ExponentialBackoff::from_millis(100).map(jitter).take(3), + || builder.clone().send(), + ) + .await + .with_context(|| context_fmt.clone()) + .map_err(SinkError::Kinesis)?; + Ok(()) + } + .boxed() } } @@ -214,24 +247,46 @@ impl FormattedSink for KinesisSinkPayloadWriter { async fn write_one(&mut self, k: Option, v: Option) -> Result<()> { self.put_record( - &k.ok_or_else(|| SinkError::Kinesis(anyhow!("no key provided")))?, + k.ok_or_else(|| SinkError::Kinesis(anyhow!("no key provided")))?, v.unwrap_or_default(), - ) - .await - .map(|_| ()) + ); + Ok(()) } } impl AsyncTruncateSinkWriter for KinesisSinkWriter { + type DeliveryFuture = KinesisSinkPayloadWriterDeliveryFuture; + async fn write_chunk<'a>( &'a mut self, chunk: StreamChunk, - _add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>, + mut add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>, ) -> Result<()> { + let mut payload_writer = self.new_payload_writer(); dispatch_sink_formatter_str_key_impl!( &self.formatter, formatter, - self.payload_writer.write_chunk(chunk, formatter).await - ) + payload_writer.write_chunk(chunk, formatter).await + )?; + + add_future + .add_future_may_await(payload_writer.finish()) + .await?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use aws_sdk_kinesis::types::PutRecordsRequestEntry; + use aws_smithy_types::Blob; + + #[test] + fn test_kinesis_entry_builder_save_unwrap() { + PutRecordsRequestEntry::builder() + .data(Blob::new(b"data")) + .partition_key("partition-key") + .build() + .unwrap(); } } diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index f93023687d43e..d07f7ed89ea70 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -21,6 +21,7 @@ pub mod decouple_checkpoint_log_sink; pub mod deltalake; pub mod doris; pub mod 
doris_starrocks_connector; +pub mod dynamodb; pub mod elasticsearch; pub mod encoder; pub mod formatter; @@ -99,6 +100,7 @@ macro_rules! for_all_sinks { { Snowflake, $crate::sink::snowflake::SnowflakeSink }, { DeltaLake, $crate::sink::deltalake::DeltaLakeSink }, { BigQuery, $crate::sink::big_query::BigQuerySink }, + { DynamoDb, $crate::sink::dynamodb::DynamoDbSink }, { Mongodb, $crate::sink::mongodb::MongodbSink }, { Test, $crate::sink::test_sink::TestSink }, { Table, $crate::sink::trivial::TableSink } @@ -571,6 +573,12 @@ pub enum SinkError { #[backtrace] anyhow::Error, ), + #[error("DynamoDB error: {0}")] + DynamoDb( + #[source] + #[backtrace] + anyhow::Error, + ), #[error(transparent)] Connector( #[from] diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index 63fc60ecc4510..ae245d2ef2363 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -666,11 +666,7 @@ impl RemoteCoordinator { .start_sink_coordinator_stream(param.clone()) .await?; - tracing::trace!( - "{:?} RemoteCoordinator started with properties: {:?}", - R::SINK_NAME, - ¶m.properties - ); + tracing::trace!("{:?} RemoteCoordinator started", R::SINK_NAME,); Ok(RemoteCoordinator { stream_handle }) } diff --git a/src/connector/src/source/kafka/mod.rs b/src/connector/src/source/kafka/mod.rs index 4c560d30ecce6..a037dd4463ecb 100644 --- a/src/connector/src/source/kafka/mod.rs +++ b/src/connector/src/source/kafka/mod.rs @@ -163,8 +163,6 @@ impl KafkaProperties { pub fn set_client(&self, c: &mut rdkafka::ClientConfig) { self.rdkafka_properties_common.set_client(c); self.rdkafka_properties_consumer.set_client(c); - - tracing::info!("kafka client starts with: {:?}", c); } } diff --git a/src/connector/src/source/reader/reader.rs b/src/connector/src/source/reader/reader.rs index eb0ba30d12431..a8c1abb40cd55 100644 --- a/src/connector/src/source/reader/reader.rs +++ b/src/connector/src/source/reader/reader.rs @@ -165,7 +165,7 @@ impl SourceReader { } else { let to_reader_splits = splits.into_iter().map(|split| vec![split]); try_join_all(to_reader_splits.into_iter().map(|splits| { - tracing::debug!(?splits, ?prop, "spawning connector split reader"); + tracing::debug!(?splits, "spawning connector split reader"); let props = prop.clone(); let data_gen_columns = data_gen_columns.clone(); let parser_config = parser_config.clone(); diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index 9263aa9004173..d0933d1c533cc 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -134,6 +134,60 @@ DorisConfig: - name: r#type field_type: String required: true +DynamoDbConfig: + fields: + - name: table + field_type: String + required: true + alias: + - dynamodb.table + - name: dynamodb.max_batch_rows + field_type: usize + required: false + default: '1024' + - name: aws.region + field_type: String + required: false + alias: + - region + - name: aws.endpoint_url + field_type: String + required: false + alias: + - endpoint_url + - endpoint + - name: aws.credentials.access_key_id + field_type: String + required: false + alias: + - access_key + - name: aws.credentials.secret_access_key + field_type: String + required: false + alias: + - secret_key + - name: aws.credentials.session_token + field_type: String + required: false + alias: + - session_token + - name: aws.credentials.role.arn + field_type: String + comments: IAM role + required: false + alias: + - arn + - name: aws.credentials.role.external_id + field_type: 
String + comments: external ID in IAM role trust policy + required: false + alias: + - external_id + - name: aws.profile + field_type: String + required: false + alias: + - profile GooglePubSubConfig: fields: - name: pubsub.project_id diff --git a/src/expr/core/Cargo.toml b/src/expr/core/Cargo.toml index 38aacbb303c22..cbff3a5ff2e28 100644 --- a/src/expr/core/Cargo.toml +++ b/src/expr/core/Cargo.toml @@ -30,7 +30,7 @@ chrono = { version = "0.4", default-features = false, features = [ const-currying = "0.0.4" downcast-rs = "1.2" easy-ext = "1" -educe = "0.5" +educe = "0.6" either = "1" enum-as-inner = "0.6" futures = "0.3" diff --git a/src/expr/core/src/table_function/mod.rs b/src/expr/core/src/table_function/mod.rs index b87d6f020b093..c14a50a8f41a4 100644 --- a/src/expr/core/src/table_function/mod.rs +++ b/src/expr/core/src/table_function/mod.rs @@ -12,15 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use either::Either; use futures_async_stream::try_stream; use futures_util::stream::BoxStream; use futures_util::StreamExt; -use risingwave_common::array::{Array, ArrayBuilder, ArrayImpl, ArrayRef, DataChunk}; +use risingwave_common::array::{Array, ArrayBuilder, ArrayImpl, DataChunk}; use risingwave_common::types::{DataType, DatumRef}; -use risingwave_pb::expr::project_set_select_item::SelectItem; use risingwave_pb::expr::table_function::PbType; -use risingwave_pb::expr::{PbProjectSetSelectItem, PbTableFunction}; +use risingwave_pb::expr::PbTableFunction; use super::{ExprError, Result}; use crate::expr::{build_from_prost as expr_build_from_prost, BoxedExpression}; @@ -141,53 +139,6 @@ pub fn build( desc.build_table(return_type, chunk_size, children) } -/// See also [`PbProjectSetSelectItem`] -#[derive(Debug)] -pub enum ProjectSetSelectItem { - TableFunction(BoxedTableFunction), - Expr(BoxedExpression), -} - -impl From for ProjectSetSelectItem { - fn from(table_function: BoxedTableFunction) -> Self { - ProjectSetSelectItem::TableFunction(table_function) - } -} - -impl From for ProjectSetSelectItem { - fn from(expr: BoxedExpression) -> Self { - ProjectSetSelectItem::Expr(expr) - } -} - -impl ProjectSetSelectItem { - pub fn from_prost(prost: &PbProjectSetSelectItem, chunk_size: usize) -> Result { - match prost.select_item.as_ref().unwrap() { - SelectItem::Expr(expr) => expr_build_from_prost(expr).map(Into::into), - SelectItem::TableFunction(tf) => build_from_prost(tf, chunk_size).map(Into::into), - } - } - - pub fn return_type(&self) -> DataType { - match self { - ProjectSetSelectItem::TableFunction(tf) => tf.return_type(), - ProjectSetSelectItem::Expr(expr) => expr.return_type(), - } - } - - pub async fn eval<'a>( - &'a self, - input: &'a DataChunk, - ) -> Result, ArrayRef>> { - match self { - Self::TableFunction(tf) => Ok(Either::Left( - TableFunctionOutputIter::new(tf.eval(input).await).await?, - )), - Self::Expr(expr) => expr.eval(input).await.map(Either::Right), - } - } -} - /// A wrapper over the output of table function that allows iteration by rows. /// /// If the table function returns multiple columns, the output will be struct values. 
diff --git a/src/expr/impl/Cargo.toml b/src/expr/impl/Cargo.toml index 9e94e3395e761..6dfbcc5905750 100644 --- a/src/expr/impl/Cargo.toml +++ b/src/expr/impl/Cargo.toml @@ -40,7 +40,7 @@ chrono = { version = "0.4", default-features = false, features = [ "std", ] } chrono-tz = { version = "0.9", features = ["case-insensitive"] } -educe = "0.5" +educe = "0.6" fancy-regex = "0.13" futures-async-stream = { workspace = true } futures-util = "0.3" diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 4adeaae423ca6..feb4aa596c55f 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -29,7 +29,7 @@ clap = { workspace = true } downcast-rs = "1.2" dyn-clone = "1.0.14" easy-ext = "1" -educe = "0.5" +educe = "0.6" either = "1" enum-as-inner = "0.6" fancy-regex = "0.13.0" diff --git a/src/frontend/planner_test/tests/testdata/output/over_window_function.yaml b/src/frontend/planner_test/tests/testdata/output/over_window_function.yaml index a1c014161988e..14d6d59885b40 100644 --- a/src/frontend/planner_test/tests/testdata/output/over_window_function.yaml +++ b/src/frontend/planner_test/tests/testdata/output/over_window_function.yaml @@ -73,11 +73,11 @@ └─LogicalProject { exprs: [t.x, t._row_id] } └─LogicalScan { table: t, columns: [t.x, t._row_id] } batch_error: |- - Feature is not yet implemented: Window function with empty PARTITION BY is not supported yet - No tracking issue yet. Feel free to submit a feature request at https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Ffeature&template=feature_request.yml + Feature is not yet implemented: Window function with empty PARTITION BY is not supported because of potential bad performance. If you really need this, please workaround with something like `PARTITION BY 1::int`. + Tracking issue: https://github.com/risingwavelabs/risingwave/issues/11505 stream_error: |- - Feature is not yet implemented: Window function with empty PARTITION BY is not supported yet - No tracking issue yet. Feel free to submit a feature request at https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Ffeature&template=feature_request.yml + Feature is not yet implemented: Window function with empty PARTITION BY is not supported because of potential bad performance. If you really need this, please workaround with something like `PARTITION BY 1::int`. + Tracking issue: https://github.com/risingwavelabs/risingwave/issues/11505 - id: lead with offset argument and empty over clause sql: | create table t(x int); @@ -88,11 +88,11 @@ └─LogicalProject { exprs: [t.x, t._row_id, 2:Int32] } └─LogicalScan { table: t, columns: [t.x, t._row_id] } batch_error: |- - Feature is not yet implemented: Window function with empty PARTITION BY is not supported yet - No tracking issue yet. Feel free to submit a feature request at https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Ffeature&template=feature_request.yml + Feature is not yet implemented: Window function with empty PARTITION BY is not supported because of potential bad performance. If you really need this, please workaround with something like `PARTITION BY 1::int`. + Tracking issue: https://github.com/risingwavelabs/risingwave/issues/11505 stream_error: |- - Feature is not yet implemented: Window function with empty PARTITION BY is not supported yet - No tracking issue yet. 
Feel free to submit a feature request at https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Ffeature&template=feature_request.yml + Feature is not yet implemented: Window function with empty PARTITION BY is not supported because of potential bad performance. If you really need this, please workaround with something like `PARTITION BY 1::int`. + Tracking issue: https://github.com/risingwavelabs/risingwave/issues/11505 - id: lead with non-const offset argument and empty over clause sql: | create table t(x int); diff --git a/src/frontend/src/catalog/system_catalog/pg_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/pg_catalog/mod.rs index de1fe4924642f..3d1c8f2a56f74 100644 --- a/src/frontend/src/catalog/system_catalog/pg_catalog/mod.rs +++ b/src/frontend/src/catalog/system_catalog/pg_catalog/mod.rs @@ -29,7 +29,6 @@ mod pg_extension; mod pg_index; mod pg_indexes; mod pg_inherits; -mod pg_keywords; mod pg_language; mod pg_locks; mod pg_matviews; diff --git a/src/frontend/src/catalog/system_catalog/pg_catalog/pg_keywords.rs b/src/frontend/src/catalog/system_catalog/pg_catalog/pg_keywords.rs deleted file mode 100644 index a859527afa6df..0000000000000 --- a/src/frontend/src/catalog/system_catalog/pg_catalog/pg_keywords.rs +++ /dev/null @@ -1,71 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// The code is same as `expr/impl/src/table_function/pg_get_keywords.rs`. - -use risingwave_common::types::Fields; -use risingwave_frontend_macro::system_catalog; -use risingwave_sqlparser::keywords::{ - ALL_KEYWORDS_INDEX, RESERVED_FOR_COLUMN_ALIAS, RESERVED_FOR_COLUMN_OR_TABLE_NAME, -}; - -use crate::catalog::system_catalog::SysCatalogReaderImpl; - -/// The catalog `pg_keywords` stores keywords. `pg_get_keywords` returns the content of this table. -/// Ref: [`https://www.postgresql.org/docs/15/functions-info.html`] -/// -/// # Example -/// -/// ```slt -/// query TTT -/// select * from pg_keywords where word = 'add'; -/// ---- -/// add U unreserved -/// ``` -#[derive(Fields)] -struct PgKeywords { - #[primary_key] - word: String, - catcode: char, - catdesc: &'static str, -} - -#[system_catalog(table, "pg_catalog.pg_keywords")] -fn read_pg_keywords(_reader: &SysCatalogReaderImpl) -> Vec { - ALL_KEYWORDS_INDEX - .iter() - .map(|keyword| { - // FIXME: The current category is not correct. Many are different from the PostgreSQL. 
- let catcode = if !RESERVED_FOR_COLUMN_OR_TABLE_NAME.contains(keyword) { - 'U' - } else if !RESERVED_FOR_COLUMN_ALIAS.contains(keyword) { - 'C' - } else { - 'R' - }; - let catdesc = match catcode { - 'U' => "unreserved", - 'C' => "unreserved (cannot be function or type name)", - 'T' => "reserved (can be function or type name)", - 'R' => "reserved", - _ => unreachable!(), - }; - PgKeywords { - word: keyword.to_string().to_lowercase(), - catcode, - catdesc, - } - }) - .collect() -} diff --git a/src/frontend/src/optimizer/plan_node/logical_over_window.rs b/src/frontend/src/optimizer/plan_node/logical_over_window.rs index b5762a224d180..cf130b57bac94 100644 --- a/src/frontend/src/optimizer/plan_node/logical_over_window.rs +++ b/src/frontend/src/optimizer/plan_node/logical_over_window.rs @@ -604,6 +604,16 @@ impl PredicatePushdown for LogicalOverWindow { } } +macro_rules! empty_partition_by_not_implemented { + () => { + bail_not_implemented!( + issue = 11505, + "Window function with empty PARTITION BY is not supported because of potential bad performance. \ + If you really need this, please workaround with something like `PARTITION BY 1::int`." + ) + }; +} + impl ToBatch for LogicalOverWindow { fn to_batch(&self) -> Result { assert!( @@ -619,7 +629,7 @@ impl ToBatch for LogicalOverWindow { .map(|e| e.index()) .collect_vec(); if partition_key_indices.is_empty() { - bail_not_implemented!("Window function with empty PARTITION BY is not supported yet"); + empty_partition_by_not_implemented!(); } let input = self.input().to_batch()?; @@ -670,9 +680,7 @@ impl ToStream for LogicalOverWindow { .map(|e| e.index()) .collect_vec(); if partition_key_indices.is_empty() { - bail_not_implemented!( - "Window function with empty PARTITION BY is not supported yet" - ); + empty_partition_by_not_implemented!(); } let sort_input = @@ -694,9 +702,7 @@ impl ToStream for LogicalOverWindow { .map(|e| e.index()) .collect_vec(); if partition_key_indices.is_empty() { - bail_not_implemented!( - "Window function with empty PARTITION BY is not supported yet" - ); + empty_partition_by_not_implemented!(); } let new_input = diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index bf5bb72ed731b..43e564271702a 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -25,7 +25,7 @@ use risingwave_common::monitor::connection::{RouterExt, TcpConfig}; use risingwave_common::session_config::SessionConfig; use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_common::telemetry::manager::TelemetryManager; -use risingwave_common::telemetry::telemetry_env_enabled; +use risingwave_common::telemetry::{report_scarf_enabled, report_to_scarf, telemetry_env_enabled}; use risingwave_common_service::metrics_manager::MetricsManager; use risingwave_common_service::tracing::TracingExtractLayer; use risingwave_meta::barrier::StreamRpcManager; @@ -754,6 +754,11 @@ pub async fn start_service_as_election_leader( } else { tracing::info!("Telemetry didn't start due to meta backend or config"); } + if report_scarf_enabled() { + tokio::spawn(report_to_scarf()); + } else { + tracing::info!("Scarf reporting is disabled"); + }; if let Some(pair) = env.event_log_manager_ref().take_join_handle() { sub_tasks.push(pair); diff --git a/src/sqlparser/Cargo.toml b/src/sqlparser/Cargo.toml index 377b2e676d6a8..bf6307e3ace68 100644 --- a/src/sqlparser/Cargo.toml +++ b/src/sqlparser/Cargo.toml @@ -32,7 +32,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [ ] } tracing = "0.1" 
tracing-subscriber = "0.3" -winnow = "0.6.9" +winnow = "0.6.11" [target.'cfg(not(madsim))'.dependencies] workspace-hack = { path = "../workspace-hack" } diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index 8c383ad54589a..e68fb13e88552 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -30,7 +30,7 @@ use winnow::{PResult, Parser as _}; use crate::ast::*; use crate::keywords::{self, Keyword}; use crate::parser_v2; -use crate::parser_v2::{keyword, literal_i64, literal_uint, ParserExt as _}; +use crate::parser_v2::{keyword, literal_i64, literal_uint, single_quoted_string, ParserExt as _}; use crate::tokenizer::*; pub(crate) const UPSTREAM_SOURCE_KEY: &str = "connector"; @@ -170,7 +170,7 @@ type ColumnsDefTuple = ( /// Reference: /// -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] pub enum Precedence { Zero = 0, LogicalOr, // 5 in upstream @@ -1009,28 +1009,12 @@ impl Parser<'_> { /// Parse a SQL CAST function e.g. `CAST(expr AS FLOAT)` pub fn parse_cast_expr(&mut self) -> PResult { - self.expect_token(&Token::LParen)?; - let expr = self.parse_expr()?; - self.expect_keyword(Keyword::AS)?; - let data_type = self.parse_data_type()?; - self.expect_token(&Token::RParen)?; - Ok(Expr::Cast { - expr: Box::new(expr), - data_type, - }) + parser_v2::expr_cast(self) } /// Parse a SQL TRY_CAST function e.g. `TRY_CAST(expr AS FLOAT)` pub fn parse_try_cast_expr(&mut self) -> PResult { - self.expect_token(&Token::LParen)?; - let expr = self.parse_expr()?; - self.expect_keyword(Keyword::AS)?; - let data_type = self.parse_data_type()?; - self.expect_token(&Token::RParen)?; - Ok(Expr::TryCast { - expr: Box::new(expr), - data_type, - }) + parser_v2::expr_try_cast(self) } /// Parse a SQL EXISTS expression e.g. `WHERE EXISTS(SELECT ...)`. @@ -1042,83 +1026,21 @@ impl Parser<'_> { } pub fn parse_extract_expr(&mut self) -> PResult { - self.expect_token(&Token::LParen)?; - let field = self.parse_date_time_field_in_extract()?; - self.expect_keyword(Keyword::FROM)?; - let expr = self.parse_expr()?; - self.expect_token(&Token::RParen)?; - Ok(Expr::Extract { - field, - expr: Box::new(expr), - }) + parser_v2::expr_extract(self) } pub fn parse_substring_expr(&mut self) -> PResult { - // PARSE SUBSTRING (EXPR [FROM 1] [FOR 3]) - self.expect_token(&Token::LParen)?; - let expr = self.parse_expr()?; - let mut from_expr = None; - if self.parse_keyword(Keyword::FROM) || self.consume_token(&Token::Comma) { - from_expr = Some(self.parse_expr()?); - } - - let mut to_expr = None; - if self.parse_keyword(Keyword::FOR) || self.consume_token(&Token::Comma) { - to_expr = Some(self.parse_expr()?); - } - self.expect_token(&Token::RParen)?; - - Ok(Expr::Substring { - expr: Box::new(expr), - substring_from: from_expr.map(Box::new), - substring_for: to_expr.map(Box::new), - }) + parser_v2::expr_substring(self) } /// `POSITION( IN )` pub fn parse_position_expr(&mut self) -> PResult { - self.expect_token(&Token::LParen)?; - - // Logically `parse_expr`, but limited to those with precedence higher than `BETWEEN`/`IN`, - // to avoid conflict with general IN operator, for example `position(a IN (b) IN (c))`. 
- https://github.com/postgres/postgres/blob/REL_15_2/src/backend/parser/gram.y#L16012 - let substring = self.parse_subexpr(Precedence::Between)?; - self.expect_keyword(Keyword::IN)?; - let string = self.parse_subexpr(Precedence::Between)?; - - self.expect_token(&Token::RParen)?; - - Ok(Expr::Position { - substring: Box::new(substring), - string: Box::new(string), - }) + parser_v2::expr_position(self) } /// `OVERLAY(<expr> PLACING <expr> FROM <expr> [ FOR <expr> ])` pub fn parse_overlay_expr(&mut self) -> PResult<Expr> { - self.expect_token(&Token::LParen)?; - - let expr = self.parse_expr()?; - - self.expect_keyword(Keyword::PLACING)?; - let new_substring = self.parse_expr()?; - - self.expect_keyword(Keyword::FROM)?; - let start = self.parse_expr()?; - - let mut count = None; - if self.parse_keyword(Keyword::FOR) { - count = Some(self.parse_expr()?); - } - - self.expect_token(&Token::RParen)?; - - Ok(Expr::Overlay { - expr: Box::new(expr), - new_substring: Box::new(new_substring), - start: Box::new(start), - count: count.map(Box::new), - }) + parser_v2::expr_overlay(self) } /// `TRIM ([WHERE] ['text'] FROM 'text')`\ @@ -3722,25 +3644,31 @@ impl Parser<'_> { alt(( preceded( (Keyword::SYSTEM_TIME, Keyword::AS, Keyword::OF), - alt(( - ( - Self::parse_identifier.verify(|ident| { - ident.real_value() == "proctime" || ident.real_value() == "now" - }), - Token::LParen, - Token::RParen, - ) - .value(AsOf::ProcessTime), - literal_i64.map(AsOf::VersionNum), - Self::parse_literal_string.map(AsOf::TimestampString), - )), + cut_err( + alt(( + ( + Self::parse_identifier.verify(|ident| { + ident.real_value() == "proctime" || ident.real_value() == "now" + }), + cut_err(Token::LParen), + cut_err(Token::RParen), + ) + .value(AsOf::ProcessTime), + literal_i64.map(AsOf::TimestampNum), + single_quoted_string.map(AsOf::TimestampString), + )) + .expect("proctime(), now(), number or string"), + ), ), preceded( (Keyword::SYSTEM_VERSION, Keyword::AS, Keyword::OF), - alt(( - literal_i64.map(AsOf::VersionNum), - Self::parse_literal_string.map(AsOf::VersionString), - )), + cut_err( + alt(( + literal_i64.map(AsOf::VersionNum), + single_quoted_string.map(AsOf::VersionString), + )) + .expect("number or string"), + ), ), )) .parse_next(self) diff --git a/src/sqlparser/src/parser_v2/expr.rs b/src/sqlparser/src/parser_v2/expr.rs index 7447984d7caf0..2923919ac83b9 100644 --- a/src/sqlparser/src/parser_v2/expr.rs +++ b/src/sqlparser/src/parser_v2/expr.rs @@ -9,14 +9,17 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -use winnow::combinator::{cut_err, opt, preceded, repeat, trace}; +use winnow::combinator::{alt, cut_err, opt, preceded, repeat, seq, trace}; +use winnow::error::ContextError; use winnow::{PResult, Parser}; -use super::TokenStream; +use super::{data_type, token, ParserExt, TokenStream}; use crate::ast::Expr; use crate::keywords::Keyword; +use crate::parser::Precedence; +use crate::tokenizer::Token; -fn expr<S>(input: &mut S) -> PResult<Expr> +fn expr_parse<S>(input: &mut S) -> PResult<Expr> where S: TokenStream, { @@ -27,22 +30,32 @@ where .parse_next(input) } +fn subexpr<S>(precedence: Precedence) -> impl Parser<S, Expr, ContextError> +where + S: TokenStream, +{ + // TODO: implement this function using combinator style. 
+ trace("subexpr", move |input: &mut S| { + input.parse_v1(|parser| parser.parse_subexpr(precedence)) + }) +} + pub fn expr_case(input: &mut S) -> PResult where S: TokenStream, { let parse = ( - opt(expr), + opt(expr_parse), repeat( 1.., ( Keyword::WHEN, - cut_err(expr), + cut_err(expr_parse), cut_err(Keyword::THEN), - cut_err(expr), + cut_err(expr_parse), ), ), - opt(preceded(Keyword::ELSE, cut_err(expr))), + opt(preceded(Keyword::ELSE, cut_err(expr_parse))), cut_err(Keyword::END), ) .map(|(operand, branches, else_result, _)| { @@ -58,3 +71,126 @@ where trace("expr_case", parse).parse_next(input) } + +/// Consume a SQL CAST function e.g. `CAST(expr AS FLOAT)` +pub fn expr_cast(input: &mut S) -> PResult +where + S: TokenStream, +{ + let parse = cut_err(seq! {Expr::Cast { + _: Token::LParen, + expr: expr_parse.map(Box::new), + _: Keyword::AS, + data_type: data_type, + _: Token::RParen, + }}); + + trace("expr_cast", parse).parse_next(input) +} + +/// Consume a SQL TRY_CAST function e.g. `TRY_CAST(expr AS FLOAT)` +pub fn expr_try_cast(input: &mut S) -> PResult +where + S: TokenStream, +{ + let parse = cut_err(seq! {Expr::TryCast { + _: Token::LParen, + expr: expr_parse.map(Box::new), + _: Keyword::AS, + data_type: data_type, + _: Token::RParen, + }}); + + trace("expr_try_cast", parse).parse_next(input) +} + +/// Consume a SQL EXTRACT function e.g. `EXTRACT(YEAR FROM expr)` +pub fn expr_extract(input: &mut S) -> PResult +where + S: TokenStream, +{ + let mut date_time_field = token + .verify_map(|token| match token.token { + Token::Word(w) => Some(w.value.to_uppercase()), + Token::SingleQuotedString(s) => Some(s.to_uppercase()), + _ => None, + }) + .expect("date/time field"); + + let parse = cut_err(seq! {Expr::Extract { + _: Token::LParen, + field: date_time_field, + _: Keyword::FROM, + expr: expr_parse.map(Box::new), + _: Token::RParen, + }}); + + trace("expr_extract", parse).parse_next(input) +} + +/// Consume `SUBSTRING (EXPR [FROM 1] [FOR 3])` +pub fn expr_substring(input: &mut S) -> PResult +where + S: TokenStream, +{ + let mut substring_from = opt(preceded( + alt((Token::Comma.void(), Keyword::FROM.void())), + cut_err(expr_parse).map(Box::new), + )); + let mut substring_for = opt(preceded( + alt((Token::Comma.void(), Keyword::FOR.void())), + cut_err(expr_parse).map(Box::new), + )); + let parse = cut_err(seq! {Expr::Substring { + _: Token::LParen, + expr: expr_parse.map(Box::new), + substring_from: substring_from, + substring_for: substring_for, + _: Token::RParen, + }}); + + trace("expr_substring", parse).parse_next(input) +} + +/// `POSITION( IN )` +pub fn expr_position(input: &mut S) -> PResult +where + S: TokenStream, +{ + let parse = cut_err(seq! {Expr::Position { + _: Token::LParen, + // Logically `parse_expr`, but limited to those with precedence higher than `BETWEEN`/`IN`, + // to avoid conflict with general IN operator, for example `position(a IN (b) IN (c))`. + // https://github.com/postgres/postgres/blob/REL_15_2/src/backend/parser/gram.y#L16012 + substring: subexpr(Precedence::Between).map(Box::new), + _: Keyword::IN, + string: subexpr(Precedence::Between).map(Box::new), + _: Token::RParen, + }}); + + trace("expr_position", parse).parse_next(input) +} + +/// `OVERLAY( PLACING FROM [ FOR ])` +pub fn expr_overlay(input: &mut S) -> PResult +where + S: TokenStream, +{ + let mut count_parse = opt(preceded( + Keyword::FOR.void(), + cut_err(expr_parse).map(Box::new), + )); + + let parse = cut_err(seq! 
{Expr::Overlay { + _: Token::LParen, + expr: expr_parse.map(Box::new), + _: Keyword::PLACING, + new_substring: expr_parse.map(Box::new), + _: Keyword::FROM, + start: expr_parse.map(Box::new), + count: count_parse, + _: Token::RParen, + }}); + + trace("expr_overlay", parse).parse_next(input) +} diff --git a/src/sqlparser/src/parser_v2/mod.rs b/src/sqlparser/src/parser_v2/mod.rs index 366b17fb23bf5..729b56a51253a 100644 --- a/src/sqlparser/src/parser_v2/mod.rs +++ b/src/sqlparser/src/parser_v2/mod.rs @@ -116,6 +116,19 @@ where .parse_next(input) } +/// Consume an 'single-quoted string'. +pub fn single_quoted_string(input: &mut S) -> PResult +where + S: TokenStream, +{ + token + .verify_map(|t| match &t.token { + Token::SingleQuotedString(s) => Some(s.clone()), + _ => None, + }) + .parse_next(input) +} + /// Consume an object name. /// /// FIXME: Object name is extremely complex, we only handle a subset here. diff --git a/src/sqlparser/tests/sqlparser_common.rs b/src/sqlparser/tests/sqlparser_common.rs index c694aba3d1308..a3159400b88fe 100644 --- a/src/sqlparser/tests/sqlparser_common.rs +++ b/src/sqlparser/tests/sqlparser_common.rs @@ -1268,9 +1268,6 @@ fn parse_extract() { verified_stmt("SELECT EXTRACT(HOUR FROM d)"); verified_stmt("SELECT EXTRACT(MINUTE FROM d)"); verified_stmt("SELECT EXTRACT(SECOND FROM d)"); - - let res = parse_sql_statements("SELECT EXTRACT(0 FROM d)"); - assert!(format!("{}", res.unwrap_err()).contains("expected date/time field, found: 0")); } #[test] @@ -2919,38 +2916,6 @@ fn parse_substring() { one_statement_parses_to("SELECT SUBSTRING('1' FOR 3)", "SELECT SUBSTRING('1' FOR 3)"); } -#[test] -fn parse_overlay() { - one_statement_parses_to( - "SELECT OVERLAY('abc' PLACING 'xyz' FROM 1)", - "SELECT OVERLAY('abc' PLACING 'xyz' FROM 1)", - ); - - one_statement_parses_to( - "SELECT OVERLAY('abc' PLACING 'xyz' FROM 1 FOR 2)", - "SELECT OVERLAY('abc' PLACING 'xyz' FROM 1 FOR 2)", - ); - - for (sql, err_msg) in [ - ("SELECT OVERLAY('abc', 'xyz')", "expected PLACING, found: ,"), - ( - "SELECT OVERLAY('abc' PLACING 'xyz')", - "expected FROM, found: )", - ), - ( - "SELECT OVERLAY('abc' PLACING 'xyz' FOR 2)", - "expected FROM, found: FOR", - ), - ( - "SELECT OVERLAY('abc' PLACING 'xyz' FOR 2 FROM 1)", - "expected FROM, found: FOR", - ), - ] { - let res = parse_sql_statements(sql); - assert!(format!("{}", res.unwrap_err()).contains(err_msg)); - } -} - #[test] fn parse_trim() { one_statement_parses_to( diff --git a/src/sqlparser/tests/testdata/as_of.yaml b/src/sqlparser/tests/testdata/as_of.yaml new file mode 100644 index 0000000000000..2d7c81881988b --- /dev/null +++ b/src/sqlparser/tests/testdata/as_of.yaml @@ -0,0 +1,23 @@ +# This file is automatically generated by `src/sqlparser/tests/parser_test.rs`. 
+- input: select * from t1 left join t2 FOR SYSTEM_TIME AS OF PROCTIME() on a1 = a2; + formatted_sql: SELECT * FROM t1 LEFT JOIN t2 FOR SYSTEM_TIME AS OF PROCTIME() ON a1 = a2 +- input: select * from t1 left join t2 FOR SYSTEM_TIME AS OF NOW() on a1 = a2; + formatted_sql: SELECT * FROM t1 LEFT JOIN t2 FOR SYSTEM_TIME AS OF PROCTIME() ON a1 = a2 +- input: select * from t1 left join t2 FOR SYSTEM_TIME AS OF 1 on a1 = a2; + formatted_sql: SELECT * FROM t1 LEFT JOIN t2 FOR SYSTEM_TIME AS OF 1 ON a1 = a2 +- input: select * from t1 left join t2 FOR SYSTEM_TIME AS OF 'string' on a1 = a2; + formatted_sql: SELECT * FROM t1 LEFT JOIN t2 FOR SYSTEM_TIME AS OF 'string' ON a1 = a2 +- input: select * from t1 left join t2 FOR SYSTEM_TIME AS OF PROCTIME on a1 = a2; + error_msg: |- + sql parser error: expected proctime(), now(), number or string + LINE 1: select * from t1 left join t2 FOR SYSTEM_TIME AS OF PROCTIME on a1 = a2; + ^ +- input: select * from t1 left join t2 FOR SYSTEM_VERSION AS OF 1 on a1 = a2; + formatted_sql: SELECT * FROM t1 LEFT JOIN t2 FOR SYSTEM_VERSION AS OF 1 ON a1 = a2 +- input: select * from t1 left join t2 FOR SYSTEM_VERSION AS OF 'string' on a1 = a2; + formatted_sql: SELECT * FROM t1 LEFT JOIN t2 FOR SYSTEM_VERSION AS OF 'string' ON a1 = a2 +- input: select * from t1 left join t2 FOR SYSTEM_VERSION AS OF PROCTIME() on a1 = a2; + error_msg: |- + sql parser error: expected number or string + LINE 1: select * from t1 left join t2 FOR SYSTEM_VERSION AS OF PROCTIME() on a1 = a2; + ^ diff --git a/src/sqlparser/tests/testdata/extract.yaml b/src/sqlparser/tests/testdata/extract.yaml new file mode 100644 index 0000000000000..ba38c3e25f261 --- /dev/null +++ b/src/sqlparser/tests/testdata/extract.yaml @@ -0,0 +1,6 @@ +# This file is automatically generated by `src/sqlparser/tests/parser_test.rs`. +- input: SELECT EXTRACT(0 FROM d) + error_msg: |- + sql parser error: expected date/time field + LINE 1: SELECT EXTRACT(0 FROM d) + ^ diff --git a/src/sqlparser/tests/testdata/overlay.yaml b/src/sqlparser/tests/testdata/overlay.yaml new file mode 100644 index 0000000000000..07e51c46abb60 --- /dev/null +++ b/src/sqlparser/tests/testdata/overlay.yaml @@ -0,0 +1,13 @@ +# This file is automatically generated by `src/sqlparser/tests/parser_test.rs`. 
+- input: SELECT OVERLAY('abc' PLACING 'xyz' FROM 1) + formatted_sql: SELECT OVERLAY('abc' PLACING 'xyz' FROM 1) +- input: SELECT OVERLAY('abc' PLACING 'xyz' FROM 1 FOR 2) + formatted_sql: SELECT OVERLAY('abc' PLACING 'xyz' FROM 1 FOR 2) +- input: SELECT OVERLAY('abc', 'xyz') + error_msg: "sql parser error: \nLINE 1: SELECT OVERLAY('abc', 'xyz')\n ^" +- input: SELECT OVERLAY('abc' PLACING 'xyz') + error_msg: "sql parser error: \nLINE 1: SELECT OVERLAY('abc' PLACING 'xyz')\n ^" +- input: SELECT OVERLAY('abc' PLACING 'xyz' FOR 2) + error_msg: "sql parser error: \nLINE 1: SELECT OVERLAY('abc' PLACING 'xyz' FOR 2)\n ^" +- input: SELECT OVERLAY('abc' PLACING 'xyz' FOR 2 FROM 1) + error_msg: "sql parser error: \nLINE 1: SELECT OVERLAY('abc' PLACING 'xyz' FOR 2 FROM 1)\n ^" diff --git a/src/storage/src/hummock/iterator/backward_user.rs b/src/storage/src/hummock/iterator/backward_user.rs index 1179c6a26ac02..ab348571a4128 100644 --- a/src/storage/src/hummock/iterator/backward_user.rs +++ b/src/storage/src/hummock/iterator/backward_user.rs @@ -15,6 +15,7 @@ use std::ops::Bound::*; use bytes::Bytes; +use more_asserts::debug_assert_le; use risingwave_hummock_sdk::key::{FullKey, UserKey, UserKeyRange}; use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch}; @@ -212,28 +213,34 @@ impl> BackwardUserIterator { pub async fn rewind(&mut self) -> HummockResult<()> { // Handle range scan match &self.key_range.1 { - Included(end_key) => { + Included(end_key) | Excluded(end_key) => { let full_key = FullKey { - user_key: end_key.clone(), + user_key: end_key.as_ref(), epoch_with_gap: EpochWithGap::new_min_epoch(), }; - self.iterator.seek(full_key.to_ref()).await?; + self.iterator.seek(full_key).await?; } - Excluded(_) => unimplemented!("excluded begin key is not supported"), Unbounded => self.iterator.rewind().await?, }; // Handle multi-version self.reset(); // Handle range scan when key < begin_key - self.next().await + self.next().await?; + if let Excluded(end_key) = &self.key_range.1 + && self.is_valid() + && self.key().user_key == end_key.as_ref() + { + self.next().await?; + } + Ok(()) } /// Resets the iterating position to the first position where the key >= provided key. pub async fn seek(&mut self, user_key: UserKey<&[u8]>) -> HummockResult<()> { // Handle range scan when key > end_key - let user_key = match &self.key_range.1 { - Included(end_key) => { + let seek_key = match &self.key_range.1 { + Included(end_key) | Excluded(end_key) => { let end_key = end_key.as_ref(); if end_key < user_key { end_key @@ -241,11 +248,10 @@ impl> BackwardUserIterator { user_key } } - Excluded(_) => unimplemented!("excluded begin key is not supported"), Unbounded => user_key, }; let full_key = FullKey { - user_key, + user_key: seek_key, epoch_with_gap: EpochWithGap::new_min_epoch(), }; self.iterator.seek(full_key).await?; @@ -253,7 +259,15 @@ impl> BackwardUserIterator { // Handle multi-version self.reset(); // Handle range scan when key < begin_key - self.next().await + self.next().await?; + if let Excluded(end_key) = &self.key_range.1 + && self.is_valid() + && self.key().user_key == end_key.as_ref() + { + debug_assert_le!(end_key.as_ref(), user_key); + self.next().await?; + } + Ok(()) } /// Indicates whether the iterator can be used. 
@@ -579,7 +593,10 @@ mod tests { ]; let sstable = gen_iterator_test_sstable_from_kv_pair(0, kv_pairs, sstable_store.clone()).await; - let backward_iters = vec![BackwardSstableIterator::new(sstable, sstable_store)]; + let backward_iters = vec![BackwardSstableIterator::new( + sstable.clone(), + sstable_store.clone(), + )]; let bmi = MergeIterator::new(backward_iters); let begin_key = Excluded(iterator_test_bytes_user_key_of(2)); @@ -632,6 +649,26 @@ mod tests { .await .unwrap(); assert!(!bui.is_valid()); + + let backward_iters = vec![BackwardSstableIterator::new(sstable, sstable_store)]; + let bmi = MergeIterator::new(backward_iters); + + let begin_key = Excluded(iterator_test_bytes_user_key_of(2)); + let end_key = Excluded(iterator_test_bytes_user_key_of(7)); + + let mut bui = BackwardUserIterator::for_test(bmi, (begin_key, end_key)); + bui.rewind().await.unwrap(); + assert!(bui.is_valid()); + assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref()); + // ----- end-range iterate ----- + bui.seek(iterator_test_user_key_of(7).as_ref()) + .await + .unwrap(); + assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref()); + bui.seek(iterator_test_user_key_of(5).as_ref()) + .await + .unwrap(); + assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref()); } // ..=right @@ -657,11 +694,17 @@ mod tests { ]; let sstable = gen_iterator_test_sstable_from_kv_pair(0, kv_pairs, sstable_store.clone()).await; - let backward_iters = vec![BackwardSstableIterator::new(sstable, sstable_store)]; + let backward_iters = vec![BackwardSstableIterator::new( + sstable.clone(), + sstable_store.clone(), + )]; let bmi = MergeIterator::new(backward_iters); let end_key = Included(iterator_test_bytes_user_key_of(7)); - let mut bui = BackwardUserIterator::for_test(bmi, (Unbounded, end_key)); + let mut bui = BackwardUserIterator::for_test( + bmi, + (Included(iterator_test_bytes_user_key_of(2)), end_key), + ); // ----- basic iterate ----- bui.rewind().await.unwrap(); @@ -671,8 +714,6 @@ mod tests { bui.next().await.unwrap(); assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref()); bui.next().await.unwrap(); - assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(1, 200).to_ref()); - bui.next().await.unwrap(); assert!(!bui.is_valid()); // ----- end-range iterate ----- @@ -685,8 +726,6 @@ mod tests { bui.next().await.unwrap(); assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref()); bui.next().await.unwrap(); - assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(1, 200).to_ref()); - bui.next().await.unwrap(); assert!(!bui.is_valid()); // ----- in-range iterate ----- @@ -699,8 +738,6 @@ mod tests { bui.next().await.unwrap(); assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref()); bui.next().await.unwrap(); - assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(1, 200).to_ref()); - bui.next().await.unwrap(); assert!(!bui.is_valid()); // ----- begin-range iterate ----- @@ -708,6 +745,27 @@ mod tests { .await .unwrap(); assert!(!bui.is_valid()); + + let end_key = Excluded(iterator_test_bytes_user_key_of(6)); + let backward_iters = vec![BackwardSstableIterator::new(sstable, sstable_store)]; + let bmi = MergeIterator::new(backward_iters); + let mut bui = BackwardUserIterator::for_test( + bmi, + (Excluded(iterator_test_bytes_user_key_of(2)), end_key), + ); + // ----- basic iterate ----- + bui.seek(iterator_test_user_key_of(6).as_ref()) + .await + .unwrap(); + assert!(bui.is_valid()); + assert_eq!(bui.key(), 
iterator_test_bytes_key_of_epoch(3, 100).to_ref()); + bui.next().await.unwrap(); + assert!(!bui.is_valid()); + bui.seek(iterator_test_user_key_of(7).as_ref()) + .await + .unwrap(); + assert!(bui.is_valid()); + assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref()); } // left.. diff --git a/src/storage/src/hummock/iterator/forward_user.rs b/src/storage/src/hummock/iterator/forward_user.rs index 32456eaa27ca5..768aa01af686c 100644 --- a/src/storage/src/hummock/iterator/forward_user.rs +++ b/src/storage/src/hummock/iterator/forward_user.rs @@ -14,6 +14,7 @@ use std::ops::Bound::*; +use more_asserts::debug_assert_ge; use risingwave_common::must_match; use risingwave_common::util::epoch::MAX_SPILL_TIMES; use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker, UserKey, UserKeyRange}; @@ -124,20 +125,26 @@ impl> UserIterator { // Handle range scan match &self.key_range.0 { - Included(begin_key) => { + Included(begin_key) | Excluded(begin_key) => { let full_key = FullKey { - user_key: begin_key.clone(), + user_key: begin_key.as_ref(), epoch_with_gap: EpochWithGap::new(self.read_epoch, MAX_SPILL_TIMES), }; - self.iterator.seek(full_key.to_ref()).await?; + self.iterator.seek(full_key).await?; } - Excluded(_) => unimplemented!("excluded begin key is not supported"), Unbounded => { self.iterator.rewind().await?; } }; - self.try_advance_to_next_valid().await + self.try_advance_to_next_valid().await?; + if let Excluded(begin_key) = &self.key_range.0 + && self.is_valid() + && self.key().user_key == begin_key.as_ref() + { + self.next().await?; + } + Ok(()) } /// Resets the iterating position to the first position where the key >= provided key. @@ -147,8 +154,8 @@ impl> UserIterator { self.full_key_tracker = FullKeyTracker::new(FullKey::default()); // Handle range scan when key < begin_key - let user_key = match &self.key_range.0 { - Included(begin_key) => { + let seek_key = match &self.key_range.0 { + Included(begin_key) | Excluded(begin_key) => { let begin_key = begin_key.as_ref(); if begin_key > user_key { begin_key @@ -156,17 +163,24 @@ impl> UserIterator { user_key } } - Excluded(_) => unimplemented!("excluded begin key is not supported"), Unbounded => user_key, }; let full_key = FullKey { - user_key, + user_key: seek_key, epoch_with_gap: EpochWithGap::new(self.read_epoch, MAX_SPILL_TIMES), }; self.iterator.seek(full_key).await?; - self.try_advance_to_next_valid().await + self.try_advance_to_next_valid().await?; + if let Excluded(begin_key) = &self.key_range.0 + && self.is_valid() + && self.key().user_key == begin_key.as_ref() + { + debug_assert_ge!(begin_key.as_ref(), user_key); + self.next().await?; + } + Ok(()) } /// Indicates whether the iterator can be used. 
@@ -556,7 +570,11 @@ mod tests { let table = gen_iterator_test_sstable_from_kv_pair(0, kv_pairs, sstable_store.clone()).await; let read_options = Arc::new(SstableIteratorReadOptions::default()); - let iters = vec![SstableIterator::create(table, sstable_store, read_options)]; + let iters = vec![SstableIterator::create( + table.clone(), + sstable_store.clone(), + read_options.clone(), + )]; let mi = MergeIterator::new(iters); let begin_key = Included(iterator_test_bytes_user_key_of(2)); @@ -609,6 +627,35 @@ mod tests { .await .unwrap(); assert!(!ui.is_valid()); + + let iters = vec![SstableIterator::create(table, sstable_store, read_options)]; + let mi = MergeIterator::new(iters); + + let begin_key = Excluded(iterator_test_bytes_user_key_of(2)); + let end_key = Excluded(iterator_test_bytes_user_key_of(7)); + + let mut ui = UserIterator::for_test(mi, (begin_key, end_key)); + // ----- after-end-range iterate ----- + ui.seek(iterator_test_bytes_user_key_of(1).as_ref()) + .await + .unwrap(); + assert!(ui.is_valid()); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref()); + ui.seek(iterator_test_bytes_user_key_of(2).as_ref()) + .await + .unwrap(); + assert!(ui.is_valid()); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref()); + ui.seek(iterator_test_bytes_user_key_of(3).as_ref()) + .await + .unwrap(); + assert!(ui.is_valid()); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref()); + ui.seek(iterator_test_bytes_user_key_of(4).as_ref()) + .await + .unwrap(); + assert!(ui.is_valid()); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref()); } // ..=right diff --git a/src/stream/Cargo.toml b/src/stream/Cargo.toml index 2b67865d1c8a3..89f2b61fd3449 100644 --- a/src/stream/Cargo.toml +++ b/src/stream/Cargo.toml @@ -24,7 +24,7 @@ await-tree = { workspace = true } bytes = "1" cfg-if = "1" delta_btree_map = { path = "../utils/delta_btree_map" } -educe = "0.5" +educe = "0.6" either = "1" enum-as-inner = "0.6" fail = "0.5" diff --git a/src/stream/src/executor/project_set.rs b/src/stream/src/executor/project_set.rs index 43a2d65cbfb11..12f9d431c7c22 100644 --- a/src/stream/src/executor/project_set.rs +++ b/src/stream/src/executor/project_set.rs @@ -14,13 +14,16 @@ use either::Either; use multimap::MultiMap; -use risingwave_common::array::Op; +use risingwave_common::array::{ArrayRef, DataChunk, Op}; use risingwave_common::bail; use risingwave_common::row::RowExt; use risingwave_common::types::ToOwnedDatum; use risingwave_common::util::iter_util::ZipEqFast; -use risingwave_expr::expr::{LogReport, NonStrictExpression}; -use risingwave_expr::table_function::ProjectSetSelectItem; +use risingwave_expr::expr::{self, EvalErrorReport, NonStrictExpression}; +use risingwave_expr::table_function::{self, BoxedTableFunction, TableFunctionOutputIter}; +use risingwave_expr::ExprError; +use risingwave_pb::expr::project_set_select_item::PbSelectItem; +use risingwave_pb::expr::PbProjectSetSelectItem; use crate::executor::prelude::*; @@ -226,17 +229,13 @@ impl Inner { for expr_idx in expr_indices { let expr_idx = *expr_idx; let derived_watermark = match &self.select_list[expr_idx] { - ProjectSetSelectItem::Expr(expr) => { + ProjectSetSelectItem::Scalar(expr) => { watermark .clone() - .transform_with_expr( - // TODO: should we build `expr` in non-strict mode? 
- &NonStrictExpression::new_topmost(expr, LogReport), - expr_idx + PROJ_ROW_ID_OFFSET, - ) + .transform_with_expr(expr, expr_idx + PROJ_ROW_ID_OFFSET) .await } - ProjectSetSelectItem::TableFunction(_) => { + ProjectSetSelectItem::Set(_) => { bail!("Watermark should not be produced by a table function"); } }; @@ -253,3 +252,64 @@ impl Inner { Ok(ret) } } + +/// Either a scalar expression or a set-returning function. +/// +/// See also [`PbProjectSetSelectItem`]. +/// +/// A similar enum is defined in the `batch` module. The difference is that +/// we use `NonStrictExpression` instead of `BoxedExpression` here. +#[derive(Debug)] +pub enum ProjectSetSelectItem { + Scalar(NonStrictExpression), + Set(BoxedTableFunction), +} + +impl From for ProjectSetSelectItem { + fn from(table_function: BoxedTableFunction) -> Self { + ProjectSetSelectItem::Set(table_function) + } +} + +impl From for ProjectSetSelectItem { + fn from(expr: NonStrictExpression) -> Self { + ProjectSetSelectItem::Scalar(expr) + } +} + +impl ProjectSetSelectItem { + pub fn from_prost( + prost: &PbProjectSetSelectItem, + error_report: impl EvalErrorReport + 'static, + chunk_size: usize, + ) -> Result { + match prost.select_item.as_ref().unwrap() { + PbSelectItem::Expr(expr) => { + expr::build_non_strict_from_prost(expr, error_report).map(Self::Scalar) + } + PbSelectItem::TableFunction(tf) => { + table_function::build_from_prost(tf, chunk_size).map(Self::Set) + } + } + } + + pub fn return_type(&self) -> DataType { + match self { + ProjectSetSelectItem::Scalar(expr) => expr.return_type(), + ProjectSetSelectItem::Set(tf) => tf.return_type(), + } + } + + pub async fn eval<'a>( + &'a self, + input: &'a DataChunk, + ) -> Result, ArrayRef>, ExprError> { + match self { + Self::Scalar(expr) => Ok(Either::Right(expr.eval_infallible(input).await)), + // FIXME(runji): table function should also be evaluated non strictly + Self::Set(tf) => Ok(Either::Left( + TableFunctionOutputIter::new(tf.eval(input).await).await?, + )), + } + } +} diff --git a/src/stream/src/from_proto/project_set.rs b/src/stream/src/from_proto/project_set.rs index c2338394b33ef..155cc20a203a2 100644 --- a/src/stream/src/from_proto/project_set.rs +++ b/src/stream/src/from_proto/project_set.rs @@ -14,11 +14,10 @@ use multimap::MultiMap; use risingwave_common::util::iter_util::ZipEqFast; -use risingwave_expr::table_function::ProjectSetSelectItem; use risingwave_pb::stream_plan::ProjectSetNode; use super::*; -use crate::executor::ProjectSetExecutor; +use crate::executor::{ProjectSetExecutor, ProjectSetSelectItem}; pub struct ProjectSetExecutorBuilder; @@ -35,7 +34,11 @@ impl ExecutorBuilder for ProjectSetExecutorBuilder { .get_select_list() .iter() .map(|proto| { - ProjectSetSelectItem::from_prost(proto, params.env.config().developer.chunk_size) + ProjectSetSelectItem::from_prost( + proto, + params.eval_error_report.clone(), + params.env.config().developer.chunk_size, + ) }) .try_collect()?; let watermark_derivations = MultiMap::from_iter( diff --git a/src/stream/tests/integration_tests/project_set.rs b/src/stream/tests/integration_tests/project_set.rs index 5cff03284f4b6..3f54056b9487c 100644 --- a/src/stream/tests/integration_tests/project_set.rs +++ b/src/stream/tests/integration_tests/project_set.rs @@ -30,8 +30,8 @@ fn create_executor() -> (MessageSender, BoxedMessageStream) { let (tx, source) = MockSource::channel(); let source = source.into_executor(schema, PkIndices::new()); - let test_expr = build_from_pretty("(add:int8 $0:int8 $1:int8)").into_inner(); - let 
test_expr_watermark = build_from_pretty("(add:int8 $0:int8 1:int8)").into_inner(); + let test_expr = build_from_pretty("(add:int8 $0:int8 $1:int8)"); + let test_expr_watermark = build_from_pretty("(add:int8 $0:int8 1:int8)"); let tf1 = repeat(build_from_pretty("1:int4").into_inner(), 1); let tf2 = repeat(build_from_pretty("2:int4").into_inner(), 2); diff --git a/src/utils/delta_btree_map/Cargo.toml b/src/utils/delta_btree_map/Cargo.toml index 48534ffad1867..274a028489395 100644 --- a/src/utils/delta_btree_map/Cargo.toml +++ b/src/utils/delta_btree_map/Cargo.toml @@ -15,7 +15,7 @@ ignored = ["workspace-hack"] normal = ["workspace-hack"] [dependencies] -educe = "0.5" +educe = "0.6" enum-as-inner = "0.6" [lints]