diff --git a/.github/workflows/auto-create-doc-issue-by-issue.yml b/.github/workflows/auto-create-doc-issue-by-issue.yml
new file mode 100644
index 0000000000000..0c8d78062977a
--- /dev/null
+++ b/.github/workflows/auto-create-doc-issue-by-issue.yml
@@ -0,0 +1,31 @@
+name: Issue Documentation Checker
+
+on:
+  issues:
+    types:
+      - closed
+      - labeled
+
+jobs:
+  create-issue:
+    runs-on: ubuntu-latest
+
+    steps:
+      - uses: actions/checkout@v3
+      - name: Log the event payload
+        run: echo "${{ toJSON(github.event) }}"
+      - name: Check if issue is done and labeled 'user-facing-changes'
+        uses: dacbd/create-issue-action@main
+        if: ${{ github.event.action == 'closed' && contains(github.event.issue.labels.*.name, 'user-facing-changes') }}
+        with:
+          token: ${{ secrets.ACCESS_TOKEN }}
+          owner: risingwavelabs
+          repo: risingwave-docs
+          title: |
+            Document: ${{ github.event.issue.title }}
+          body: |
+            ## Context
+            Source Issue URL: ${{ github.event.issue.html_url }}
+            Created At: ${{ github.event.issue.created_at }}
+            Created By: ${{ github.event.issue.user.login }}
+            Closed At: ${{ github.event.issue.closed_at }}
diff --git a/.github/workflows/auto-create-docs-pr.yml b/.github/workflows/auto-create-doc-issue-by-pr.yml
similarity index 100%
rename from .github/workflows/auto-create-docs-pr.yml
rename to .github/workflows/auto-create-doc-issue-by-pr.yml
diff --git a/Cargo.lock b/Cargo.lock
index ce71e2bf98950..99a5a675c0342 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1825,13 +1825,13 @@ dependencies = [
 
 [[package]]
 name = "comfy-table"
-version = "7.1.0"
+version = "7.0.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7c64043d6c7b7a4c58e39e7efccfdea7b93d885a795d0c054a69dbbf4dd52686"
+checksum = "9ab77dbd8adecaf3f0db40581631b995f312a8a5ae3aa9993188bb8f23d83a5b"
 dependencies = [
- "crossterm 0.27.0",
- "strum 0.25.0",
- "strum_macros 0.25.2",
+ "crossterm 0.26.1",
+ "strum 0.24.1",
+ "strum_macros 0.24.3",
  "unicode-width",
 ]
 
@@ -2140,14 +2140,17 @@ dependencies = [
 
 [[package]]
 name = "crossterm"
-version = "0.27.0"
+version = "0.26.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f476fe445d41c9e991fd07515a6f463074b782242ccf4a5b7b1d1012e70824df"
+checksum = "a84cda67535339806297f1b331d6dd6320470d2a0fe65381e79ee9e156dd3d13"
 dependencies = [
- "bitflags 2.4.0",
+ "bitflags 1.3.2",
  "crossterm_winapi",
  "libc",
+ "mio",
  "parking_lot 0.12.1",
+ "signal-hook",
+ "signal-hook-mio",
  "winapi",
 ]
 
@@ -2445,10 +2448,11 @@ dependencies = [
 
 [[package]]
 name = "deranged"
-version = "0.3.8"
+version = "0.3.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f2696e8a945f658fd14dc3b87242e6b80cd0f36ff04ea560fa39082368847946"
+checksum = "0f32d04922c60427da6f9fef14d042d9edddef64cb9d4ce0d64d0685fbeb1fd3"
 dependencies = [
+ "powerfmt",
  "serde",
 ]
 
@@ -2551,8 +2555,7 @@
 checksum = "86e3bdc80eee6e16b2b6b0f87fbc98c04bee3455e35174c0de1a125d0688c632"
 
 [[package]]
 name = "dlv-list"
 version = "0.5.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8aead04dc46b5f263c25721cf25c9e595951d15055f8063f92392fa0d7f64cf4"
+source = "git+https://github.com/sgodwincs/dlv-list-rs.git?rev=5bbc5d0#5bbc5d0cc84f257e173d851f8dc1674fb6e46f95"
 dependencies = [
  "const-random",
 ]
@@ -3015,7 +3018,7 @@ dependencies = [
 [[package]]
 name = "foyer"
 version = "0.1.0"
-source = "git+https://github.com/mrcroxx/foyer?rev=5d0134b#5d0134b28c0edb03277b01ce08b035ef52c1b783"
+source = "git+https://github.com/MrCroxx/foyer?rev=2261151#2261151107ad362851f5fff9ce4fa56e61911b10"
 dependencies = [
  "foyer-common",
  "foyer-intrusive",
@@ -3026,7 +3029,7 @@ dependencies = [
 [[package]]
 name = "foyer-common"
 version = "0.1.0"
-source = "git+https://github.com/mrcroxx/foyer?rev=5d0134b#5d0134b28c0edb03277b01ce08b035ef52c1b783"
+source = "git+https://github.com/MrCroxx/foyer?rev=2261151#2261151107ad362851f5fff9ce4fa56e61911b10"
 dependencies = [
  "bytes",
  "foyer-workspace-hack",
@@ -3041,7 +3044,7 @@ dependencies = [
 [[package]]
 name = "foyer-intrusive"
 version = "0.1.0"
-source = "git+https://github.com/mrcroxx/foyer?rev=5d0134b#5d0134b28c0edb03277b01ce08b035ef52c1b783"
+source = "git+https://github.com/MrCroxx/foyer?rev=2261151#2261151107ad362851f5fff9ce4fa56e61911b10"
 dependencies = [
  "bytes",
  "cmsketch",
@@ -3058,7 +3061,7 @@ dependencies = [
 [[package]]
 name = "foyer-storage"
 version = "0.1.0"
-source = "git+https://github.com/mrcroxx/foyer?rev=5d0134b#5d0134b28c0edb03277b01ce08b035ef52c1b783"
+source = "git+https://github.com/MrCroxx/foyer?rev=2261151#2261151107ad362851f5fff9ce4fa56e61911b10"
 dependencies = [
  "anyhow",
  "async-channel",
@@ -3087,7 +3090,7 @@ dependencies = [
 [[package]]
 name = "foyer-workspace-hack"
 version = "0.1.0"
-source = "git+https://github.com/mrcroxx/foyer?rev=5d0134b#5d0134b28c0edb03277b01ce08b035ef52c1b783"
+source = "git+https://github.com/MrCroxx/foyer?rev=2261151#2261151107ad362851f5fff9ce4fa56e61911b10"
 dependencies = [
  "crossbeam-utils",
  "either",
@@ -3225,9 +3228,9 @@ dependencies = [
 
 [[package]]
 name = "futures-async-stream"
-version = "0.2.7"
+version = "0.2.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f529ccdeacfa2446a9577041686cf1abb839b1b3e15fee4c1b1232ab3b7d799f"
+checksum = "379790776b0d953337df4ab7ecc51936c66ea112484cad7912907b1d34253ebf"
 dependencies = [
  "futures-async-stream-macro",
  "futures-core",
@@ -3236,13 +3239,13 @@ dependencies = [
 
 [[package]]
 name = "futures-async-stream-macro"
-version = "0.2.7"
+version = "0.2.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ca2b48ee06dc8d2808ba5ebad075d06c3406085bb19deaac33be64c39113bf80"
+checksum = "5df2c13d48c8cb8a3ec093ede6f0f4482f327d7bb781120c5fb483ef0f17e758"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 1.0.109",
+ "syn 2.0.37",
 ]
 
@@ -5492,8 +5495,7 @@ dependencies = [
 
 [[package]]
 name = "ordered-multimap"
 version = "0.6.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4ed8acf08e98e744e5384c8bc63ceb0364e68a6854187221c18df61c4797690e"
+source = "git+https://github.com/risingwavelabs/ordered-multimap-rs.git?rev=19c743f#19c743f3e3d106c99ba37628f06a2ca6faa2284f"
 dependencies = [
  "dlv-list",
  "hashbrown 0.13.2",
@@ -6023,6 +6025,12 @@ dependencies = [
  "serde_json",
 ]
 
+[[package]]
+name = "powerfmt"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391"
+
 [[package]]
 name = "pprof"
 version = "0.13.0"
@@ -7613,6 +7621,7 @@ dependencies = [
 name = "risingwave_jni_core"
 version = "0.1.0"
 dependencies = [
+ "anyhow",
  "bytes",
  "cfg-or-panic",
  "futures",
@@ -9803,14 +9812,15 @@ dependencies = [
 
 [[package]]
 name = "time"
-version = "0.3.28"
+version = "0.3.30"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "17f6bb557fd245c28e6411aa56b6403c689ad95061f50e4be16c274e70a17e48"
+checksum = "c4a34ab300f2dee6e562c10a046fc05e358b29f9bf92277f30c3c8d82275f6f5"
 dependencies = [
  "deranged",
  "itoa",
  "libc",
  "num_threads",
+ "powerfmt",
  "serde",
  "time-core",
  "time-macros",
@@ -9818,15 +9828,15 @@ dependencies = [
 
 [[package]]
 name = "time-core"
-version = "0.1.1"
+version = "0.1.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7300fbefb4dadc1af235a9cef3737cea692a9d97e1b9cbcd4ebdae6f8868e6fb"
+checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3"
 
 [[package]]
 name = "time-macros"
-version = "0.2.14"
+version = "0.2.15"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1a942f44339478ef67935ab2bbaec2fb0322496cf3cbe84b261e06ac3814c572"
+checksum = "4ad70d68dba9e1f8aceda7aa6711965dfec1cac869f311a51bd08b3a2ccbce20"
 dependencies = [
  "time-core",
 ]
diff --git a/Cargo.toml b/Cargo.toml
index ef09221b818a2..f8a9b7d0e2fa5 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -97,7 +97,7 @@ aws-smithy-types = "0.55"
 aws-endpoint = "0.55"
 aws-types = "0.55"
 etcd-client = { package = "madsim-etcd-client", version = "0.4" }
-futures-async-stream = "0.2"
+futures-async-stream = "0.2.9"
 hytra = "0.1"
 rdkafka = { package = "madsim-rdkafka", version = "0.3.0", features = [
     "cmake-build",
@@ -165,6 +165,8 @@ unused_must_use = "forbid"
 future_incompatible = "warn"
 nonstandard_style = "warn"
 rust_2018_idioms = "warn"
+# Backward compatibility is not important for an application.
+async_fn_in_trait = "allow"
 
 [workspace.lints.clippy]
 uninlined_format_args = "allow"
@@ -229,8 +231,8 @@ opt-level = 2
 incremental = false
 debug = 1
 
-# Patch third-party crates for deterministic simulation.
 [patch.crates-io]
+# Patch third-party crates for deterministic simulation.
 quanta = { git = "https://github.com/madsim-rs/quanta.git", rev = "948bdc3" }
 getrandom = { git = "https://github.com/madsim-rs/getrandom.git", rev = "8daf97e" }
 tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "fe39bb8e" }
@@ -238,3 +240,8 @@ tokio-retry = { git = "https://github.com/madsim-rs/rust-tokio-retry.git", rev =
 tokio-postgres = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "ac00d88" }
 # patch: unlimit 4MB message size for grpc client
 etcd-client = { git = "https://github.com/risingwavelabs/etcd-client.git", rev = "4e84d40" }
+
+# Patch for coverage_attribute.
+# https://github.com/sgodwincs/dlv-list-rs/pull/19#issuecomment-1774786289
+dlv-list = { git = "https://github.com/sgodwincs/dlv-list-rs.git", rev = "5bbc5d0" }
+ordered-multimap = { git = "https://github.com/risingwavelabs/ordered-multimap-rs.git", rev = "19c743f" }
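The new `async_fn_in_trait = "allow"` workspace lint above is worth a note: rustc's `async_fn_in_trait` lint warns on public `async fn`s in traits because callers cannot name or further bound (e.g. with `Send`) the opaque future they return, which is a compatibility hazard for a library but, as the patch comment says, not for an application. A minimal sketch of the pattern the lint fires on (illustrative only, not code from this patch):

```rust
// A public trait with an `async fn`: rustc's `async_fn_in_trait` lint warns
// here, since `fetch` desugars to
// `fn fetch(&self) -> impl Future<Output = String> + '_`,
// an opaque type that downstream crates cannot constrain.
pub trait Fetch {
    async fn fetch(&self) -> String;
}
```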
diff --git a/ci/build-ci-image.sh b/ci/build-ci-image.sh
index 43ff81ade2b85..59c88e5e9a9ae 100755
--- a/ci/build-ci-image.sh
+++ b/ci/build-ci-image.sh
@@ -13,7 +13,7 @@ cat ../rust-toolchain
 # !!! CHANGE THIS WHEN YOU WANT TO BUMP CI IMAGE !!! #
 #                                                    #
 # AND ALSO docker-compose.yml                        #
 ######################################################
-export BUILD_ENV_VERSION=v20230919
+export BUILD_ENV_VERSION=v20231022
 
 export BUILD_TAG="public.ecr.aws/x5u3w5h6/rw-build-env:${BUILD_ENV_VERSION}"
diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml
index 6fe7cfbfdeca2..66dd2d175e675 100644
--- a/ci/docker-compose.yml
+++ b/ci/docker-compose.yml
@@ -71,7 +71,7 @@ services:
       retries: 5
 
   source-test-env:
-    image: public.ecr.aws/x5u3w5h6/rw-build-env:v20230919
+    image: public.ecr.aws/x5u3w5h6/rw-build-env:v20231022
     depends_on:
       - mysql
       - db
@@ -81,10 +81,11 @@ services:
       - ..:/risingwave
 
   sink-test-env:
-    image: public.ecr.aws/x5u3w5h6/rw-build-env:v20230919
+    image: public.ecr.aws/x5u3w5h6/rw-build-env:v20231022
     depends_on:
       - mysql
       - db
+      - message_queue
      - elasticsearch
       - clickhouse-server
       - pulsar
@@ -92,12 +93,12 @@ services:
       - ..:/risingwave
 
   rw-build-env:
-    image: public.ecr.aws/x5u3w5h6/rw-build-env:v20230919
+    image: public.ecr.aws/x5u3w5h6/rw-build-env:v20231022
     volumes:
       - ..:/risingwave
 
   ci-flamegraph-env:
-    image: public.ecr.aws/x5u3w5h6/rw-build-env:v20230919
+    image: public.ecr.aws/x5u3w5h6/rw-build-env:v20231022
     # NOTE(kwannoel): This is used in order to permit
     # syscalls for `nperf` (perf_event_open),
     # so it can do CPU profiling.
@@ -108,7 +109,7 @@ services:
       - ..:/risingwave
 
   regress-test-env:
-    image: public.ecr.aws/x5u3w5h6/rw-build-env:v20230919
+    image: public.ecr.aws/x5u3w5h6/rw-build-env:v20231022
     depends_on:
       db:
         condition: service_healthy
diff --git a/ci/rust-toolchain b/ci/rust-toolchain
index ebc0b6c285a4e..fe2a026f6e40f 100644
--- a/ci/rust-toolchain
+++ b/ci/rust-toolchain
@@ -1,2 +1,2 @@
 [toolchain]
-channel = "nightly-2023-09-09"
+channel = "nightly-2023-10-21"
diff --git a/ci/scripts/e2e-kafka-sink-test.sh b/ci/scripts/e2e-kafka-sink-test.sh
index 06ef185f46e8b..71a91f2d8fba9 100755
--- a/ci/scripts/e2e-kafka-sink-test.sh
+++ b/ci/scripts/e2e-kafka-sink-test.sh
@@ -3,10 +3,10 @@
 # Exits as soon as any line fails.
 set -euo pipefail
 
-./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-append-only --create > /dev/null 2>&1
-./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert --create > /dev/null 2>&1
-./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert-schema --create > /dev/null 2>&1
-./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-debezium --create > /dev/null 2>&1
+./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only --create > /dev/null 2>&1
+./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert --create > /dev/null 2>&1
+./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-schema --create > /dev/null 2>&1
+./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-debezium --create > /dev/null 2>&1
 
 sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/create_sink.slt'
 sleep 2
@@ -14,7 +14,7 @@
 # test append-only kafka sink
 echo "testing append-only kafka sink"
 diff ./e2e_test/sink/kafka/append_only1.result \
-<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-append-only --from-beginning --max-messages 10 | sort) 2> /dev/null)
+<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only --from-beginning --max-messages 10 | sort) 2> /dev/null)
 if [ $? -ne 0 ]; then
   echo "The output for append-only sink is not as expected."
   exit 1
@@ -23,7 +23,7 @@ fi
 # test upsert kafka sink
 echo "testing upsert kafka sink"
 diff ./e2e_test/sink/kafka/upsert1.result \
-<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert --from-beginning --property print.key=true --max-messages 10 | sort) 2> /dev/null)
+<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert --from-beginning --property print.key=true --max-messages 10 | sort) 2> /dev/null)
 if [ $? -ne 0 ]; then
   echo "The output for upsert sink is not as expected."
   exit 1
@@ -32,7 +32,7 @@ fi
 # test upsert kafka sink with schema
 echo "testing upsert kafka sink with schema"
 diff ./e2e_test/sink/kafka/upsert_schema1.result \
-<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert-schema --from-beginning --property print.key=true --max-messages 10 | sort) 2> /dev/null)
+<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-schema --from-beginning --property print.key=true --max-messages 10 | sort) 2> /dev/null)
 if [ $? -ne 0 ]; then
   echo "The output for upsert sink with schema is not as expected."
   exit 1
@@ -40,7 +40,7 @@ fi
 
 # test debezium kafka sink
 echo "testing debezium kafka sink"
-(./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-debezium --property print.key=true --from-beginning --max-messages 10 | sort) > ./e2e_test/sink/kafka/debezium1.tmp.result 2> /dev/null
+(./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-debezium --property print.key=true --from-beginning --max-messages 10 | sort) > ./e2e_test/sink/kafka/debezium1.tmp.result 2> /dev/null
 python3 e2e_test/sink/kafka/debezium.py e2e_test/sink/kafka/debezium1.result e2e_test/sink/kafka/debezium1.tmp.result
 if [ $? -ne 0 ]; then
   echo "The output for debezium sink is not as expected."
@@ -57,7 +57,7 @@ psql -h localhost -p 4566 -d dev -U root -c "update t_kafka set v_varchar = '',
 # test append-only kafka sink after update
 echo "testing append-only kafka sink after updating data"
 diff ./e2e_test/sink/kafka/append_only2.result \
-<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-append-only --from-beginning --max-messages 11 | sort) 2> /dev/null)
+<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only --from-beginning --max-messages 11 | sort) 2> /dev/null)
 if [ $? -ne 0 ]; then
   echo "The output for append-only sink after update is not as expected."
   exit 1
@@ -66,7 +66,7 @@ fi
 # test upsert kafka sink after update
 echo "testing upsert kafka sink after updating data"
 diff ./e2e_test/sink/kafka/upsert2.result \
-<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert --from-beginning --property print.key=true --max-messages 11 | sort) 2> /dev/null)
+<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert --from-beginning --property print.key=true --max-messages 11 | sort) 2> /dev/null)
 if [ $? -ne 0 ]; then
   echo "The output for upsert sink after update is not as expected."
   exit 1
@@ -75,7 +75,7 @@ fi
 # test upsert kafka sink with schema after update
 echo "testing upsert kafka sink with schema after updating data"
 diff ./e2e_test/sink/kafka/upsert_schema2.result \
-<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert-schema --from-beginning --property print.key=true --max-messages 11 | sort) 2> /dev/null)
+<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-schema --from-beginning --property print.key=true --max-messages 11 | sort) 2> /dev/null)
 if [ $? -ne 0 ]; then
   echo "The output for upsert sink with schema is not as expected."
   exit 1
@@ -83,7 +83,7 @@ fi
 
 # test debezium kafka sink after update
 echo "testing debezium kafka sink after updating data"
-(./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-debezium --property print.key=true --from-beginning --max-messages 11 | sort) > ./e2e_test/sink/kafka/debezium2.tmp.result 2> /dev/null
+(./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-debezium --property print.key=true --from-beginning --max-messages 11 | sort) > ./e2e_test/sink/kafka/debezium2.tmp.result 2> /dev/null
 python3 e2e_test/sink/kafka/debezium.py e2e_test/sink/kafka/debezium2.result e2e_test/sink/kafka/debezium2.tmp.result
 if [ $? -ne 0 ]; then
   echo "The output for debezium sink after update is not as expected."
@@ -100,7 +100,7 @@ psql -h localhost -p 4566 -d dev -U root -c "delete from t_kafka where id = 1;"
 # test upsert kafka sink after delete
 echo "testing upsert kafka sink after deleting data"
 diff ./e2e_test/sink/kafka/upsert3.result \
-<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert --from-beginning --property print.key=true --max-messages 12 | sort) 2> /dev/null)
+<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert --from-beginning --property print.key=true --max-messages 12 | sort) 2> /dev/null)
 if [ $? -ne 0 ]; then
   echo "The output for upsert sink after update is not as expected."
   exit 1
@@ -109,7 +109,7 @@ fi
 # test upsert kafka sink with schema after delete
 echo "testing upsert kafka sink with schema after deleting data"
 diff ./e2e_test/sink/kafka/upsert_schema3.result \
-<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert-schema --from-beginning --property print.key=true --max-messages 12 | sort) 2> /dev/null)
+<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-schema --from-beginning --property print.key=true --max-messages 12 | sort) 2> /dev/null)
 if [ $? -ne 0 ]; then
   echo "The output for upsert sink with schema is not as expected."
   exit 1
@@ -117,7 +117,7 @@ fi
 
 # test debezium kafka sink after delete
 echo "testing debezium kafka sink after deleting data"
-(./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-debezium --property print.key=true --from-beginning --max-messages 13 | sort) > ./e2e_test/sink/kafka/debezium3.tmp.result 2> /dev/null
+(./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-debezium --property print.key=true --from-beginning --max-messages 13 | sort) > ./e2e_test/sink/kafka/debezium3.tmp.result 2> /dev/null
 python3 e2e_test/sink/kafka/debezium.py e2e_test/sink/kafka/debezium3.result e2e_test/sink/kafka/debezium3.tmp.result
 if [ $? -ne 0 ]; then
   echo "The output for debezium sink after delete is not as expected."
@@ -128,13 +128,13 @@ else
 fi
 
 sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/drop_sink.slt'
-./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-append-only --delete > /dev/null 2>&1
-./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert --delete > /dev/null 2>&1
-./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-debezium --delete > /dev/null 2>&1
+./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only --delete > /dev/null 2>&1
+./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert --delete > /dev/null 2>&1
+./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-debezium --delete > /dev/null 2>&1
 
 # test different encoding
 echo "testing protobuf"
 cp src/connector/src/test_data/proto_recursive/recursive.pb ./proto-recursive
-./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-append-only-protobuf --create > /dev/null 2>&1
+./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf --create > /dev/null 2>&1
 sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/protobuf.slt'
-./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-append-only-protobuf --delete > /dev/null 2>&1
+./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf --delete > /dev/null 2>&1
diff --git a/ci/scripts/e2e-sink-test.sh b/ci/scripts/e2e-sink-test.sh
index 2dc02f0eada7a..ce2cc46381eba 100755
--- a/ci/scripts/e2e-sink-test.sh
+++ b/ci/scripts/e2e-sink-test.sh
@@ -57,7 +57,7 @@ node_port=50051
 node_timeout=10
 
 echo "--- starting risingwave cluster with connector node"
-cargo make ci-start ci-kafka
+cargo make ci-start ci-1cn-1fe
 ./connector-node/start-service.sh -p $node_port > .risingwave/log/connector-node.log 2>&1 &
 
 echo "waiting for connector node to start"
diff --git a/e2e_test/sink/kafka/create_sink.slt b/e2e_test/sink/kafka/create_sink.slt
index 25e3a59fdff3a..a1f296774f526 100644
--- a/e2e_test/sink/kafka/create_sink.slt
+++ b/e2e_test/sink/kafka/create_sink.slt
@@ -31,7 +31,7 @@ create connection mock with (
 statement error
 create sink si_kafka_append_only_conn from t_kafka with (
   connector = 'kafka',
-  properties.bootstrap.server = '127.0.0.1:29092',
+  properties.bootstrap.server = 'message_queue:29092',
   topic = 'test-rw-sink-append-only',
   type = 'append-only',
   force_append_only = 'true',
@@ -42,7 +42,7 @@
 statement ok
 create sink si_kafka_append_only_conn from t_kafka with (
   connector = 'kafka',
-  properties.bootstrap.server = '127.0.0.1:29092',
+  properties.bootstrap.server = 'message_queue:29092',
   topic = 'test-rw-sink-append-only',
   type = 'append-only',
   force_append_only = 'true',
@@ -66,7 +66,7 @@ drop connection mock;
 statement error sink cannot be append-only
 create sink si_kafka_append_only from t_kafka with (
   connector = 'kafka',
-  properties.bootstrap.server = '127.0.0.1:29092',
+  properties.bootstrap.server = 'message_queue:29092',
   topic = 'test-rw-sink-append-only',
   type = 'append-only',
 );
@@ -74,7 +74,7 @@
 statement ok
 create sink si_kafka_append_only from t_kafka with (
   connector = 'kafka',
-  properties.bootstrap.server = '127.0.0.1:29092',
+  properties.bootstrap.server = 'message_queue:29092',
   topic = 'test-rw-sink-append-only',
   type = 'append-only',
   force_append_only = 'true'
@@ -83,7 +83,7 @@
 statement error primary key not defined
 create sink si_kafka_upsert from t_kafka with (
   connector = 'kafka',
-  properties.bootstrap.server = '127.0.0.1:29092',
+  properties.bootstrap.server = 'message_queue:29092',
   topic = 'test-rw-sink-upsert',
   type = 'upsert',
 );
@@ -91,7 +91,7 @@
 statement ok
 create sink si_kafka_upsert from t_kafka with (
   connector = 'kafka',
-  properties.bootstrap.server = '127.0.0.1:29092',
+  properties.bootstrap.server = 'message_queue:29092',
   topic = 'test-rw-sink-upsert',
   type = 'upsert',
   primary_key = 'id',
@@ -100,7 +100,7 @@
 statement ok
 create sink si_kafka_upsert_schema from t_kafka with (
   connector = 'kafka',
-  properties.bootstrap.server = '127.0.0.1:29092',
+  properties.bootstrap.server = 'message_queue:29092',
   topic = 'test-rw-sink-upsert-schema',
   primary_key = 'id',
 ) format upsert encode json (
@@ -110,7 +110,7 @@
 statement ok
 create sink si_kafka_debezium from t_kafka with (
   connector = 'kafka',
-  properties.bootstrap.server = '127.0.0.1:29092',
+  properties.bootstrap.server = 'message_queue:29092',
   topic = 'test-rw-sink-debezium',
   type = 'debezium',
   primary_key = 'id',
@@ -119,7 +119,7 @@
 statement error primary key not defined
 create sink debezium_without_pk from t_kafka with (
   connector = 'kafka',
-  properties.bootstrap.server = '127.0.0.1:29092',
+  properties.bootstrap.server = 'message_queue:29092',
   topic = 'test-rw-sink-debezium',
   type = 'debezium',
 );
@@ -127,7 +127,7 @@
 statement ok
 create sink multiple_pk from t_kafka with (
   connector = 'kafka',
-  properties.bootstrap.server = '127.0.0.1:29092',
+  properties.bootstrap.server = 'message_queue:29092',
   topic = 'test-rw-sink-debezium',
   type = 'debezium',
   primary_key = 'id,v_varchar'
@@ -139,7 +139,7 @@ drop sink multiple_pk;
 statement error Sink primary key column not found: invalid.
 create sink invalid_pk_column from t_kafka with (
   connector = 'kafka',
-  properties.bootstrap.server = '127.0.0.1:29092',
+  properties.bootstrap.server = 'message_queue:29092',
   topic = 'test-rw-sink-debezium',
   type = 'debezium',
   primary_key = 'id,invalid'
diff --git a/e2e_test/sink/kafka/protobuf.slt b/e2e_test/sink/kafka/protobuf.slt
index f69c4a9d07110..87ab884eddbde 100644
--- a/e2e_test/sink/kafka/protobuf.slt
+++ b/e2e_test/sink/kafka/protobuf.slt
@@ -2,7 +2,7 @@ statement ok
 create table from_kafka with (
   connector = 'kafka',
   topic = 'test-rw-sink-append-only-protobuf',
-  properties.bootstrap.server = '127.0.0.1:29092')
+  properties.bootstrap.server = 'message_queue:29092')
 format plain encode protobuf (
   schema.location = 'file:///risingwave/proto-recursive',
   message = 'recursive.AllTypes');
@@ -37,7 +37,7 @@ statement ok
 create sink sink0 from into_kafka with (
   connector = 'kafka',
   topic = 'test-rw-sink-append-only-protobuf',
-  properties.bootstrap.server = '127.0.0.1:29092')
+  properties.bootstrap.server = 'message_queue:29092')
 format plain encode protobuf (
   force_append_only = true,
   schema.location = 'file:///risingwave/proto-recursive',
@@ -70,7 +70,7 @@ statement error failed to read file
 create sink sink_err from into_kafka with (
   connector = 'kafka',
   topic = 'test-rw-sink-append-only-protobuf',
-  properties.bootstrap.server = '127.0.0.1:29092')
+  properties.bootstrap.server = 'message_queue:29092')
 format plain encode protobuf (
   force_append_only = true,
   schema.location = 'file:///risingwave/proto-recursiv',
@@ -80,7 +80,7 @@ statement error encode extra_column error: field not in proto
 create sink sink_err as select 1 as extra_column with (
   connector = 'kafka',
   topic = 'test-rw-sink-append-only-protobuf',
-  properties.bootstrap.server = '127.0.0.1:29092')
+  properties.bootstrap.server = 'message_queue:29092')
 format plain encode protobuf (
   force_append_only = true,
   schema.location = 'file:///risingwave/proto-recursive',
@@ -90,7 +90,7 @@ statement error s3 URL not supported yet
 create sink sink_err from into_kafka with (
   connector = 'kafka',
   topic = 'test-rw-sink-append-only-protobuf',
-  properties.bootstrap.server = '127.0.0.1:29092')
+  properties.bootstrap.server = 'message_queue:29092')
 format plain encode protobuf (
   force_append_only = true,
   schema.location = 's3:///risingwave/proto-recursive',
diff --git a/integration_tests/redis-sink/create_sink.sql b/integration_tests/redis-sink/create_sink.sql
index 03bfc2d0b0df1..2ba9ba67feb39 100644
--- a/integration_tests/redis-sink/create_sink.sql
+++ b/integration_tests/redis-sink/create_sink.sql
@@ -3,19 +3,13 @@ FROM
     bhv_mv WITH (
     primary_key = 'user_id',
     connector = 'redis',
-    type = 'append-only',
-    force_append_only='true',
     redis.url= 'redis://127.0.0.1:6379/',
-);
+)FORMAT PLAIN ENCODE JSON(force_append_only='true');
 
 CREATE SINK bhv_redis_sink_2
 FROM
     bhv_mv WITH (
     primary_key = 'user_id',
     connector = 'redis',
-    type = 'append-only',
-    force_append_only='true',
     redis.url= 'redis://127.0.0.1:6379/',
-    redis.keyformat='user_id:{user_id}',
-    redis.valueformat='username:{username},event_timestamp{event_timestamp}'
-);
\ No newline at end of file
+)FORMAT PLAIN ENCODE TEMPLATE(force_append_only='true', key_format = 'UserID:{user_id}', value_format = 'TargetID:{target_id},EventTimestamp{event_timestamp}');
\ No newline at end of file
diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto
index 27c9f2ee82f83..1efc933a7d033 100644
--- a/proto/ddl_service.proto
+++ b/proto/ddl_service.proto
@@ -314,6 +314,10 @@ message GetTablesResponse {
   map<uint32, Table> tables = 1;
 }
 
+message WaitRequest {}
+
+message WaitResponse {}
+
 service DdlService {
   rpc CreateDatabase(CreateDatabaseRequest) returns (CreateDatabaseResponse);
   rpc DropDatabase(DropDatabaseRequest) returns (DropDatabaseResponse);
@@ -343,4 +347,5 @@ service DdlService {
   rpc ListConnections(ListConnectionsRequest) returns (ListConnectionsResponse);
   rpc DropConnection(DropConnectionRequest) returns (DropConnectionResponse);
   rpc GetTables(GetTablesRequest) returns (GetTablesResponse);
+  rpc Wait(WaitRequest) returns (WaitResponse);
 }
diff --git a/proto/plan_common.proto b/proto/plan_common.proto
index a88242a572693..d4c7a2e04f138 100644
--- a/proto/plan_common.proto
+++ b/proto/plan_common.proto
@@ -106,6 +106,7 @@ enum EncodeType {
   ENCODE_TYPE_PROTOBUF = 4;
   ENCODE_TYPE_JSON = 5;
   ENCODE_TYPE_BYTES = 6;
+  ENCODE_TYPE_TEMPLATE = 7;
 }
 
 enum RowFormatType {
diff --git a/risedev.yml b/risedev.yml
index a5ba8a7b43f97..135a33f602a6a 100644
--- a/risedev.yml
+++ b/risedev.yml
@@ -685,40 +685,6 @@ profile:
       - use: pubsub
         persist-data: true
 
-  ci-kafka:
-    config-path: src/config/ci.toml
-    steps:
-      - use: minio
-      - use: etcd
-        unsafe-no-fsync: true
-      - use: meta-node
-      - use: compute-node
-        enable-tiered-cache: true
-      - use: frontend
-      - use: compactor
-      - use: zookeeper
-        persist-data: true
-      - use: kafka
-        persist-data: true
-
-  ci-kafka-plus-pubsub:
-    config-path: src/config/ci.toml
-    steps:
-      - use: minio
-      - use: etcd
-        unsafe-no-fsync: true
-      - use: meta-node
-      - use: compute-node
-        enable-tiered-cache: true
-      - use: frontend
-      - use: compactor
-      - use: zookeeper
-        persist-data: true
-      - use: kafka
-        persist-data: true
-      - use: pubsub
-        persist-data: true
-
   ci-redis:
     config-path: src/config/ci.toml
     steps:
diff --git a/src/batch/src/lib.rs b/src/batch/src/lib.rs
index 9104c96c951f5..809c096eb49df 100644
--- a/src/batch/src/lib.rs
+++ b/src/batch/src/lib.rs
@@ -17,8 +17,8 @@
 #![feature(trait_alias)]
 #![feature(exact_size_is_empty)]
 #![feature(type_alias_impl_trait)]
-#![cfg_attr(coverage, feature(no_coverage))]
-#![feature(generators)]
+#![cfg_attr(coverage, feature(coverage_attribute))]
+#![feature(coroutines)]
 #![feature(proc_macro_hygiene, stmt_expr_attributes)]
 #![feature(iterator_try_collect)]
 #![feature(lint_reasons)]
@@ -27,13 +27,11 @@
 #![feature(let_chains)]
 #![feature(bound_map)]
 #![feature(int_roundings)]
-#![feature(async_fn_in_trait)]
 #![feature(allocator_api)]
 #![feature(impl_trait_in_assoc_type)]
 #![feature(result_option_inspect)]
 #![feature(assert_matches)]
 #![feature(lazy_cell)]
-#![feature(return_position_impl_trait_in_trait)]
 
 mod error;
 pub mod exchange_source;
diff --git a/src/batch/src/rpc/service/task_service.rs b/src/batch/src/rpc/service/task_service.rs
index b49a023acb22b..fb60e352ec293 100644
--- a/src/batch/src/rpc/service/task_service.rs
+++ b/src/batch/src/rpc/service/task_service.rs
@@ -53,7 +53,7 @@ impl TaskService for BatchServiceImpl {
     type CreateTaskStream = ReceiverStream<TaskInfoResponseResult>;
     type ExecuteStream = ReceiverStream<GetDataResponseResult>;
 
-    #[cfg_attr(coverage, no_coverage)]
+    #[cfg_attr(coverage, coverage(off))]
     async fn create_task(
         &self,
         request: Request<CreateTaskRequest>,
@@ -97,7 +97,7 @@ impl TaskService for BatchServiceImpl {
         }
     }
 
-    #[cfg_attr(coverage, no_coverage)]
+    #[cfg_attr(coverage, coverage(off))]
     async fn cancel_task(
         &self,
         req: Request<CancelTaskRequest>,
@@ -109,7 +109,7 @@ impl TaskService for BatchServiceImpl {
         Ok(Response::new(CancelTaskResponse { status: None }))
     }
 
-    #[cfg_attr(coverage, no_coverage)]
+    #[cfg_attr(coverage, coverage(off))]
     async fn execute(
         &self,
         req: Request<ExecuteRequest>,
diff --git a/src/cmd/src/bin/compactor.rs b/src/cmd/src/bin/compactor.rs
index 21b7db2405e2d..554168d8a6683 100644
--- a/src/cmd/src/bin/compactor.rs
+++ b/src/cmd/src/bin/compactor.rs
@@ -12,6 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#![cfg_attr(coverage, feature(no_coverage))]
+#![cfg_attr(coverage, feature(coverage_attribute))]
 
 risingwave_cmd::main!(compactor);
diff --git a/src/cmd/src/bin/compute_node.rs b/src/cmd/src/bin/compute_node.rs
index 0bb1e5211ac57..a24d132b70b94 100644
--- a/src/cmd/src/bin/compute_node.rs
+++ b/src/cmd/src/bin/compute_node.rs
@@ -12,6 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#![cfg_attr(coverage, feature(no_coverage))]
+#![cfg_attr(coverage, feature(coverage_attribute))]
 
 risingwave_cmd::main!(compute);
diff --git a/src/cmd/src/bin/ctl.rs b/src/cmd/src/bin/ctl.rs
index 38345c7a3fc2e..7b4c3132e747d 100644
--- a/src/cmd/src/bin/ctl.rs
+++ b/src/cmd/src/bin/ctl.rs
@@ -12,6 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#![cfg_attr(coverage, feature(no_coverage))]
+#![cfg_attr(coverage, feature(coverage_attribute))]
 
 risingwave_cmd::main!(ctl);
diff --git a/src/cmd/src/bin/frontend_node.rs b/src/cmd/src/bin/frontend_node.rs
index 32d563be109fc..546bacbf1a901 100644
--- a/src/cmd/src/bin/frontend_node.rs
+++ b/src/cmd/src/bin/frontend_node.rs
@@ -12,6 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#![cfg_attr(coverage, feature(no_coverage))]
+#![cfg_attr(coverage, feature(coverage_attribute))]
 
 risingwave_cmd::main!(frontend);
diff --git a/src/cmd/src/bin/meta_node.rs b/src/cmd/src/bin/meta_node.rs
index 032cc6bc28285..4bebfc5f915a2 100644
--- a/src/cmd/src/bin/meta_node.rs
+++ b/src/cmd/src/bin/meta_node.rs
@@ -12,6 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#![cfg_attr(coverage, feature(no_coverage))]
+#![cfg_attr(coverage, feature(coverage_attribute))]
 
 risingwave_cmd::main!(meta);
diff --git a/src/cmd/src/lib.rs b/src/cmd/src/lib.rs
index 12de26657bd33..93df94a63816a 100644
--- a/src/cmd/src/lib.rs
+++ b/src/cmd/src/lib.rs
@@ -30,7 +30,7 @@ macro_rules! main {
         #[cfg(not(enable_task_local_alloc))]
         risingwave_common::enable_jemalloc!();
 
-        #[cfg_attr(coverage, no_coverage)]
+        #[cfg_attr(coverage, coverage(off))]
         fn main() {
             let opts = clap::Parser::parse();
             $crate::$component(opts);
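The mechanical `no_coverage` → `coverage_attribute` edits repeated across these files track a nightly Rust rename: the unstable `#[no_coverage]` attribute became `#[coverage(off)]`, gated by `feature(coverage_attribute)` instead of `feature(no_coverage)`. A minimal sketch of the new spelling (illustrative, not code from this patch):

```rust
// Enable the renamed feature only when building with `--cfg coverage`.
#![cfg_attr(coverage, feature(coverage_attribute))]

// This function is excluded from coverage instrumentation.
#[cfg_attr(coverage, coverage(off))]
fn main() {
    println!("not counted in coverage reports");
}
```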
diff --git a/src/cmd_all/src/bin/risingwave.rs b/src/cmd_all/src/bin/risingwave.rs
index 3e9088e16b9e2..b7693c6fa06a2 100644
--- a/src/cmd_all/src/bin/risingwave.rs
+++ b/src/cmd_all/src/bin/risingwave.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#![cfg_attr(coverage, feature(no_coverage))]
+#![cfg_attr(coverage, feature(coverage_attribute))]
 
 use std::str::FromStr;
 
@@ -158,7 +158,7 @@ impl Component {
     }
 }
 
-#[cfg_attr(coverage, no_coverage)]
+#[cfg_attr(coverage, coverage(off))]
 fn main() -> Result<()> {
     let risingwave = || {
         command!(BINARY_NAME)
diff --git a/src/common/proc_macro/src/config.rs b/src/common/proc_macro/src/config.rs
index 285834eb123cf..6e369fbad33eb 100644
--- a/src/common/proc_macro/src/config.rs
+++ b/src/common/proc_macro/src/config.rs
@@ -41,7 +41,7 @@ fn type_is_option(ty: &syn::Type) -> bool {
     false
 }
 
-#[cfg_attr(coverage, no_coverage)]
+#[cfg_attr(coverage, coverage(off))]
 pub fn produce_override_config(input: DeriveInput) -> TokenStream {
     let syn::Data::Struct(syn::DataStruct { fields, .. }) = input.data else {
         abort!(input, "Only struct is supported");
diff --git a/src/common/proc_macro/src/lib.rs b/src/common/proc_macro/src/lib.rs
index 060ee1950624e..a11e407c6c053 100644
--- a/src/common/proc_macro/src/lib.rs
+++ b/src/common/proc_macro/src/lib.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#![cfg_attr(coverage, feature(no_coverage))]
+#![cfg_attr(coverage, feature(coverage_attribute))]
 
 use estimate_size::{
     add_trait_bounds, extract_ignored_generics_list, has_nested_flag_attribute_list,
@@ -52,7 +52,7 @@ mod estimate_size;
 /// }
 /// }
 /// ```
-#[cfg_attr(coverage, no_coverage)]
+#[cfg_attr(coverage, coverage(off))]
 #[proc_macro_derive(OverrideConfig, attributes(override_opts))]
 #[proc_macro_error]
 pub fn override_config(input: TokenStream) -> TokenStream {
diff --git a/src/common/src/lib.rs b/src/common/src/lib.rs
index 2a3575d8dae78..fbcd3854fa572 100644
--- a/src/common/src/lib.rs
+++ b/src/common/src/lib.rs
@@ -24,12 +24,11 @@
 #![feature(trusted_len)]
 #![feature(allocator_api)]
 #![feature(lint_reasons)]
-#![feature(generators)]
+#![feature(coroutines)]
 #![feature(map_try_insert)]
 #![feature(lazy_cell)]
 #![feature(error_generic_member_access)]
 #![feature(let_chains)]
-#![feature(return_position_impl_trait_in_trait)]
 #![feature(portable_simd)]
 #![feature(array_chunks)]
 #![feature(inline_const_pat)]
@@ -43,7 +42,6 @@
 #![feature(result_option_inspect)]
 #![feature(map_entry_replace)]
 #![feature(negative_impls)]
-#![feature(async_fn_in_trait)]
 #![feature(bound_map)]
 #![feature(array_methods)]
 
diff --git a/src/common/src/types/ordered.rs b/src/common/src/types/ordered.rs
index 75b07e529d7b9..68cd6329287e2 100644
--- a/src/common/src/types/ordered.rs
+++ b/src/common/src/types/ordered.rs
@@ -138,7 +138,7 @@ impl<T> From<T> for DefaultOrdered<T> {
     }
 }
 
-#[allow(clippy::incorrect_partial_ord_impl_on_ord_type)]
+#[allow(clippy::non_canonical_partial_ord_impl)]
 impl<T: DefaultOrd> PartialOrd for DefaultOrdered<T> {
     fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
         self.0.default_partial_cmp(other.as_inner())
diff --git a/src/common/src/util/future_utils.rs b/src/common/src/util/future_utils.rs
index 75c38488457ac..20844d8cd15d4 100644
--- a/src/common/src/util/future_utils.rs
+++ b/src/common/src/util/future_utils.rs
@@ -13,9 +13,11 @@
 // limitations under the License.
 
 use std::future::pending;
+use std::pin::{pin, Pin};
 
-use futures::future::Either;
-use futures::{Future, FutureExt, Stream};
+use futures::future::{select, Either};
+use futures::stream::Peekable;
+use futures::{Future, FutureExt, Stream, StreamExt};
 
 /// Convert a list of streams into a [`Stream`] of results from the streams.
 pub fn select_all<S: Stream>(
@@ -43,3 +45,34 @@ pub fn drop_either_future(
         Either::Right((right, _)) => Either::Right(right),
     }
 }
+
+/// Await on a future while monitoring on a peekable stream that may return error.
+/// The peekable stream is polled at a higher priority than the future.
+///
+/// When the peekable stream returns an error or reaches the end of stream, this
+/// function returns immediately. Otherwise, it keeps polling the given future.
+///
+/// Return:
+/// - Ok(output) as the output of the given future.
+/// - Err(None) to indicate that the stream has reached the end.
+/// - Err(e) to indicate that the stream returns an error.
+pub async fn await_future_with_monitor_error_stream<T, E, F: Future>(
+    peek_stream: &mut Peekable<impl Stream<Item = Result<T, E>> + Unpin>,
+    future: F,
+) -> Result<F::Output, Option<E>> {
+    // Poll the response stream to early see the error
+    match select(pin!(Pin::new(&mut *peek_stream).peek()), pin!(future)).await {
+        Either::Left((response_result, send_future)) => match response_result {
+            None => Err(None),
+            Some(Err(_)) => {
+                let err = match peek_stream.next().now_or_never() {
+                    Some(Some(Err(err))) => err,
+                    _ => unreachable!("peek has output, peek output not None, have check err"),
+                };
+                Err(Some(err))
+            }
+            Some(Ok(_)) => Ok(send_future.await),
+        },
+        Either::Right((output, _)) => Ok(output),
+    }
+}
diff --git a/src/common/src/util/mod.rs b/src/common/src/util/mod.rs
index f4140b558faa7..e1f85263e1415 100644
--- a/src/common/src/util/mod.rs
+++ b/src/common/src/util/mod.rs
@@ -45,7 +45,9 @@ pub mod tracing;
 pub mod value_encoding;
 pub mod worker_util;
 
-pub use future_utils::{drop_either_future, pending_on_none, select_all};
+pub use future_utils::{
+    await_future_with_monitor_error_stream, drop_either_future, pending_on_none, select_all,
+};
 
 #[macro_use]
 pub mod match_util;
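A usage sketch of the `await_future_with_monitor_error_stream` helper added above (the streams and futures here are hypothetical; only the helper's name, re-export path, and contract come from the diff):

```rust
use futures::StreamExt;
use risingwave_common::util::await_future_with_monitor_error_stream;

async fn demo() {
    // A stream that never yields: the future completes and we get its output.
    let mut quiet = futures::stream::pending::<Result<(), String>>().peekable();
    let out = await_future_with_monitor_error_stream(&mut quiet, async { 42 }).await;
    assert!(matches!(out, Ok(42)));

    // A stream whose first item is an error: the error takes priority
    // over the (also ready) future.
    let mut failing = futures::stream::iter([Err::<(), _>("boom".to_string())]).peekable();
    let out = await_future_with_monitor_error_stream(&mut failing, async { 42 }).await;
    assert!(matches!(out, Err(Some(e)) if e == "boom"));
}
```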
diff --git a/src/compute/src/lib.rs b/src/compute/src/lib.rs
index 65bf59eedf19e..fc5ae9ff19854 100644
--- a/src/compute/src/lib.rs
+++ b/src/compute/src/lib.rs
@@ -13,14 +13,14 @@
 // limitations under the License.
 
 #![feature(trait_alias)]
-#![feature(generators)]
+#![feature(coroutines)]
 #![feature(type_alias_impl_trait)]
 #![feature(let_chains)]
 #![feature(result_option_inspect)]
 #![feature(lint_reasons)]
 #![feature(impl_trait_in_assoc_type)]
 #![feature(lazy_cell)]
-#![cfg_attr(coverage, feature(no_coverage))]
+#![cfg_attr(coverage, feature(coverage_attribute))]
 
 #[macro_use]
 extern crate tracing;
diff --git a/src/compute/src/rpc/service/exchange_service.rs b/src/compute/src/rpc/service/exchange_service.rs
index b59cc39587c2f..6225cef2a7e30 100644
--- a/src/compute/src/rpc/service/exchange_service.rs
+++ b/src/compute/src/rpc/service/exchange_service.rs
@@ -49,7 +49,7 @@ impl ExchangeService for ExchangeServiceImpl {
     type GetDataStream = BatchDataStream;
     type GetStreamStream = StreamDataStream;
 
-    #[cfg_attr(coverage, no_coverage)]
+    #[cfg_attr(coverage, coverage(off))]
     async fn get_data(
         &self,
         request: Request<GetDataRequest>,
diff --git a/src/compute/src/rpc/service/monitor_service.rs b/src/compute/src/rpc/service/monitor_service.rs
index 97a0b80773791..8fc24664ec016 100644
--- a/src/compute/src/rpc/service/monitor_service.rs
+++ b/src/compute/src/rpc/service/monitor_service.rs
@@ -53,7 +53,7 @@ impl MonitorServiceImpl {
 
 #[async_trait::async_trait]
 impl MonitorService for MonitorServiceImpl {
-    #[cfg_attr(coverage, no_coverage)]
+    #[cfg_attr(coverage, coverage(off))]
     async fn stack_trace(
         &self,
         request: Request<StackTraceRequest>,
@@ -85,7 +85,7 @@ impl MonitorService for MonitorServiceImpl {
         }))
     }
 
-    #[cfg_attr(coverage, no_coverage)]
+    #[cfg_attr(coverage, coverage(off))]
     async fn profiling(
         &self,
         request: Request<ProfilingRequest>,
@@ -115,7 +115,7 @@ impl MonitorService for MonitorServiceImpl {
         }
     }
 
-    #[cfg_attr(coverage, no_coverage)]
+    #[cfg_attr(coverage, coverage(off))]
     async fn heap_profiling(
         &self,
         request: Request<HeapProfilingRequest>,
@@ -166,7 +166,7 @@ impl MonitorService for MonitorServiceImpl {
         }
     }
 
-    #[cfg_attr(coverage, no_coverage)]
+    #[cfg_attr(coverage, coverage(off))]
     async fn list_heap_profiling(
         &self,
         _request: Request<ListHeapProfilingRequest>,
@@ -206,7 +206,7 @@ impl MonitorService for MonitorServiceImpl {
         }))
     }
 
-    #[cfg_attr(coverage, no_coverage)]
+    #[cfg_attr(coverage, coverage(off))]
     async fn analyze_heap(
         &self,
         request: Request<AnalyzeHeapRequest>,
diff --git a/src/compute/src/rpc/service/stream_service.rs b/src/compute/src/rpc/service/stream_service.rs
index 525364b60dc1c..1c1448b3d1e45 100644
--- a/src/compute/src/rpc/service/stream_service.rs
+++ b/src/compute/src/rpc/service/stream_service.rs
@@ -45,7 +45,7 @@ impl StreamServiceImpl {
 
 #[async_trait::async_trait]
 impl StreamService for StreamServiceImpl {
-    #[cfg_attr(coverage, no_coverage)]
+    #[cfg_attr(coverage, coverage(off))]
     async fn update_actors(
         &self,
         request: Request<UpdateActorsRequest>,
@@ -61,7 +61,7 @@ impl StreamService for StreamServiceImpl {
         }
     }
 
-    #[cfg_attr(coverage, no_coverage)]
+    #[cfg_attr(coverage, coverage(off))]
     async fn build_actors(
         &self,
         request: Request<BuildActorsRequest>,
@@ -85,7 +85,7 @@ impl StreamService for StreamServiceImpl {
         }
     }
 
-    #[cfg_attr(coverage, no_coverage)]
+    #[cfg_attr(coverage, coverage(off))]
     async fn broadcast_actor_info_table(
         &self,
         request: Request<BroadcastActorInfoTableRequest>,
@@ -104,7 +104,7 @@ impl StreamService for StreamServiceImpl {
         }
     }
 
-    #[cfg_attr(coverage, no_coverage)]
+    #[cfg_attr(coverage, coverage(off))]
     async fn drop_actors(
         &self,
         request: Request<DropActorsRequest>,
@@ -118,7 +118,7 @@ impl StreamService for StreamServiceImpl {
         }))
     }
 
-    #[cfg_attr(coverage, no_coverage)]
+    #[cfg_attr(coverage, coverage(off))]
     async fn force_stop_actors(
         &self,
         request: Request<ForceStopActorsRequest>,
@@ -132,7 +132,7 @@ impl StreamService for StreamServiceImpl {
         }))
     }
 
-    #[cfg_attr(coverage, no_coverage)]
+    #[cfg_attr(coverage, coverage(off))]
     async fn inject_barrier(
         &self,
         request: Request<InjectBarrierRequest>,
@@ -173,7 +173,7 @@ impl StreamService for StreamServiceImpl {
         }))
     }
 
-    #[cfg_attr(coverage, no_coverage)]
+    #[cfg_attr(coverage, coverage(off))]
     async fn barrier_complete(
         &self,
         request: Request<BarrierCompleteRequest>,
@@ -243,7 +243,7 @@ impl StreamService for StreamServiceImpl {
         }))
     }
 
-    #[cfg_attr(coverage, no_coverage)]
+    #[cfg_attr(coverage, coverage(off))]
     async fn wait_epoch_commit(
         &self,
         request: Request<WaitEpochCommitRequest>,
diff --git a/src/compute/tests/cdc_tests.rs b/src/compute/tests/cdc_tests.rs
index b3e39ece95002..6a50b8410bbd4 100644
--- a/src/compute/tests/cdc_tests.rs
+++ b/src/compute/tests/cdc_tests.rs
@@ -13,7 +13,7 @@
 // limitations under the License.
 
 #![feature(let_chains)]
-#![feature(generators)]
+#![feature(coroutines)]
 
 use std::sync::atomic::AtomicU64;
 use std::sync::Arc;
diff --git a/src/compute/tests/integration_tests.rs b/src/compute/tests/integration_tests.rs
index a43ae2e5762da..6d7e93365c275 100644
--- a/src/compute/tests/integration_tests.rs
+++ b/src/compute/tests/integration_tests.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#![feature(generators)]
+#![feature(coroutines)]
 #![feature(proc_macro_hygiene, stmt_expr_attributes)]
 
 use std::sync::atomic::AtomicU64;
diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml
index 4886b1b52fcc5..87d2a0bdef689 100644
--- a/src/connector/Cargo.toml
+++ b/src/connector/Cargo.toml
@@ -113,7 +113,7 @@ strum = "0.25"
 strum_macros = "0.25"
 tempfile = "3"
 thiserror = "1"
-time = "0.3.28"
+time = "0.3.30"
 tokio = { version = "0.2", package = "madsim-tokio", features = [
     "rt",
     "rt-multi-thread",
diff --git a/src/connector/src/lib.rs b/src/connector/src/lib.rs
index 8ccf62486ce65..aa613b4043c23 100644
--- a/src/connector/src/lib.rs
+++ b/src/connector/src/lib.rs
@@ -14,7 +14,7 @@
 
 #![expect(dead_code)]
 #![allow(clippy::derive_partial_eq_without_eq)]
-#![feature(generators)]
+#![feature(coroutines)]
 #![feature(proc_macro_hygiene)]
 #![feature(stmt_expr_attributes)]
 #![feature(box_patterns)]
@@ -25,11 +25,9 @@
 #![feature(let_chains)]
 #![feature(box_into_inner)]
 #![feature(type_alias_impl_trait)]
-#![feature(return_position_impl_trait_in_trait)]
-#![feature(async_fn_in_trait)]
 #![feature(associated_type_defaults)]
 #![feature(impl_trait_in_assoc_type)]
-#![feature(iter_from_generator)]
+#![feature(iter_from_coroutine)]
 #![feature(if_let_guard)]
 #![feature(iterator_try_collect)]
 
diff --git a/src/connector/src/sink/catalog/mod.rs b/src/connector/src/sink/catalog/mod.rs
index c18dd7d10a92c..ca3a09e7f2eda 100644
--- a/src/connector/src/sink/catalog/mod.rs
+++ b/src/connector/src/sink/catalog/mod.rs
@@ -132,6 +132,7 @@ pub enum SinkEncode {
     Json,
     Protobuf,
     Avro,
+    Template,
 }
 
 impl SinkFormatDesc {
@@ -177,6 +178,7 @@ impl SinkFormatDesc {
             SinkEncode::Json => E::Json,
             SinkEncode::Protobuf => E::Protobuf,
             SinkEncode::Avro => E::Avro,
+            SinkEncode::Template => E::Template,
         };
         let options = self
             .options
@@ -212,6 +214,7 @@ impl TryFrom<PbSinkFormatDesc> for SinkFormatDesc {
         let encode = match value.encode() {
             E::Json => SinkEncode::Json,
             E::Protobuf => SinkEncode::Protobuf,
+            E::Template => SinkEncode::Template,
             E::Avro => SinkEncode::Avro,
             e @ (E::Unspecified | E::Native | E::Csv | E::Bytes) => {
                 return Err(SinkError::Config(anyhow!(
diff --git a/src/connector/src/sink/encoder/template.rs b/src/connector/src/sink/encoder/template.rs
index 85f085989b6c4..97d8271f9e83a 100644
--- a/src/connector/src/sink/encoder/template.rs
+++ b/src/connector/src/sink/encoder/template.rs
@@ -12,11 +12,15 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashSet;
+
+use regex::Regex;
 use risingwave_common::catalog::Schema;
 use risingwave_common::row::Row;
 use risingwave_common::types::ToText;
 
 use super::{Result, RowEncoder};
+use crate::sink::SinkError;
 
 /// Encode a row according to a specified string template `user_id:{user_id}`
 pub struct TemplateEncoder {
@@ -34,6 +38,24 @@ impl TemplateEncoder {
             template,
         }
     }
+
+    pub fn check_string_format(format: &str, set: &HashSet<String>) -> Result<()> {
+        // We will check if the string inside {} corresponds to a column name in rw.
+        // In other words, the content within {} should exclusively consist of column names from rw,
+        // which means '{{column_name}}' or '{{column_name1},{column_name2}}' would be incorrect.
+        let re = Regex::new(r"\{([^}]*)\}").unwrap();
+        if !re.is_match(format) {
+            return Err(SinkError::Redis(
+                "Can't find {} in key_format or value_format".to_string(),
+            ));
+        }
+        for capture in re.captures_iter(format) {
+            if let Some(inner_content) = capture.get(1) && !set.contains(inner_content.as_str()){
+                return Err(SinkError::Redis(format!("Can't find field({:?}) in key_format or value_format",inner_content.as_str())))
+            }
+        }
+        Ok(())
+    }
 }
 
 impl RowEncoder for TemplateEncoder {
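To make the template validation above concrete, here is a standalone sketch of the same rule (a hypothetical `check` helper mirroring `TemplateEncoder::check_string_format`; only the regex and the error conditions come from the diff):

```rust
use std::collections::HashSet;

use regex::Regex;

// Every `{...}` group in the template must name an existing column, and at
// least one such group must be present.
fn check(format: &str, columns: &HashSet<String>) -> Result<(), String> {
    let re = Regex::new(r"\{([^}]*)\}").unwrap();
    if !re.is_match(format) {
        return Err("Can't find {} in key_format or value_format".into());
    }
    for capture in re.captures_iter(format) {
        if let Some(inner) = capture.get(1) {
            if !columns.contains(inner.as_str()) {
                return Err(format!("Can't find field({:?})", inner.as_str()));
            }
        }
    }
    Ok(())
}

fn main() {
    let columns: HashSet<String> = ["user_id".to_string()].into();
    assert!(check("user_id:{user_id}", &columns).is_ok());
    assert!(check("no placeholder", &columns).is_err()); // missing {}
    assert!(check("{not_a_column}", &columns).is_err()); // unknown field
    assert!(check("{{user_id}}", &columns).is_err()); // nested braces rejected
}
```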
+ "Cannot find 'key_format',please set it or use JSON" + )) + })?; + let value_format = + format_desc.options.get(VALUE_FORMAT).ok_or_else(|| { + SinkError::Config(anyhow!( + "Cannot find 'redis_value_format',please set it or use JSON" + )) + })?; + let key_encoder = TemplateEncoder::new( + schema.clone(), + Some(pk_indices), + key_format.clone(), + ); + let val_encoder = TemplateEncoder::new(schema, None, value_format.clone()); + Ok(SinkFormatterImpl::AppendOnlyTemplate( + AppendOnlyFormatter::new(Some(key_encoder), val_encoder), + )) + } } } SinkFormat::Debezium => { @@ -131,85 +154,66 @@ impl SinkFormatterImpl { ))) } SinkFormat::Upsert => { - if format_desc.encode != SinkEncode::Json { - return err_unsupported(); - } + match format_desc.encode { + SinkEncode::Json => { + let mut key_encoder = JsonEncoder::new( + schema.clone(), + Some(pk_indices), + TimestampHandlingMode::Milli, + ); + let mut val_encoder = + JsonEncoder::new(schema, None, TimestampHandlingMode::Milli); - let mut key_encoder = JsonEncoder::new( - schema.clone(), - Some(pk_indices), - TimestampHandlingMode::Milli, - ); - let mut val_encoder = JsonEncoder::new(schema, None, TimestampHandlingMode::Milli); - - if let Some(s) = format_desc.options.get("schemas.enable") { - match s.to_lowercase().parse::() { - Ok(true) => { - let kafka_connect = KafkaConnectParams { - schema_name: format!("{}.{}", db_name, sink_from_name), - }; - key_encoder = key_encoder.with_kafka_connect(kafka_connect.clone()); - val_encoder = val_encoder.with_kafka_connect(kafka_connect); - } - Ok(false) => {} - _ => { - return Err(SinkError::Config(anyhow!( - "schemas.enable is expected to be `true` or `false`, got {}", - s - ))); - } + if let Some(s) = format_desc.options.get("schemas.enable") { + match s.to_lowercase().parse::() { + Ok(true) => { + let kafka_connect = KafkaConnectParams { + schema_name: format!("{}.{}", db_name, sink_from_name), + }; + key_encoder = + key_encoder.with_kafka_connect(kafka_connect.clone()); + val_encoder = val_encoder.with_kafka_connect(kafka_connect); + } + Ok(false) => {} + _ => { + return Err(SinkError::Config(anyhow!( + "schemas.enable is expected to be `true` or `false`, got {}", + s + ))); + } + } + }; + + // Initialize the upsert_stream + let formatter = UpsertFormatter::new(key_encoder, val_encoder); + Ok(SinkFormatterImpl::UpsertJson(formatter)) } - }; - - // Initialize the upsert_stream - let formatter = UpsertFormatter::new(key_encoder, val_encoder); - Ok(SinkFormatterImpl::UpsertJson(formatter)) - } - } - } - - pub fn new_with_redis( - schema: Schema, - pk_indices: Vec, - is_append_only: bool, - key_format: Option, - value_format: Option, - ) -> Result { - match (key_format, value_format) { - (Some(k), Some(v)) => { - let key_encoder = TemplateEncoder::new( - schema.clone(), - Some(pk_indices), - k, - ); - let val_encoder = - TemplateEncoder::new(schema, None, v); - if is_append_only { - Ok(SinkFormatterImpl::AppendOnlyTemplate(AppendOnlyFormatter::new(Some(key_encoder), val_encoder))) - } else { - Ok(SinkFormatterImpl::UpsertTemplate(UpsertFormatter::new(key_encoder, val_encoder))) - } - } - (None, None) => { - let key_encoder = JsonEncoder::new( - schema.clone(), - Some(pk_indices), - TimestampHandlingMode::Milli, - ); - let val_encoder = JsonEncoder::new( - schema, - None, - TimestampHandlingMode::Milli, - ); - if is_append_only { - Ok(SinkFormatterImpl::AppendOnlyJson(AppendOnlyFormatter::new(Some(key_encoder), val_encoder))) - } else { - Ok(SinkFormatterImpl::UpsertJson(UpsertFormatter::new(key_encoder, 
val_encoder))) + SinkEncode::Template => { + let key_format = format_desc.options.get(KEY_FORMAT).ok_or_else(|| { + SinkError::Config(anyhow!( + "Cannot find 'key_format',please set it or use JSON" + )) + })?; + let value_format = + format_desc.options.get(VALUE_FORMAT).ok_or_else(|| { + SinkError::Config(anyhow!( + "Cannot find 'redis_value_format',please set it or use JSON" + )) + })?; + let key_encoder = TemplateEncoder::new( + schema.clone(), + Some(pk_indices), + key_format.clone(), + ); + let val_encoder = TemplateEncoder::new(schema, None, value_format.clone()); + Ok(SinkFormatterImpl::UpsertTemplate(UpsertFormatter::new( + key_encoder, + val_encoder, + ))) + } + _ => err_unsupported(), } } - _ => { - Err(SinkError::Encode("Please provide template formats for both key and value, or choose the JSON format.".to_string())) - } } } } diff --git a/src/connector/src/sink/formatter/upsert.rs b/src/connector/src/sink/formatter/upsert.rs index 6ef2b5f2ca333..af8e70ff92850 100644 --- a/src/connector/src/sink/formatter/upsert.rs +++ b/src/connector/src/sink/formatter/upsert.rs @@ -40,7 +40,7 @@ impl SinkFormatter for UpsertFormatter { &self, chunk: &StreamChunk, ) -> impl Iterator, Option)>> { - std::iter::from_generator(|| { + std::iter::from_coroutine(|| { for (op, row) in chunk.rows() { let event_key_object = Some(tri!(self.key_encoder.encode(row))); diff --git a/src/connector/src/sink/redis.rs b/src/connector/src/sink/redis.rs index cc8ff74d0c9c5..6120075a049df 100644 --- a/src/connector/src/sink/redis.rs +++ b/src/connector/src/sink/redis.rs @@ -18,29 +18,27 @@ use anyhow::anyhow; use async_trait::async_trait; use redis::aio::Connection; use redis::{Client as RedisClient, Pipeline}; -use regex::Regex; use risingwave_common::array::StreamChunk; use risingwave_common::catalog::Schema; use serde_derive::{Deserialize, Serialize}; use serde_with::serde_as; +use super::catalog::SinkFormatDesc; +use super::encoder::template::TemplateEncoder; use super::formatter::SinkFormatterImpl; use super::writer::FormattedSink; -use super::{SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT}; +use super::{SinkError, SinkParam}; use crate::dispatch_sink_formatter_impl; use crate::sink::writer::{LogSinkerOf, SinkWriterExt}; use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriter, SinkWriterParam}; pub const REDIS_SINK: &str = "redis"; - +pub const KEY_FORMAT: &str = "key_format"; +pub const VALUE_FORMAT: &str = "value_format"; #[derive(Deserialize, Serialize, Debug, Clone)] pub struct RedisCommon { #[serde(rename = "redis.url")] pub url: String, - #[serde(rename = "redis.keyformat")] - pub key_format: Option, - #[serde(rename = "redis.valueformat")] - pub value_format: Option, } impl RedisCommon { @@ -54,23 +52,13 @@ impl RedisCommon { pub struct RedisConfig { #[serde(flatten)] pub common: RedisCommon, - - pub r#type: String, // accept "append-only" or "upsert" } impl RedisConfig { pub fn from_hashmap(properties: HashMap) -> Result { let config = serde_json::from_value::(serde_json::to_value(properties).unwrap()) - .map_err(|e| SinkError::Config(anyhow!(e)))?; - if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT { - return Err(SinkError::Config(anyhow!( - "`{}` must be {}, or {}", - SINK_TYPE_OPTION, - SINK_TYPE_APPEND_ONLY, - SINK_TYPE_UPSERT - ))); - } + .map_err(|e| SinkError::Config(anyhow!("{:?}", e)))?; Ok(config) } } @@ -79,28 +67,10 @@ impl RedisConfig { pub struct RedisSink { config: RedisConfig, schema: Schema, - is_append_only: 
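The `from_generator` → `from_coroutine` edits in the three formatters above follow another nightly rename: the `generators`/`iter_from_generator` features became `coroutines`/`iter_from_coroutine`. A minimal sketch on the nightly toolchain this PR pins (illustrative only; newer nightlies additionally require a `#[coroutine]` marker on the closure):

```rust
#![feature(coroutines)]
#![feature(iter_from_coroutine)]

fn main() {
    // Lazily yield values from a coroutine, the same pattern the sink
    // formatters use to produce their key/value pairs.
    let evens: Vec<i32> = std::iter::from_coroutine(|| {
        for i in 0..5 {
            yield i * 2;
        }
    })
    .collect();
    assert_eq!(evens, vec![0, 2, 4, 6, 8]);
}
```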
bool, pk_indices: Vec, -} - -fn check_string_format(format: &Option, set: &HashSet) -> Result<()> { - if let Some(format) = format { - // We will check if the string inside {} corresponds to a column name in rw. - // In other words, the content within {} should exclusively consist of column names from rw, - // which means '{{column_name}}' or '{{column_name1},{column_name2}}' would be incorrect. - let re = Regex::new(r"\{([^}]*)\}").unwrap(); - if !re.is_match(format) { - return Err(SinkError::Redis( - "Can't find {} in key_format or value_format".to_string(), - )); - } - for capture in re.captures_iter(format) { - if let Some(inner_content) = capture.get(1) && !set.contains(inner_content.as_str()){ - return Err(SinkError::Redis(format!("Can't find field({:?}) in key_format or value_format",inner_content.as_str()))) - } - } - } - Ok(()) + format_desc: SinkFormatDesc, + db_name: String, + sink_from_name: String, } #[async_trait] @@ -117,8 +87,12 @@ impl TryFrom for RedisSink { Ok(Self { config, schema: param.schema(), - is_append_only: param.sink_type.is_append_only(), pk_indices: param.downstream_pk, + format_desc: param + .format_desc + .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?, + db_name: param.db_name, + sink_from_name: param.sink_from_name, }) } } @@ -134,7 +108,9 @@ impl Sink for RedisSink { self.config.clone(), self.schema.clone(), self.pk_indices.clone(), - self.is_append_only, + &self.format_desc, + self.db_name.clone(), + self.sink_from_name.clone(), ) .await? .into_log_sinker(writer_param.sink_metrics)) @@ -157,8 +133,23 @@ impl Sink for RedisSink { .filter(|(k, _)| self.pk_indices.contains(k)) .map(|(_, v)| v.name.clone()) .collect(); - check_string_format(&self.config.common.key_format, &pk_set)?; - check_string_format(&self.config.common.value_format, &all_set)?; + if matches!( + self.format_desc.encode, + super::catalog::SinkEncode::Template + ) { + let key_format = self.format_desc.options.get(KEY_FORMAT).ok_or_else(|| { + SinkError::Config(anyhow!( + "Cannot find 'key_format', please set it or use JSON" + )) + })?; + let value_format = self.format_desc.options.get(VALUE_FORMAT).ok_or_else(|| { + SinkError::Config(anyhow!( + "Cannot find 'value_format', please set it or use JSON" + )) + })?; + TemplateEncoder::check_string_format(key_format, &pk_set)?; + TemplateEncoder::check_string_format(value_format, &all_set)?; + } Ok(()) } } @@ -166,7 +157,6 @@ impl Sink for RedisSink { pub struct RedisSinkWriter { epoch: u64, schema: Schema, - is_append_only: bool, pk_indices: Vec, formatter: SinkFormatterImpl, payload_writer: RedisSinkPayloadWriter, @@ -220,21 +210,23 @@ impl RedisSinkWriter { config: RedisConfig, schema: Schema, pk_indices: Vec, - is_append_only: bool, + format_desc: &SinkFormatDesc, + db_name: String, + sink_from_name: String, ) -> Result { let payload_writer = RedisSinkPayloadWriter::new(config.clone()).await?; - let formatter = SinkFormatterImpl::new_with_redis( + let formatter = SinkFormatterImpl::new( + format_desc, schema.clone(), pk_indices.clone(), - is_append_only, - config.common.key_format, - config.common.value_format, - )?; + db_name, + sink_from_name, + ) + .await?; Ok(Self { schema, pk_indices, - is_append_only, epoch: 0, formatter, payload_writer, @@ -242,24 +234,22 @@ impl RedisSinkWriter { } #[cfg(test)] - pub fn mock( + pub async fn mock( schema: Schema, pk_indices: Vec, - is_append_only: bool, - key_format: Option, - value_format: Option, + format_desc: &SinkFormatDesc, ) -> Result { - let formatter =
SinkFormatterImpl::new_with_redis( + let formatter = SinkFormatterImpl::new( + format_desc, schema.clone(), pk_indices.clone(), - is_append_only, - key_format, - value_format, - )?; + "d1".to_string(), + "t1".to_string(), + ) + .await?; Ok(Self { schema, pk_indices, - is_append_only, epoch: 0, formatter, payload_writer: RedisSinkPayloadWriter::mock(), @@ -290,6 +280,8 @@ impl SinkWriter for RedisSinkWriter { #[cfg(test)] mod test { + use std::collections::BTreeMap; + use rdkafka::message::FromBytes; use risingwave_common::array::{Array, I32Array, Op, StreamChunk, Utf8Array}; use risingwave_common::catalog::{Field, Schema}; @@ -297,6 +289,7 @@ mod test { use risingwave_common::util::iter_util::ZipEqDebug; use super::*; + use crate::sink::catalog::{SinkEncode, SinkFormat}; #[tokio::test] async fn test_write() { @@ -315,8 +308,15 @@ mod test { }, ]); - let mut redis_sink_writer = - RedisSinkWriter::mock(schema, vec![0], true, None, None).unwrap(); + let format_desc = SinkFormatDesc { + format: SinkFormat::AppendOnly, + encode: SinkEncode::Json, + options: BTreeMap::default(), + }; + + let mut redis_sink_writer = RedisSinkWriter::mock(schema, vec![0], &format_desc) + .await + .unwrap(); let chunk_a = StreamChunk::new( vec![Op::Insert, Op::Insert, Op::Insert], @@ -367,14 +367,21 @@ mod test { }, ]); - let mut redis_sink_writer = RedisSinkWriter::mock( - schema, - vec![0], - true, - Some("key-{id}".to_string()), - Some("values:{id:{id},name:{name}}".to_string()), - ) - .unwrap(); + let mut btree_map = BTreeMap::default(); + btree_map.insert(KEY_FORMAT.to_string(), "key-{id}".to_string()); + btree_map.insert( + VALUE_FORMAT.to_string(), + "values:{id:{id},name:{name}}".to_string(), + ); + let format_desc = SinkFormatDesc { + format: SinkFormat::AppendOnly, + encode: SinkEncode::Template, + options: btree_map, + }; + + let mut redis_sink_writer = RedisSinkWriter::mock(schema, vec![0], &format_desc) + .await + .unwrap(); let chunk_a = StreamChunk::new( vec![Op::Insert, Op::Insert, Op::Insert], diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index ad182e734a33a..310213262b2ad 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -13,17 +13,23 @@ // limitations under the License. 
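The placeholder validation that redis.rs now delegates to `TemplateEncoder::check_string_format` deserves a closer look before the remote sink changes below. A minimal, self-contained sketch of that check, adapted from the `check_string_format` helper deleted above (only the `regex` crate is assumed; a plain `String` stands in for `SinkError`):

```rust
use std::collections::HashSet;

use regex::Regex;

fn check_string_format(format: &str, columns: &HashSet<&str>) -> Result<(), String> {
    // Each `{...}` group must name a known column, and at least one
    // placeholder must be present.
    let re = Regex::new(r"\{([^}]*)\}").unwrap();
    if !re.is_match(format) {
        return Err("Can't find {} in key_format or value_format".into());
    }
    for capture in re.captures_iter(format) {
        if let Some(inner) = capture.get(1) {
            if !columns.contains(inner.as_str()) {
                return Err(format!(
                    "Can't find field({:?}) in key_format or value_format",
                    inner.as_str()
                ));
            }
        }
    }
    Ok(())
}

fn main() {
    let columns: HashSet<&str> = ["id", "name"].into_iter().collect();
    assert!(check_string_format("key-{id}", &columns).is_ok());
    assert!(check_string_format("{id}:{name}", &columns).is_ok());
    assert!(check_string_format("key-{user_id}", &columns).is_err());
    assert!(check_string_format("no placeholder at all", &columns).is_err());
}
```

As the deleted comment notes, the content inside `{...}` must consist solely of column names, so templates without any placeholder are rejected outright and nested braces such as `{{id}}` fail the column lookup.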
use std::collections::HashMap; +use std::fmt::Formatter; +use std::future::Future; use std::marker::PhantomData; use std::ops::Deref; +use std::time::Instant; use anyhow::anyhow; use async_trait::async_trait; +use futures::stream::Peekable; +use futures::{StreamExt, TryFutureExt, TryStreamExt}; use itertools::Itertools; use jni::objects::{JByteArray, JValue, JValueOwned}; use prost::Message; use risingwave_common::array::StreamChunk; use risingwave_common::error::anyhow_error; use risingwave_common::types::DataType; +use risingwave_common::util::await_future_with_monitor_error_stream; use risingwave_jni_core::jvm_runtime::JVM; use risingwave_pb::connector_service::sink_coordinator_stream_request::{ CommitMetadata, StartCoordinator, @@ -43,15 +49,17 @@ use risingwave_pb::connector_service::{ }; use tokio::sync::mpsc; use tokio::sync::mpsc::{Receiver, Sender}; +use tokio_stream::wrappers::ReceiverStream; use tracing::warn; use super::encoder::{JsonEncoder, RowEncoder}; use crate::sink::coordinate::CoordinatedSinkWriter; use crate::sink::encoder::TimestampHandlingMode; +use crate::sink::log_store::{LogReader, LogStoreReadItem, TruncateOffset}; use crate::sink::writer::{LogSinkerOf, SinkWriter, SinkWriterExt}; use crate::sink::{ - DummySinkCommitCoordinator, Result, Sink, SinkCommitCoordinator, SinkError, SinkMetrics, - SinkParam, SinkWriterParam, + DummySinkCommitCoordinator, LogSinker, Result, Sink, SinkCommitCoordinator, SinkError, + SinkMetrics, SinkParam, SinkWriterParam, }; use crate::ConnectorParams; @@ -101,18 +109,12 @@ impl TryFrom for RemoteSink { impl Sink for RemoteSink { type Coordinator = DummySinkCommitCoordinator; - type LogSinker = LogSinkerOf>; + type LogSinker = RemoteLogSinker; const SINK_NAME: &'static str = R::SINK_NAME; async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { - Ok(RemoteSinkWriter::new( - self.param.clone(), - writer_param.connector_params, - writer_param.sink_metrics.clone(), - ) - .await? - .into_log_sinker(writer_param.sink_metrics)) + RemoteLogSinker::new(self.param.clone(), writer_param).await } async fn validate(&self) -> Result<()> { @@ -192,6 +194,139 @@ impl Sink for RemoteSink { } } +pub struct RemoteLogSinker { + writer: RemoteSinkWriter, + sink_metrics: SinkMetrics, +} + +impl RemoteLogSinker { + async fn new(sink_param: SinkParam, writer_param: SinkWriterParam) -> Result { + let writer = RemoteSinkWriter::new( + sink_param, + writer_param.connector_params, + writer_param.sink_metrics.clone(), + ) + .await?; + let sink_metrics = writer_param.sink_metrics; + Ok(RemoteLogSinker { + writer, + sink_metrics, + }) + } +} + +/// Await the given future while monitoring on error of the receiver stream. +async fn await_future_with_monitor_receiver_err>>( + receiver: &mut SinkWriterStreamJniReceiver, + future: F, +) -> Result { + match await_future_with_monitor_error_stream(&mut receiver.response_stream, future).await { + Ok(result) => result, + Err(None) => Err(SinkError::Remote(anyhow!("end of remote receiver stream"))), + Err(Some(err)) => Err(SinkError::Internal(err)), + } +} + +impl LogSinker for RemoteLogSinker { + async fn consume_log_and_sink(self, mut log_reader: impl LogReader) -> Result<()> { + // Note: this is a total copy of the implementation of LogSinkerOf, + // except that we monitor the future of `log_reader.next_item` with await_future_with_monitor_receiver_err + // to monitor the error in the response stream. 
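The helper named in this comment is the crux of the refactor: drive one future while watching the JNI response stream, so an error reported by the Java side surfaces immediately instead of on the next receive. A hedged, self-contained sketch of the pattern follows; it is not the actual `risingwave_common::util::await_future_with_monitor_error_stream`, it merely mirrors the `select`-based code this change deletes from rpc_client further below, and a real implementation would peek rather than discard non-error items.

```rust
// Sketch only: drive `future`, but bail out early if the monitored
// response stream yields an error or ends. Assumes the `futures` and
// `tokio` crates; the names here are illustrative, not RisingWave's.
use std::future::Future;
use std::pin::pin;

use futures::future::{select, Either};
use futures::{Stream, StreamExt};

async fn await_with_monitor<T, E, F: Future>(
    stream: &mut (impl Stream<Item = Result<T, E>> + Unpin),
    future: F,
) -> Result<F::Output, Option<E>> {
    match select(pin!(future), pin!(stream.next())).await {
        // The future won the race: hand back its output.
        Either::Left((output, _)) => Ok(output),
        // The stream ended: report end-of-stream as `Err(None)`.
        Either::Right((None, _)) => Err(None),
        // The stream produced an error: surface it immediately.
        Either::Right((Some(Err(e)), _)) => Err(Some(e)),
        // A normal item arrived first; a real implementation would peek
        // (not consume) it, but for the sketch we just keep waiting.
        Either::Right((Some(Ok(_)), fut)) => Ok(fut.await),
    }
}

#[tokio::main]
async fn main() {
    // The stream never yields here, so the future's value wins the race.
    let mut responses = futures::stream::pending::<Result<(), String>>();
    assert_eq!(await_with_monitor(&mut responses, async { 42 }).await, Ok(42));
}
```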
+ + let mut sink_writer = self.writer; + let sink_metrics = self.sink_metrics; + #[derive(Debug)] + enum LogConsumerState { + /// Mark that the log consumer is not initialized yet + Uninitialized, + + /// Mark that a new epoch has begun. + EpochBegun { curr_epoch: u64 }, + + /// Mark that the consumer has just received a barrier + BarrierReceived { prev_epoch: u64 }, + } + + let mut state = LogConsumerState::Uninitialized; + + log_reader.init().await?; + + loop { + let (epoch, item): (u64, LogStoreReadItem) = await_future_with_monitor_receiver_err( + &mut sink_writer.stream_handle.response_rx, + log_reader.next_item().map_err(SinkError::Internal), + ) + .await?; + if let LogStoreReadItem::UpdateVnodeBitmap(_) = &item { + match &state { + LogConsumerState::BarrierReceived { .. } => {} + _ => unreachable!( + "update vnode bitmap can be accepted only right after \ + barrier, but current state is {:?}", + state + ), + } + } + // begin_epoch when not previously begun + state = match state { + LogConsumerState::Uninitialized => { + sink_writer.begin_epoch(epoch).await?; + LogConsumerState::EpochBegun { curr_epoch: epoch } + } + LogConsumerState::EpochBegun { curr_epoch } => { + assert!( + epoch >= curr_epoch, + "new epoch {} should not be below the current epoch {}", + epoch, + curr_epoch + ); + LogConsumerState::EpochBegun { curr_epoch: epoch } + } + LogConsumerState::BarrierReceived { prev_epoch } => { + assert!( + epoch > prev_epoch, + "new epoch {} should be greater than prev epoch {}", + epoch, + prev_epoch + ); + sink_writer.begin_epoch(epoch).await?; + LogConsumerState::EpochBegun { curr_epoch: epoch } + } + }; + match item { + LogStoreReadItem::StreamChunk { chunk, .. } => { + if let Err(e) = sink_writer.write_batch(chunk).await { + sink_writer.abort().await?; + return Err(e); + } + } + LogStoreReadItem::Barrier { is_checkpoint } => { + let prev_epoch = match state { + LogConsumerState::EpochBegun { curr_epoch } => curr_epoch, + _ => unreachable!("epoch must have begun before handling barrier"), + }; + if is_checkpoint { + let start_time = Instant::now(); + sink_writer.barrier(true).await?; + sink_metrics + .sink_commit_duration_metrics + .observe(start_time.elapsed().as_millis() as f64); + log_reader + .truncate(TruncateOffset::Barrier { epoch }) + .await?; + } else { + sink_writer.barrier(false).await?; + } + state = LogConsumerState::BarrierReceived { prev_epoch } + } + LogStoreReadItem::UpdateVnodeBitmap(vnode_bitmap) => { + sink_writer.update_vnode_bitmap(vnode_bitmap).await?; + } + } + } + } +} + #[derive(Debug)] pub struct CoordinatedRemoteSink(pub RemoteSink); @@ -286,14 +421,11 @@ impl SinkCoordinatorStreamJniHandle { } } -const DEFAULT_CHANNEL_SIZE: usize = 16; -#[derive(Debug)] -pub struct SinkWriterStreamJniHandle { +struct SinkWriterStreamJniSender { request_tx: Sender, - response_rx: Receiver, } -impl SinkWriterStreamJniHandle { +impl SinkWriterStreamJniSender { pub async fn start_epoch(&mut self, epoch: u64) -> Result<()> { self.request_tx .send(SinkWriterStreamRequest { @@ -316,33 +448,29 @@ impl SinkWriterStreamJniHandle { .map_err(|err| SinkError::Internal(err.into())) } - pub async fn barrier(&mut self, epoch: u64) -> Result<()> { + pub async fn barrier(&mut self, epoch: u64, is_checkpoint: bool) -> Result<()> { self.request_tx .send(SinkWriterStreamRequest { request: Some(SinkRequest::Barrier(Barrier { epoch, - is_checkpoint: false, + is_checkpoint, })), }) .await .map_err(|err| SinkError::Internal(err.into())) } +} - pub async fn commit(&mut self, epoch: u64) ->
Result { - self.request_tx - .send(SinkWriterStreamRequest { - request: Some(SinkRequest::Barrier(Barrier { - epoch, - is_checkpoint: true, - })), - }) - .await - .map_err(|err| SinkError::Internal(err.into()))?; +struct SinkWriterStreamJniReceiver { + response_stream: Peekable>>, +} - match self.response_rx.recv().await { - Some(SinkWriterStreamResponse { +impl SinkWriterStreamJniReceiver { + async fn next_commit_response(&mut self) -> Result { + match self.response_stream.try_next().await { + Ok(Some(SinkWriterStreamResponse { response: Some(sink_writer_stream_response::Response::Commit(rsp)), - }) => Ok(rsp), + })) => Ok(rsp), msg => Err(SinkError::Internal(anyhow!( "should get Sync response but get {:?}", msg @@ -351,6 +479,53 @@ impl SinkWriterStreamJniHandle { } } +const DEFAULT_CHANNEL_SIZE: usize = 16; +struct SinkWriterStreamJniHandle { + request_tx: SinkWriterStreamJniSender, + response_rx: SinkWriterStreamJniReceiver, +} + +impl std::fmt::Debug for SinkWriterStreamJniHandle { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SinkWriterStreamJniHandle").finish() + } +} + +impl SinkWriterStreamJniHandle { + async fn start_epoch(&mut self, epoch: u64) -> Result<()> { + await_future_with_monitor_receiver_err( + &mut self.response_rx, + self.request_tx.start_epoch(epoch), + ) + .await + } + + async fn write_batch(&mut self, epoch: u64, batch_id: u64, payload: Payload) -> Result<()> { + await_future_with_monitor_receiver_err( + &mut self.response_rx, + self.request_tx.write_batch(epoch, batch_id, payload), + ) + .await + } + + async fn barrier(&mut self, epoch: u64) -> Result<()> { + await_future_with_monitor_receiver_err( + &mut self.response_rx, + self.request_tx.barrier(epoch, false), + ) + .await + } + + async fn commit(&mut self, epoch: u64) -> Result { + await_future_with_monitor_receiver_err( + &mut self.response_rx, + self.request_tx.barrier(epoch, true), + ) + .await?; + self.response_rx.next_commit_response().await + } +} + pub type RemoteSinkWriter = RemoteSinkWriterInner<(), R>; pub type CoordinatedRemoteSinkWriter = RemoteSinkWriterInner, R>; @@ -374,10 +549,7 @@ impl RemoteSinkWriterInner { let (request_tx, request_rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE); let (response_tx, response_rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE); - let mut stream_handle = SinkWriterStreamJniHandle { - request_tx, - response_rx, - }; + let mut response_stream = ReceiverStream::new(response_rx).peekable(); std::thread::spawn(move || { let mut env = JVM.get_or_init().unwrap().attach_current_thread().unwrap(); @@ -388,7 +560,10 @@ impl RemoteSinkWriterInner { "(JJ)V", &[ JValue::from(&request_rx as *const Receiver as i64), - JValue::from(&response_tx as *const Sender as i64), + JValue::from( + &response_tx as *const Sender> + as i64, + ), ], ); @@ -410,8 +585,7 @@ impl RemoteSinkWriterInner { }; // First request - stream_handle - .request_tx + request_tx .send(sink_writer_stream_request) .await .map_err(|err| { @@ -423,17 +597,18 @@ impl RemoteSinkWriterInner { })?; // First response - match stream_handle.response_rx.recv().await { - Some(SinkWriterStreamResponse { + match response_stream.try_next().await { + Ok(Some(SinkWriterStreamResponse { response: Some(sink_writer_stream_response::Response::Start(_)), - }) => {} - msg => { + })) => {} + Ok(msg) => { return Err(SinkError::Internal(anyhow!( "should get start response for connector `{}` but get {:?}", R::SINK_NAME, msg ))); } + Err(e) => return Err(SinkError::Internal(e)), }; tracing::trace!( @@ -444,6 +619,11 @@ 
impl RemoteSinkWriterInner { let schema = param.schema(); + let stream_handle = SinkWriterStreamJniHandle { + request_tx: SinkWriterStreamJniSender { request_tx }, + response_rx: SinkWriterStreamJniReceiver { response_stream }, + }; + Ok(Self { properties: param.properties, epoch: None, @@ -458,7 +638,7 @@ impl RemoteSinkWriterInner { #[cfg(test)] fn for_test( - response_receiver: Receiver, + response_receiver: Receiver>, request_sender: Sender, ) -> RemoteSinkWriter { use risingwave_common::catalog::{Field, Schema}; @@ -480,8 +660,12 @@ impl RemoteSinkWriterInner { ]); let stream_handle = SinkWriterStreamJniHandle { - request_tx: request_sender, - response_rx: response_receiver, + request_tx: SinkWriterStreamJniSender { + request_tx: request_sender, + }, + response_rx: SinkWriterStreamJniReceiver { + response_stream: ReceiverStream::new(response_receiver).peekable(), + }, }; RemoteSinkWriter { @@ -828,12 +1012,12 @@ mod test { // test commit response_sender - .send(SinkWriterStreamResponse { + .send(Ok(SinkWriterStreamResponse { response: Some(Response::Commit(CommitResponse { epoch: 2022, metadata: None, })), - }) + })) .await .expect("test failed: failed to sync epoch"); sink.barrier(true).await.unwrap(); diff --git a/src/expr/core/src/lib.rs b/src/expr/core/src/lib.rs index c2f46d5632274..b49c4ae161dfc 100644 --- a/src/expr/core/src/lib.rs +++ b/src/expr/core/src/lib.rs @@ -17,7 +17,7 @@ #![feature(lint_reasons)] #![feature(iterator_try_collect)] #![feature(lazy_cell)] -#![feature(generators)] +#![feature(coroutines)] #![feature(arc_unwrap_or_clone)] #![feature(never_type)] diff --git a/src/expr/impl/src/lib.rs b/src/expr/impl/src/lib.rs index a5906e4320282..6ea82d30ac5f1 100644 --- a/src/expr/impl/src/lib.rs +++ b/src/expr/impl/src/lib.rs @@ -28,7 +28,7 @@ #![feature(exclusive_range_pattern)] #![feature(lazy_cell)] #![feature(round_ties_even)] -#![feature(generators)] +#![feature(coroutines)] #![feature(test)] #![feature(arc_unwrap_or_clone)] diff --git a/src/frontend/src/binder/mod.rs b/src/frontend/src/binder/mod.rs index 974730cd16237..f1038f9bf5943 100644 --- a/src/frontend/src/binder/mod.rs +++ b/src/frontend/src/binder/mod.rs @@ -363,6 +363,13 @@ impl Binder { } } +/// The column name stored in [`BindContext`] for a column without an alias. +pub const UNNAMED_COLUMN: &str = "?column?"; +/// The table name stored in [`BindContext`] for a subquery without an alias. +const UNNAMED_SUBQUERY: &str = "?subquery?"; +/// The table name stored in [`BindContext`] for a column group. +const COLUMN_GROUP_PREFIX: &str = "?column_group_id?"; + #[cfg(test)] pub mod test_utils { use risingwave_common::types::DataType; @@ -380,10 +387,3 @@ pub mod test_utils { Binder::new_with_param_types(&SessionImpl::mock(), param_types) } } - -/// The column name stored in [`BindContext`] for a column without an alias. -pub const UNNAMED_COLUMN: &str = "?column?"; -/// The table name stored in [`BindContext`] for a subquery without an alias. -const UNNAMED_SUBQUERY: &str = "?subquery?"; -/// The table name stored in [`BindContext`] for a column group. 
-const COLUMN_GROUP_PREFIX: &str = "?column_group_id?"; diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index 32279dd4e70eb..ddb1d697b856d 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -244,6 +244,7 @@ fn bind_sink_format_desc(value: SinkSchema) -> Result { E::Json => SinkEncode::Json, E::Protobuf => SinkEncode::Protobuf, E::Avro => SinkEncode::Avro, + E::Template => SinkEncode::Template, e @ (E::Native | E::Csv | E::Bytes) => { return Err(ErrorCode::BindError(format!("sink encode unsupported: {e}")).into()) } @@ -262,6 +263,7 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock vec![Encode::Json], Format::Debezium => vec![Encode::Json], ), + RedisSink::SINK_NAME => hashmap!( + Format::Plain => vec![Encode::Json, Encode::Template], + Format::Upsert => vec![Encode::Json, Encode::Template], + ), )) }); pub fn validate_compatibility(connector: &str, format_desc: &SinkSchema) -> Result<()> { diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index 149f39bead330..174ed23e03ec5 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -73,6 +73,7 @@ mod show; mod transaction; pub mod util; pub mod variable; +mod wait; /// The [`PgResponseBuilder`] used by RisingWave. pub type RwPgResponseBuilder = PgResponseBuilder; @@ -419,6 +420,7 @@ pub async fn handle( } } Statement::Flush => flush::handle_flush(handler_args).await, + Statement::Wait => wait::handle_wait(handler_args).await, Statement::SetVariable { local: _, variable, diff --git a/src/frontend/src/handler/wait.rs b/src/frontend/src/handler/wait.rs new file mode 100644 index 0000000000000..83f2784ec8c17 --- /dev/null +++ b/src/frontend/src/handler/wait.rs @@ -0,0 +1,31 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
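The handler that follows is a thin wrapper over a meta RPC; the real waiting happens in `DdlController::wait`, added further down, which polls until no background MVs are still being created. Its retry loop reduces to this self-contained sketch (tokio is assumed, and the asynchronous catalog check is simplified to a closure):

```rust
// The polling shape of `DdlController::wait` (added later in this diff),
// as a standalone function: check once per second, give up after 30
// minutes. `done` stands in for the async catalog query
// `list_creating_background_mvs().is_empty()`.
use std::time::Duration;

use tokio::time::sleep;

async fn wait_until(mut done: impl FnMut() -> bool) {
    for _ in 0..30 * 60 {
        if done() {
            break;
        }
        sleep(Duration::from_secs(1)).await;
    }
}

#[tokio::main]
async fn main() {
    let mut polls = 0;
    wait_until(|| {
        polls += 1;
        polls > 3 // pretend the catalog drains after three polls
    })
    .await;
}
```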
+ +use pgwire::pg_response::{PgResponse, StatementType}; +use risingwave_common::error::Result; + +use super::RwPgResponse; +use crate::handler::HandlerArgs; +use crate::session::SessionImpl; + +pub(super) async fn handle_wait(handler_args: HandlerArgs) -> Result { + do_wait(&handler_args.session).await?; + Ok(PgResponse::empty_result(StatementType::WAIT)) +} + +pub(crate) async fn do_wait(session: &SessionImpl) -> Result<()> { + let client = session.env().meta_client(); + client.wait().await?; + Ok(()) +} diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index 0a036b8e96233..450f49b6394cf 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -15,7 +15,7 @@ #![allow(clippy::derive_partial_eq_without_eq)] #![feature(map_try_insert)] #![feature(negative_impls)] -#![feature(generators)] +#![feature(coroutines)] #![feature(proc_macro_hygiene, stmt_expr_attributes)] #![feature(trait_alias)] #![feature(extract_if)] @@ -32,7 +32,6 @@ #![feature(extend_one)] #![feature(type_alias_impl_trait)] #![feature(impl_trait_in_assoc_type)] -#![feature(async_fn_in_trait)] #![feature(result_flattening)] #![recursion_limit = "256"] diff --git a/src/frontend/src/meta_client.rs b/src/frontend/src/meta_client.rs index ae90c2e345f9f..d37c5dec127f1 100644 --- a/src/frontend/src/meta_client.rs +++ b/src/frontend/src/meta_client.rs @@ -43,6 +43,8 @@ pub trait FrontendMetaClient: Send + Sync { async fn flush(&self, checkpoint: bool) -> Result; + async fn wait(&self) -> Result<()>; + async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result>; async fn list_table_fragments( @@ -111,6 +113,10 @@ impl FrontendMetaClient for FrontendMetaClientImpl { self.0.flush(checkpoint).await } + async fn wait(&self) -> Result<()> { + self.0.wait().await + } + async fn cancel_creating_jobs(&self, infos: PbJobs) -> Result> { self.0.cancel_creating_jobs(infos).await } diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index 20eb252fc5053..cf915ae35713d 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -773,6 +773,10 @@ impl FrontendMetaClient for MockFrontendMetaClient { }) } + async fn wait(&self) -> RpcResult<()> { + Ok(()) + } + async fn cancel_creating_jobs(&self, _infos: PbJobs) -> RpcResult> { Ok(vec![]) } diff --git a/src/jni_core/Cargo.toml b/src/jni_core/Cargo.toml index 69c11a7f21e24..77cafd155000d 100644 --- a/src/jni_core/Cargo.toml +++ b/src/jni_core/Cargo.toml @@ -10,6 +10,7 @@ ignored = ["workspace-hack"] normal = ["workspace-hack"] [dependencies] +anyhow = "1" bytes = "1" cfg-or-panic = "0.2" futures = { version = "0.3", default-features = false, features = ["alloc"] } diff --git a/src/jni_core/src/lib.rs b/src/jni_core/src/lib.rs index 29bbf76929b45..4815cd7368370 100644 --- a/src/jni_core/src/lib.rs +++ b/src/jni_core/src/lib.rs @@ -902,14 +902,17 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_sendSinkWriterRe 'a, >( env: EnvParam<'a>, - channel: Pointer<'a, Sender>, + channel: Pointer<'a, Sender>>, msg: JByteArray<'a>, ) -> jboolean { execute_and_catch(env, move |env| { let sink_writer_stream_response: SinkWriterStreamResponse = Message::decode(to_guarded_slice(&msg, env)?.deref())?; - match channel.as_ref().blocking_send(sink_writer_stream_response) { + match channel + .as_ref() + .blocking_send(Ok(sink_writer_stream_response)) + { Ok(_) => Ok(JNI_TRUE), Err(e) => { tracing::info!("send error. 
{:?}", e); diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs index 55c7b27b0c80a..bf1bddad2070f 100644 --- a/src/meta/node/src/lib.rs +++ b/src/meta/node/src/lib.rs @@ -14,7 +14,7 @@ #![feature(lint_reasons)] #![feature(let_chains)] -#![cfg_attr(coverage, feature(no_coverage))] +#![cfg_attr(coverage, feature(coverage_attribute))] mod server; use std::time::Duration; diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs index 935d398aeacb0..061ff93589163 100644 --- a/src/meta/service/src/ddl_service.rs +++ b/src/meta/service/src/ddl_service.rs @@ -717,7 +717,7 @@ impl DdlService for DdlServiceImpl { })) } - #[cfg_attr(coverage, no_coverage)] + #[cfg_attr(coverage, coverage(off))] async fn get_tables( &self, request: Request, @@ -732,6 +732,11 @@ impl DdlService for DdlServiceImpl { } Ok(Response::new(GetTablesResponse { tables })) } + + async fn wait(&self, _request: Request) -> Result, Status> { + self.ddl_controller.wait().await; + Ok(Response::new(WaitResponse {})) + } } impl DdlServiceImpl { diff --git a/src/meta/service/src/heartbeat_service.rs b/src/meta/service/src/heartbeat_service.rs index 7c51b39346894..e31058ff2bdc5 100644 --- a/src/meta/service/src/heartbeat_service.rs +++ b/src/meta/service/src/heartbeat_service.rs @@ -32,7 +32,7 @@ impl HeartbeatServiceImpl { #[async_trait::async_trait] impl HeartbeatService for HeartbeatServiceImpl { - #[cfg_attr(coverage, no_coverage)] + #[cfg_attr(coverage, coverage(off))] async fn heartbeat( &self, request: Request, diff --git a/src/meta/service/src/lib.rs b/src/meta/service/src/lib.rs index 0d473a6ed031f..6c8cc11f8971c 100644 --- a/src/meta/service/src/lib.rs +++ b/src/meta/service/src/lib.rs @@ -16,7 +16,7 @@ #![feature(let_chains)] #![feature(lazy_cell)] #![feature(impl_trait_in_assoc_type)] -#![cfg_attr(coverage, feature(no_coverage))] +#![cfg_attr(coverage, feature(coverage_attribute))] use risingwave_meta::*; diff --git a/src/meta/service/src/meta_member_service.rs b/src/meta/service/src/meta_member_service.rs index 25c4c7ad4cc84..5753061176e8c 100644 --- a/src/meta/service/src/meta_member_service.rs +++ b/src/meta/service/src/meta_member_service.rs @@ -36,7 +36,7 @@ impl MetaMemberServiceImpl { #[async_trait::async_trait] impl MetaMemberService for MetaMemberServiceImpl { - #[cfg_attr(coverage, no_coverage)] + #[cfg_attr(coverage, coverage(off))] async fn members( &self, _request: Request, diff --git a/src/meta/service/src/notification_service.rs b/src/meta/service/src/notification_service.rs index bd247c1e18980..0fcc470a70e39 100644 --- a/src/meta/service/src/notification_service.rs +++ b/src/meta/service/src/notification_service.rs @@ -207,7 +207,7 @@ impl NotificationServiceImpl { impl NotificationService for NotificationServiceImpl { type SubscribeStream = UnboundedReceiverStream; - #[cfg_attr(coverage, no_coverage)] + #[cfg_attr(coverage, coverage(off))] async fn subscribe( &self, request: Request, diff --git a/src/meta/service/src/scale_service.rs b/src/meta/service/src/scale_service.rs index f231ea5f4955d..676180adc7581 100644 --- a/src/meta/service/src/scale_service.rs +++ b/src/meta/service/src/scale_service.rs @@ -59,7 +59,7 @@ impl ScaleServiceImpl { #[async_trait::async_trait] impl ScaleService for ScaleServiceImpl { - #[cfg_attr(coverage, no_coverage)] + #[cfg_attr(coverage, coverage(off))] async fn get_cluster_info( &self, _: Request, @@ -110,7 +110,7 @@ impl ScaleService for ScaleServiceImpl { })) } - #[cfg_attr(coverage, no_coverage)] + #[cfg_attr(coverage, 
coverage(off))] async fn reschedule( &self, request: Request, @@ -174,7 +174,7 @@ impl ScaleService for ScaleServiceImpl { })) } - #[cfg_attr(coverage, no_coverage)] + #[cfg_attr(coverage, coverage(off))] async fn get_reschedule_plan( &self, request: Request, diff --git a/src/meta/service/src/stream_service.rs b/src/meta/service/src/stream_service.rs index ef232d9b04ffd..92af1d4beb707 100644 --- a/src/meta/service/src/stream_service.rs +++ b/src/meta/service/src/stream_service.rs @@ -59,7 +59,7 @@ impl StreamServiceImpl { #[async_trait::async_trait] impl StreamManagerService for StreamServiceImpl { - #[cfg_attr(coverage, no_coverage)] + #[cfg_attr(coverage, coverage(off))] async fn flush(&self, request: Request) -> TonicResponse { self.env.idle_manager().record_activity(); let req = request.into_inner(); @@ -71,7 +71,7 @@ impl StreamManagerService for StreamServiceImpl { })) } - #[cfg_attr(coverage, no_coverage)] + #[cfg_attr(coverage, coverage(off))] async fn pause(&self, _: Request) -> Result, Status> { let i = self .barrier_scheduler @@ -83,7 +83,7 @@ impl StreamManagerService for StreamServiceImpl { })) } - #[cfg_attr(coverage, no_coverage)] + #[cfg_attr(coverage, coverage(off))] async fn resume(&self, _: Request) -> Result, Status> { let i = self .barrier_scheduler @@ -122,7 +122,7 @@ impl StreamManagerService for StreamServiceImpl { })) } - #[cfg_attr(coverage, no_coverage)] + #[cfg_attr(coverage, coverage(off))] async fn list_table_fragments( &self, request: Request, @@ -165,7 +165,7 @@ impl StreamManagerService for StreamServiceImpl { })) } - #[cfg_attr(coverage, no_coverage)] + #[cfg_attr(coverage, coverage(off))] async fn list_table_fragment_states( &self, _request: Request, @@ -186,7 +186,7 @@ impl StreamManagerService for StreamServiceImpl { })) } - #[cfg_attr(coverage, no_coverage)] + #[cfg_attr(coverage, coverage(off))] async fn list_fragment_distribution( &self, _request: Request, @@ -215,7 +215,7 @@ impl StreamManagerService for StreamServiceImpl { })) } - #[cfg_attr(coverage, no_coverage)] + #[cfg_attr(coverage, coverage(off))] async fn list_actor_states( &self, _request: Request, diff --git a/src/meta/service/src/user_service.rs b/src/meta/service/src/user_service.rs index 8c982521b112a..cb290766e6fd1 100644 --- a/src/meta/service/src/user_service.rs +++ b/src/meta/service/src/user_service.rs @@ -107,7 +107,7 @@ impl UserServiceImpl { #[async_trait::async_trait] impl UserService for UserServiceImpl { - #[cfg_attr(coverage, no_coverage)] + #[cfg_attr(coverage, coverage(off))] async fn create_user( &self, request: Request, @@ -128,7 +128,7 @@ impl UserService for UserServiceImpl { })) } - #[cfg_attr(coverage, no_coverage)] + #[cfg_attr(coverage, coverage(off))] async fn drop_user( &self, request: Request, @@ -142,7 +142,7 @@ impl UserService for UserServiceImpl { })) } - #[cfg_attr(coverage, no_coverage)] + #[cfg_attr(coverage, coverage(off))] async fn update_user( &self, request: Request, @@ -165,7 +165,7 @@ impl UserService for UserServiceImpl { })) } - #[cfg_attr(coverage, no_coverage)] + #[cfg_attr(coverage, coverage(off))] async fn grant_privilege( &self, request: Request, @@ -185,7 +185,7 @@ impl UserService for UserServiceImpl { })) } - #[cfg_attr(coverage, no_coverage)] + #[cfg_attr(coverage, coverage(off))] async fn revoke_privilege( &self, request: Request, diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 2b0c3e3db87dc..1b3a284e9ccc9 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ 
b/src/meta/src/hummock/manager/mod.rs @@ -1761,7 +1761,7 @@ impl HummockManager { } /// Get version deltas from meta store - #[cfg_attr(coverage, no_coverage)] + #[cfg_attr(coverage, coverage(off))] #[named] pub async fn list_version_deltas( &self, diff --git a/src/meta/src/lib.rs b/src/meta/src/lib.rs index afe66d27ad8e8..f549578f079c6 100644 --- a/src/meta/src/lib.rs +++ b/src/meta/src/lib.rs @@ -26,13 +26,12 @@ #![feature(error_generic_member_access)] #![feature(assert_matches)] #![feature(try_blocks)] -#![cfg_attr(coverage, feature(no_coverage))] +#![cfg_attr(coverage, feature(coverage_attribute))] #![feature(custom_test_frameworks)] #![test_runner(risingwave_test_runner::test_runner::run_failpont_tests)] #![feature(is_sorted)] #![feature(impl_trait_in_assoc_type)] #![feature(type_name_of_val)] -#![feature(async_fn_in_trait)] pub mod backup_restore; pub mod barrier; diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 04b9729c5a5b8..36615bd93b757 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -15,6 +15,7 @@ use std::cmp::Ordering; use std::num::NonZeroUsize; use std::sync::Arc; +use std::time::Duration; use itertools::Itertools; use risingwave_common::config::DefaultParallelism; @@ -29,6 +30,7 @@ use risingwave_pb::ddl_service::alter_relation_name_request::Relation; use risingwave_pb::ddl_service::DdlProgress; use risingwave_pb::stream_plan::StreamFragmentGraph as StreamFragmentGraphProto; use tokio::sync::Semaphore; +use tokio::time::sleep; use tracing::log::warn; use tracing::Instrument; @@ -1094,4 +1096,18 @@ impl DdlController { } } } + + pub async fn wait(&self) { + for _ in 0..30 * 60 { + if self + .catalog_manager + .list_creating_background_mvs() + .await + .is_empty() + { + break; + } + sleep(Duration::from_secs(1)).await; + } + } } diff --git a/src/prost/helpers/src/lib.rs b/src/prost/helpers/src/lib.rs index f4d1d1a45baa1..5796e14273fe9 100644 --- a/src/prost/helpers/src/lib.rs +++ b/src/prost/helpers/src/lib.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![cfg_attr(coverage, feature(no_coverage))] +#![cfg_attr(coverage, feature(coverage_attribute))] #![feature(iterator_try_collect)] use proc_macro::TokenStream; @@ -24,7 +24,7 @@ mod generate; /// This attribute will be placed before any pb types, including messages and enums. /// See `prost/helpers/README.md` for more details. -#[cfg_attr(coverage, no_coverage)] +#[cfg_attr(coverage, coverage(off))] #[proc_macro_derive(AnyPB)] pub fn any_pb(input: TokenStream) -> TokenStream { // Parse the string representation @@ -37,7 +37,7 @@ pub fn any_pb(input: TokenStream) -> TokenStream { } // Procedure macros can not be tested from the same crate. 
-#[cfg_attr(coverage, no_coverage)] +#[cfg_attr(coverage, coverage(off))] fn produce(ast: &DeriveInput) -> Result { let name = &ast.ident; diff --git a/src/rpc_client/src/lib.rs b/src/rpc_client/src/lib.rs index 3e744bb61608d..6afa67ef88efe 100644 --- a/src/rpc_client/src/lib.rs +++ b/src/rpc_client/src/lib.rs @@ -19,7 +19,7 @@ #![feature(result_option_inspect)] #![feature(type_alias_impl_trait)] #![feature(associated_type_defaults)] -#![feature(generators)] +#![feature(coroutines)] #![feature(iterator_try_collect)] #![feature(hash_extract_if)] #![feature(try_blocks)] @@ -30,12 +30,11 @@ use std::any::type_name; use std::fmt::{Debug, Formatter}; use std::future::Future; use std::iter::repeat; -use std::pin::pin; use std::sync::Arc; use anyhow::anyhow; use async_trait::async_trait; -use futures::future::{select, try_join_all, Either}; +use futures::future::try_join_all; use futures::stream::{BoxStream, Peekable}; use futures::{Stream, StreamExt}; use moka::future::Cache; @@ -58,13 +57,12 @@ mod sink_coordinate_client; mod stream_client; mod tracing; -use std::pin::Pin; - pub use compactor_client::{CompactorClient, GrpcCompactorProxyClient}; pub use compute_client::{ComputeClient, ComputeClientPool, ComputeClientPoolRef}; pub use connector_client::{ConnectorClient, SinkCoordinatorStreamHandle, SinkWriterStreamHandle}; pub use hummock_meta_client::{CompactionEventItem, HummockMetaClient}; pub use meta_client::{MetaClient, SinkCoordinationRpcClient}; +use risingwave_common::util::await_future_with_monitor_error_stream; pub use sink_coordinate_client::CoordinatorStreamHandle; pub use stream_client::{StreamClient, StreamClientPool, StreamClientPoolRef}; @@ -240,25 +238,16 @@ impl BidiStreamHandle { } pub async fn send_request(&mut self, request: REQ) -> Result<()> { - // Poll the response stream to early see the error - let send_request_result = match select( - pin!(self.request_sender.send(request)), - pin!(Pin::new(&mut self.response_stream).peek()), + match await_future_with_monitor_error_stream( + &mut self.response_stream, + self.request_sender.send(request), ) .await { - Either::Left((result, _)) => result, - Either::Right((response_result, send_future)) => match response_result { - None => { - return Err(anyhow!("end of response stream").into()); - } - Some(Err(e)) => { - return Err(e.clone().into()); - } - Some(Ok(_)) => send_future.await, - }, - }; - send_request_result - .map_err(|_| anyhow!("unable to send request {}", type_name::()).into()) + Ok(send_result) => send_result + .map_err(|_| anyhow!("unable to send request {}", type_name::()).into()), + Err(None) => Err(anyhow!("end of response stream").into()), + Err(Some(e)) => Err(e.into()), + } } } diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 827860d1af7b3..95b746ea33e6c 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -698,6 +698,12 @@ impl MetaClient { Ok(resp.snapshot.unwrap()) } + pub async fn wait(&self) -> Result<()> { + let request = WaitRequest {}; + self.inner.wait(request).await?; + Ok(()) + } + pub async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result> { let request = CancelCreatingJobsRequest { jobs: Some(jobs) }; let resp = self.inner.cancel_creating_jobs(request).await?; @@ -1719,6 +1725,7 @@ macro_rules! 
for_all_meta_rpc { ,{ ddl_client, list_connections, ListConnectionsRequest, ListConnectionsResponse } ,{ ddl_client, drop_connection, DropConnectionRequest, DropConnectionResponse } ,{ ddl_client, get_tables, GetTablesRequest, GetTablesResponse } + ,{ ddl_client, wait, WaitRequest, WaitResponse } ,{ hummock_client, unpin_version_before, UnpinVersionBeforeRequest, UnpinVersionBeforeResponse } ,{ hummock_client, get_current_version, GetCurrentVersionRequest, GetCurrentVersionResponse } ,{ hummock_client, replay_version_delta, ReplayVersionDeltaRequest, ReplayVersionDeltaResponse } diff --git a/src/source/src/lib.rs b/src/source/src/lib.rs index 1a32888cdf651..aaa045c607c95 100644 --- a/src/source/src/lib.rs +++ b/src/source/src/lib.rs @@ -16,7 +16,7 @@ #![feature(trait_alias)] #![feature(lint_reasons)] #![feature(result_option_inspect)] -#![feature(generators)] +#![feature(coroutines)] #![feature(hash_extract_if)] #![feature(type_alias_impl_trait)] #![feature(box_patterns)] diff --git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs index ecae5a9663a88..5d802bae99cdc 100644 --- a/src/sqlparser/src/ast/mod.rs +++ b/src/sqlparser/src/ast/mod.rs @@ -1294,6 +1294,9 @@ pub enum Statement { /// /// Note: RisingWave specific statement. Flush, + /// WAIT for ALL running stream jobs to finish. + /// It will block the current session until the condition is met. + Wait, } impl fmt::Display for Statement { @@ -1787,6 +1790,9 @@ impl fmt::Display for Statement { Statement::Flush => { write!(f, "FLUSH") } + Statement::Wait => { + write!(f, "WAIT") + } Statement::Begin { modes } => { write!(f, "BEGIN")?; if !modes.is_empty() { diff --git a/src/sqlparser/src/ast/statement.rs b/src/sqlparser/src/ast/statement.rs index 76de970a919a9..58fb2d50c6287 100644 --- a/src/sqlparser/src/ast/statement.rs +++ b/src/sqlparser/src/ast/statement.rs @@ -294,6 +294,7 @@ pub enum Encode { Json, // Keyword::JSON Bytes, // Keyword::BYTES Native, + Template, } // TODO: unify with `from_keyword` @@ -309,6 +310,7 @@ impl fmt::Display for Encode { Encode::Json => "JSON", Encode::Bytes => "BYTES", Encode::Native => "NATIVE", + Encode::Template => "TEMPLATE", } ) } @@ -322,13 +324,12 @@ impl Encode { "CSV" => Encode::Csv, "PROTOBUF" => Encode::Protobuf, "JSON" => Encode::Json, + "TEMPLATE" => Encode::Template, "NATIVE" => Encode::Native, // used internally for schema change - _ => { - return Err(ParserError::ParserError( - "expected AVRO | BYTES | CSV | PROTOBUF | JSON | NATIVE after Encode" - .to_string(), - )) - } + _ => return Err(ParserError::ParserError( + "expected AVRO | BYTES | CSV | PROTOBUF | JSON | NATIVE | TEMPLATE after Encode" + .to_string(), + )), }) } } diff --git a/src/sqlparser/src/keywords.rs b/src/sqlparser/src/keywords.rs index 5c2fedb0ea547..4188f06f76ae3 100644 --- a/src/sqlparser/src/keywords.rs +++ b/src/sqlparser/src/keywords.rs @@ -540,6 +540,7 @@ define_keywords!( VIEWS, VIRTUAL, VOLATILE, + WAIT, WATERMARK, WHEN, WHENEVER, diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index ee054f7d17031..5cc094a204268 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -259,6 +259,7 @@ impl Parser { Keyword::PREPARE => Ok(self.parse_prepare()?), Keyword::COMMENT => Ok(self.parse_comment()?), Keyword::FLUSH => Ok(Statement::Flush), + Keyword::WAIT => Ok(Statement::Wait), _ => self.expected( "an SQL statement", Token::Word(w).with_location(token.location), diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml index c6fc5531acd33..fc01eba294564 100644 ---
a/src/storage/Cargo.toml +++ b/src/storage/Cargo.toml @@ -25,7 +25,7 @@ dyn-clone = "1.0.14" either = "1" enum-as-inner = "0.6" fail = "0.5" -foyer = { git = "https://github.com/mrcroxx/foyer", rev = "5d0134b" } +foyer = { git = "https://github.com/MrCroxx/foyer", rev = "2261151" } futures = { version = "0.3", default-features = false, features = ["alloc"] } futures-async-stream = { workspace = true } hex = "0.4" diff --git a/src/storage/backup/src/lib.rs b/src/storage/backup/src/lib.rs index 3e0549db188a2..1daacbf691c0d 100644 --- a/src/storage/backup/src/lib.rs +++ b/src/storage/backup/src/lib.rs @@ -25,7 +25,7 @@ #![feature(lazy_cell)] #![feature(let_chains)] #![feature(error_generic_member_access)] -#![cfg_attr(coverage, feature(no_coverage))] +#![cfg_attr(coverage, feature(coverage_attribute))] pub mod error; pub mod meta_snapshot; diff --git a/src/storage/hummock_test/Cargo.toml b/src/storage/hummock_test/Cargo.toml index 600a5249ddf1b..8abf2f45e6855 100644 --- a/src/storage/hummock_test/Cargo.toml +++ b/src/storage/hummock_test/Cargo.toml @@ -20,7 +20,7 @@ bytes = { version = "1" } clap = { version = "4", features = ["derive"] } fail = "0.5" futures = { version = "0.3", default-features = false, features = ["alloc"] } -futures-async-stream = "0.2" +futures-async-stream = "0.2.9" itertools = "0.11" parking_lot = "0.12" rand = "0.8" @@ -47,7 +47,7 @@ futures = { version = "0.3", default-features = false, features = [ "executor", ] } -futures-async-stream = "0.2" +futures-async-stream = "0.2.9" risingwave_test_runner = { workspace = true } serial_test = "2.0" sync-point = { path = "../../utils/sync-point" } diff --git a/src/storage/hummock_test/src/bin/replay/main.rs b/src/storage/hummock_test/src/bin/replay/main.rs index 1e9c9591bc864..ae6038d8b5d16 100644 --- a/src/storage/hummock_test/src/bin/replay/main.rs +++ b/src/storage/hummock_test/src/bin/replay/main.rs @@ -13,7 +13,7 @@ // limitations under the License. 
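The `no_coverage` churn in the surrounding crates tracks a single nightly rename: `feature(no_coverage)` became `feature(coverage_attribute)`, and `#[no_coverage]` became `#[coverage(off)]`. A minimal program showing the renamed pieces together, under the assumption that the `coverage` cfg is set by the build (e.g. `RUSTFLAGS="--cfg coverage"` on a nightly toolchain):

```rust
// `#![feature(no_coverage)]` -> `#![feature(coverage_attribute)]`
// `#[no_coverage]`           -> `#[coverage(off)]`
#![cfg_attr(coverage, feature(coverage_attribute))]

// Excluded from coverage instrumentation only when `--cfg coverage` is set;
// on a plain build the attribute expands to nothing.
#[cfg_attr(coverage, coverage(off))]
fn excluded_from_coverage_instrumentation() {}

fn main() {
    excluded_from_coverage_instrumentation();
}
```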
#![feature(bound_map)] -#![feature(generators)] +#![feature(coroutines)] #![feature(stmt_expr_attributes)] #![feature(proc_macro_hygiene)] diff --git a/src/storage/hummock_test/src/lib.rs b/src/storage/hummock_test/src/lib.rs index 73e1d8cd0eaad..593771435f1e0 100644 --- a/src/storage/hummock_test/src/lib.rs +++ b/src/storage/hummock_test/src/lib.rs @@ -17,7 +17,6 @@ #![feature(bound_map)] #![feature(type_alias_impl_trait)] #![feature(associated_type_bounds)] -#![feature(return_position_impl_trait_in_trait)] #[cfg(test)] mod compactor_tests; diff --git a/src/storage/hummock_trace/Cargo.toml b/src/storage/hummock_trace/Cargo.toml index 46eabf17835e4..150b35b79cda0 100644 --- a/src/storage/hummock_trace/Cargo.toml +++ b/src/storage/hummock_trace/Cargo.toml @@ -14,7 +14,7 @@ bincode = { version = "=2.0.0-rc.3", features = ["serde"] } byteorder = "1" bytes = { version = "1", features = ["serde"] } futures = { version = "0.3", default-features = false, features = ["alloc"] } -futures-async-stream = "0.2" +futures-async-stream = "0.2.9" parking_lot = "0.12" prost = { workspace = true } risingwave_common = { workspace = true } diff --git a/src/storage/hummock_trace/src/lib.rs b/src/storage/hummock_trace/src/lib.rs index df757c58cc4fa..8c6c8913205ab 100644 --- a/src/storage/hummock_trace/src/lib.rs +++ b/src/storage/hummock_trace/src/lib.rs @@ -16,7 +16,7 @@ #![feature(cursor_remaining)] #![feature(bound_map)] #![feature(trait_alias)] -#![feature(generators)] +#![feature(coroutines)] mod collector; mod error; diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index 583bab3d10b3c..a21016014d247 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -140,9 +140,6 @@ impl CompactorRunner { Ok((self.split_index, ssts, compaction_stat)) } - // This is a clippy bug, see https://github.com/rust-lang/rust-clippy/issues/11380. - // TODO: remove `allow` here after the issued is closed. - #[expect(clippy::needless_pass_by_ref_mut)] pub async fn build_delete_range_iter( sstable_infos: &Vec, sstable_store: &SstableStoreRef, diff --git a/src/storage/src/hummock/compactor/mod.rs b/src/storage/src/hummock/compactor/mod.rs index d2f36167675e7..137682d6f7825 100644 --- a/src/storage/src/hummock/compactor/mod.rs +++ b/src/storage/src/hummock/compactor/mod.rs @@ -325,7 +325,7 @@ impl Compactor { /// The background compaction thread that receives compaction tasks from hummock compaction /// manager and runs compaction tasks. -#[cfg_attr(coverage, no_coverage)] +#[cfg_attr(coverage, coverage(off))] pub fn start_compactor( compactor_context: CompactorContext, hummock_meta_client: Arc, @@ -618,7 +618,7 @@ pub fn start_compactor( /// The background compaction thread that receives compaction tasks from hummock compaction /// manager and runs compaction tasks. -#[cfg_attr(coverage, no_coverage)] +#[cfg_attr(coverage, coverage(off))] pub fn start_shared_compactor( grpc_proxy_client: GrpcCompactorProxyClient, mut receiver: mpsc::UnboundedReceiver>, diff --git a/src/storage/src/hummock/event_handler/uploader.rs b/src/storage/src/hummock/event_handler/uploader.rs index 995a9d181e2f5..a07da55fb7046 100644 --- a/src/storage/src/hummock/event_handler/uploader.rs +++ b/src/storage/src/hummock/event_handler/uploader.rs @@ -1641,9 +1641,6 @@ mod tests { (buffer_tracker, uploader, new_task_notifier) } - // This is a clippy bug, see https://github.com/rust-lang/rust-clippy/issues/11380. 
- // TODO: remove `allow` here after the issued is closed. - #[expect(clippy::needless_pass_by_ref_mut)] async fn assert_uploader_pending(uploader: &mut HummockUploader) { for _ in 0..10 { yield_now().await; diff --git a/src/storage/src/lib.rs b/src/storage/src/lib.rs index c5ffe656ab893..0f2f155f6a903 100644 --- a/src/storage/src/lib.rs +++ b/src/storage/src/lib.rs @@ -18,7 +18,7 @@ #![feature(bound_map)] #![feature(custom_test_frameworks)] #![feature(extract_if)] -#![feature(generators)] +#![feature(coroutines)] #![feature(hash_extract_if)] #![feature(lint_reasons)] #![feature(proc_macro_hygiene)] @@ -35,15 +35,13 @@ #![feature(btree_extract_if)] #![feature(exact_size_is_empty)] #![feature(lazy_cell)] -#![cfg_attr(coverage, feature(no_coverage))] +#![cfg_attr(coverage, feature(coverage_attribute))] #![recursion_limit = "256"] #![feature(error_generic_member_access)] #![feature(let_chains)] #![feature(associated_type_bounds)] #![feature(exclusive_range_pattern)] #![feature(impl_trait_in_assoc_type)] -#![feature(async_fn_in_trait)] -#![feature(return_position_impl_trait_in_trait)] pub mod hummock; pub mod memory; diff --git a/src/stream/src/common/table/state_table_cache.rs b/src/stream/src/common/table/state_table_cache.rs index 156637a41a1a4..b458ef52537e4 100644 --- a/src/stream/src/common/table/state_table_cache.rs +++ b/src/stream/src/common/table/state_table_cache.rs @@ -67,9 +67,9 @@ type WatermarkCacheKey = DefaultOrdered; /// Issue delete ranges. /// /// B. Refreshing the cache: -/// On barrier, do table scan from most_recently_cleaned_watermark (inclusive) to +inf. +/// On barrier, do table scan from `most_recently_cleaned_watermark` (inclusive) to +inf. /// Take the Top N rows and insert into cache. -/// This has to be implemented in state_table. +/// This has to be implemented in `state_table`. /// We do not need to store any values, just the keys. /// /// TODO(kwannoel): diff --git a/src/stream/src/executor/backfill/utils.rs b/src/stream/src/executor/backfill/utils.rs index 259b67d5f202b..663f9be94cf5e 100644 --- a/src/stream/src/executor/backfill/utils.rs +++ b/src/stream/src/executor/backfill/utils.rs @@ -309,9 +309,6 @@ pub(crate) async fn get_progress_per_vnode( table: &mut StateTableInner, epoch: EpochPair, diff --git a/src/stream/src/executor/hash_join.rs b/src/stream/src/executor/hash_join.rs index 4178012cb9d9e..75414fe24a379 100644 --- a/src/stream/src/executor/hash_join.rs +++ b/src/stream/src/executor/hash_join.rs @@ -202,11 +202,11 @@ impl std::fmt::Debug for JoinSide { impl JoinSide { // WARNING: Please do not call this until we implement it. 
- #[expect(dead_code)] fn is_dirty(&self) -> bool { unimplemented!() } + #[expect(dead_code)] fn clear_cache(&mut self) { assert!( !self.is_dirty(), diff --git a/src/stream/src/lib.rs b/src/stream/src/lib.rs index 5a68b1b712b26..389dfae7b8c0c 100644 --- a/src/stream/src/lib.rs +++ b/src/stream/src/lib.rs @@ -21,8 +21,8 @@ #![feature(let_chains)] #![feature(hash_extract_if)] #![feature(extract_if)] -#![feature(generators)] -#![feature(iter_from_generator)] +#![feature(coroutines)] +#![feature(iter_from_coroutine)] #![feature(proc_macro_hygiene)] #![feature(stmt_expr_attributes)] #![feature(allocator_api)] @@ -36,13 +36,11 @@ #![feature(bound_map)] #![feature(iter_order_by)] #![feature(exact_size_is_empty)] -#![feature(return_position_impl_trait_in_trait)] #![feature(impl_trait_in_assoc_type)] #![feature(test)] #![feature(is_sorted)] #![feature(btree_cursors)] #![feature(assert_matches)] -#![feature(async_fn_in_trait)] #[macro_use] extern crate tracing; diff --git a/src/stream/tests/integration_tests/hash_agg.rs b/src/stream/tests/integration_tests/hash_agg.rs index 1b61bc5cd1d7f..9f4908f252532 100644 --- a/src/stream/tests/integration_tests/hash_agg.rs +++ b/src/stream/tests/integration_tests/hash_agg.rs @@ -284,7 +284,7 @@ async fn test_hash_agg_emit_on_window_close() { }; check_with_script( - || create_executor(), + create_executor, &format!( r###" - !barrier 1 diff --git a/src/tests/compaction_test/src/bin/compaction.rs b/src/tests/compaction_test/src/bin/compaction.rs index 443b79ad625b8..d9ba16f7437b8 100644 --- a/src/tests/compaction_test/src/bin/compaction.rs +++ b/src/tests/compaction_test/src/bin/compaction.rs @@ -12,9 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![cfg_attr(coverage, feature(no_coverage))] +#![cfg_attr(coverage, feature(coverage_attribute))] -#[cfg_attr(coverage, no_coverage)] +#[cfg_attr(coverage, coverage(off))] fn main() { use clap::Parser; diff --git a/src/tests/compaction_test/src/bin/delete_range.rs b/src/tests/compaction_test/src/bin/delete_range.rs index 348a71dc3cce5..592f61a3db4fa 100644 --- a/src/tests/compaction_test/src/bin/delete_range.rs +++ b/src/tests/compaction_test/src/bin/delete_range.rs @@ -12,9 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![cfg_attr(coverage, feature(no_coverage))] +#![cfg_attr(coverage, feature(coverage_attribute))] -#[cfg_attr(coverage, no_coverage)] +#[cfg_attr(coverage, coverage(off))] fn main() { use clap::Parser; diff --git a/src/utils/pgwire/src/lib.rs b/src/utils/pgwire/src/lib.rs index 1cda373ee9568..84a17d9907879 100644 --- a/src/utils/pgwire/src/lib.rs +++ b/src/utils/pgwire/src/lib.rs @@ -17,8 +17,6 @@ #![feature(result_option_inspect)] #![feature(iterator_try_collect)] #![feature(trusted_len)] -#![feature(async_fn_in_trait)] -#![feature(return_position_impl_trait_in_trait)] #![feature(lazy_cell)] #![expect(clippy::doc_markdown, reason = "FIXME: later")] diff --git a/src/utils/pgwire/src/pg_response.rs b/src/utils/pgwire/src/pg_response.rs index 29ea77f83b71b..eeec929732f50 100644 --- a/src/utils/pgwire/src/pg_response.rs +++ b/src/utils/pgwire/src/pg_response.rs @@ -92,6 +92,7 @@ pub enum StatementType { ROLLBACK, SET_TRANSACTION, CANCEL_COMMAND, + WAIT, } impl std::fmt::Display for StatementType { @@ -278,6 +279,7 @@ impl StatementType { }, Statement::Explain { .. 
} => Ok(StatementType::EXPLAIN), Statement::Flush => Ok(StatementType::FLUSH), + Statement::Wait => Ok(StatementType::WAIT), _ => Err("unsupported statement type".to_string()), } } diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml index 6c08e08490f7d..67b218c787652 100644 --- a/src/workspace-hack/Cargo.toml +++ b/src/workspace-hack/Cargo.toml @@ -37,7 +37,7 @@ combine = { version = "4", features = ["tokio"] } crossbeam-epoch = { version = "0.9" } crossbeam-queue = { version = "0.3" } crossbeam-utils = { version = "0.8" } -deranged = { version = "0.3", default-features = false, features = ["serde", "std"] } +deranged = { version = "0.3", default-features = false, features = ["powerfmt", "serde", "std"] } digest = { version = "0.10", features = ["mac", "oid", "std"] } either = { version = "1", features = ["serde"] } fail = { version = "0.5", default-features = false, features = ["failpoints"] } @@ -147,7 +147,7 @@ auto_enums = { version = "0.8", features = ["futures03"] } bitflags = { version = "2", default-features = false, features = ["serde", "std"] } bytes = { version = "1", features = ["serde"] } cc = { version = "1", default-features = false, features = ["parallel"] } -deranged = { version = "0.3", default-features = false, features = ["serde", "std"] } +deranged = { version = "0.3", default-features = false, features = ["powerfmt", "serde", "std"] } either = { version = "1", features = ["serde"] } fixedbitset = { version = "0.4" } frunk_core = { version = "0.4", default-features = false, features = ["std"] }
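Finally, the recurring `generators` to `coroutines` feature swaps above follow the same kind of nightly rename (`iter_from_generator` to `iter_from_coroutine`, `std::iter::from_generator` to `from_coroutine`). An illustrative program against the 2023 nightly this change targets, exercising the renamed API that `UpsertFormatter::format_chunk` now calls:

```rust
// Illustrative only; requires the nightly toolchain of this era.
#![feature(coroutines)]
#![feature(iter_from_coroutine)]

fn main() {
    // `from_coroutine` adapts a coroutine into an `Iterator`; this is the
    // renamed form of the `from_generator` call replaced in upsert.rs.
    let it = std::iter::from_coroutine(|| {
        for i in 0..3 {
            yield i;
        }
    });
    assert_eq!(it.collect::<Vec<_>>(), vec![0, 1, 2]);
}
```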