diff --git a/Cargo.lock b/Cargo.lock index 8b2b20e858436..860d96c1a33cd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1495,9 +1495,9 @@ dependencies = [ [[package]] name = "chrono" -version = "0.4.26" +version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec837a71355b28f6556dbd569b37b3f363091c0bd4b2e735674521b4c5fd9bc5" +checksum = "d87d9d13be47a5b7c3907137f1290b0459a7f80efb26be8c52afb11963bccb02" dependencies = [ "android-tzdata", "iana-time-zone", @@ -1506,7 +1506,7 @@ dependencies = [ "serde", "time 0.1.45", "wasm-bindgen", - "winapi", + "windows-targets 0.48.1", ] [[package]] @@ -2943,7 +2943,7 @@ dependencies = [ "quote", "rand", "regex", - "regex-automata 0.3.7", + "regex-automata 0.3.8", "regex-syntax 0.7.5", "syn 2.0.31", "tokio", @@ -4802,9 +4802,9 @@ dependencies = [ [[package]] name = "num-bigint" -version = "0.4.3" +version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f93ab6289c7b344a8a9f60f88d80aa20032336fe78da341afc91c8a2341fc75f" +checksum = "608e7659b5c3d7cba262d894801b9ec9d00de989e8a82bd4bef91d08da45cdc0" dependencies = [ "autocfg", "num-integer", @@ -6300,7 +6300,7 @@ checksum = "12de2eff854e5fa4b1295edd650e227e9d8fb0c9e90b12e7f36d6a6811791a29" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.3.7", + "regex-automata 0.3.8", "regex-syntax 0.7.5", ] @@ -6315,9 +6315,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.3.7" +version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49530408a136e16e5b486e883fbb6ba058e8e4e8ae6621a77b048b314336e629" +checksum = "c2f401f4955220693b56f8ec66ee9c78abffd8d1c4f23dc41a23839eb88f0795" dependencies = [ "aho-corasick", "memchr", @@ -8846,9 +8846,9 @@ checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" [[package]] name = "sysinfo" -version = "0.29.9" +version = "0.29.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8d0e9cc2273cc8d31377bdd638d72e3ac3e5607b18621062b169d02787f1bab" +checksum = "0a18d114d420ada3a891e6bc8e96a2023402203296a47cdd65083377dad18ba5" dependencies = [ "cfg-if", "core-foundation-sys", @@ -10154,7 +10154,7 @@ dependencies = [ "rand_chacha", "rand_core 0.6.4", "regex", - "regex-automata 0.3.7", + "regex-automata 0.3.8", "regex-syntax 0.7.5", "reqwest", "ring", diff --git a/Makefile.toml b/Makefile.toml index 1765307fe691a..42ce20c0769ed 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -579,7 +579,53 @@ description = "Kill RisingWave dev cluster" script = ''' #!/usr/bin/env bash -tmux list-windows -t risedev -F "#{pane_id}" | xargs -I {} tmux send-keys -t {} C-c C-d +set -euo pipefail + +wait_kafka_exit() { + # Follow kafka-server-stop.sh + while [[ -n "$(ps ax | grep ' kafka\.Kafka ' | grep java | grep -v grep | awk '{print $1}')" ]]; do + echo "Waiting for kafka to exit" + sleep 1 + done +} + +wait_zookeeper_exit() { + # Follow zookeeper-server-stop.sh + while [[ -n "$(ps ax | grep java | grep -i QuorumPeerMain | grep -v grep | awk '{print $1}')" ]]; do + echo "Waiting for zookeeper to exit" + sleep 1 + done +} + +kill_kafka() { + ${PREFIX_BIN}/kafka/bin/kafka-server-stop.sh + wait_kafka_exit +} + +kill_zookeeper() { + ${PREFIX_BIN}/kafka/bin/zookeeper-server-stop.sh + wait_zookeeper_exit +} + +# Kill other components +tmux list-windows -t risedev -F "#{window_name} #{pane_id}" \ +| grep -v 'kafka' \ +| grep -v 'zookeeper' \ +| awk '{ print $2 }' \ +| xargs -I {} tmux send-keys -t {} C-c C-d + +if [[ -n $(tmux list-windows -t risedev | grep kafka) ]]; +then + echo "kill kafka" + kill_kafka + + echo "kill zookeeper" + kill_zookeeper + + # Kill their tmux sessions + tmux list-windows -t risedev -F "#{pane_id}" | xargs -I {} tmux send-keys -t {} C-c C-d +fi + tmux kill-session -t risedev test $? -eq 0 || { echo "Failed to stop all RiseDev components."; exit 1; } ''' @@ -1238,3 +1284,8 @@ cat << EOF > src/config/example.toml EOF cargo run -p risingwave_common --bin example-config >> src/config/example.toml ''' + +[tasks.backwards-compat-test] +category = "RiseDev - Backwards Compatibility Test" +description = "Run backwards compatibility test" +script = "./backwards-compat-tests/scripts/run_local.sh" \ No newline at end of file diff --git a/backwards-compat-tests/README.md b/backwards-compat-tests/README.md new file mode 100644 index 0000000000000..b5036d3ce39f7 --- /dev/null +++ b/backwards-compat-tests/README.md @@ -0,0 +1,18 @@ +# Backwards Compatibility Tests + +The backwards compatibility tests run in the following manner: +1. Prepare old-cluster artifacts +2. Configure the old-cluster. +3. Start the old-cluster. +4. Run DDL / DML / DQL. +5. Stop the old-cluster. +6. Prepare new-cluster artifacts. +7. Configure the new-cluster. +8. Start the new-cluster. +9. Verify results of step 4. + +We currently cover the following: +1. Basic mv +2. Nexmark (on rw table not nexmark source) +3. TPC-H +4. Kafka Source \ No newline at end of file diff --git a/backwards-compat-tests/scripts/run_local.sh b/backwards-compat-tests/scripts/run_local.sh new file mode 100755 index 0000000000000..cfa164ec35a29 --- /dev/null +++ b/backwards-compat-tests/scripts/run_local.sh @@ -0,0 +1,70 @@ +#!/usr/bin/env bash + +set -euo pipefail + +ORIGINAL_BRANCH=$(git branch --show-current) + +on_exit() { + git checkout "$ORIGINAL_BRANCH" +} + +trap on_exit EXIT + +source backwards-compat-tests/scripts/utils.sh + +configure_rw() { +echo "--- Setting up cluster config" +cat < risedev-profiles.user.yml +full-without-monitoring: + steps: + - use: minio + - use: etcd + - use: meta-node + - use: compute-node + - use: frontend + - use: compactor + - use: zookeeper + - use: kafka +EOF + +cat < risedev-components.user.env +RISEDEV_CONFIGURED=false + +ENABLE_MINIO=true +ENABLE_ETCD=true +ENABLE_KAFKA=true + +# Fetch risingwave binary from release. +ENABLE_BUILD_RUST=true + +# Ensure it will link the all-in-one binary from our release. +ENABLE_ALL_IN_ONE=true + +# ENABLE_RELEASE_PROFILE=true +EOF +} + +setup_old_cluster() { + echo "--- Setting up old cluster" + git checkout "v${OLD_VERSION}-rc" +} + +setup_new_cluster() { + echo "--- Setting up new cluster" + rm -r .risingwave/bin/risingwave + git checkout main +} + +main() { + set -euo pipefail + get_rw_versions + setup_old_cluster + configure_rw + seed_old_cluster "$OLD_VERSION" + + setup_new_cluster + configure_rw + validate_new_cluster "$NEW_VERSION" +} + +main \ No newline at end of file diff --git a/backwards-compat-tests/scripts/utils.sh b/backwards-compat-tests/scripts/utils.sh new file mode 100644 index 0000000000000..e047b9f44f421 --- /dev/null +++ b/backwards-compat-tests/scripts/utils.sh @@ -0,0 +1,243 @@ +#!/usr/bin/env bash + +# NOTE(kwannoel): +# Do not run this script directly, it is meant to be sourced. +# Backwards compatibility tests consist of the following parts: +# +# 1. Setup old cluster binaries. +# 2. Seed old cluster. +# 3. Setup new cluster binaries. +# 4. Run validation on new cluster. +# +# Steps 1,3 are specific to the execution environment, CI / Local. +# This script only provides utilities for 2, 4. + +################################### ENVIRONMENT CONFIG + +# Duration to wait for recovery (seconds) +RECOVERY_DURATION=20 + +# Setup test directory +TEST_DIR=.risingwave/backwards-compat-tests/ +KAFKA_PATH=.risingwave/bin/kafka +mkdir -p $TEST_DIR +cp -r backwards-compat-tests/slt/* $TEST_DIR + +wait_kafka_exit() { + # Follow kafka-server-stop.sh + while [[ -n "$(ps ax | grep ' kafka\.Kafka ' | grep java | grep -v grep | awk '{print $1}')" ]]; do + echo "Waiting for kafka to exit" + sleep 1 + done +} + +wait_zookeeper_exit() { + # Follow zookeeper-server-stop.sh + while [[ -n "$(ps ax | grep java | grep -i QuorumPeerMain | grep -v grep | awk '{print $1}')" ]]; do + echo "Waiting for zookeeper to exit" + sleep 1 + done +} + +kill_kafka() { + $KAFKA_PATH/bin/kafka-server-stop.sh + wait_kafka_exit +} + +kill_zookeeper() { + $KAFKA_PATH/bin/zookeeper-server-stop.sh + wait_zookeeper_exit +} + +# Older versions of RW may not gracefully kill kafka. +# So we duplicate the definition here. +kill_cluster() { + # Kill other components + tmux list-windows -t risedev -F "#{window_name} #{pane_id}" \ + | grep -v 'kafka' \ + | grep -v 'zookeeper' \ + | awk '{ print $2 }' \ + | xargs -I {} tmux send-keys -t {} C-c C-d + + set +e + if [[ -n $(tmux list-windows -t risedev | grep kafka) ]]; + then + echo "kill kafka" + kill_kafka + + echo "kill zookeeper" + kill_zookeeper + + # Kill their tmux sessions + tmux list-windows -t risedev -F "#{pane_id}" | xargs -I {} tmux send-keys -t {} C-c C-d + fi + set -e + + tmux kill-session -t risedev + test $? -eq 0 || { echo "Failed to stop all RiseDev components."; exit 1; } +} + +run_sql () { + psql -h localhost -p 4566 -d dev -U root -c "$@" +} + +check_version() { + local VERSION=$1 + local raw_version=$(run_sql "SELECT version();") + echo "--- Version" + echo "$raw_version" + local version=$(echo $raw_version | grep -i risingwave | sed 's/^.*risingwave-\([0-9]*\.[0-9]*\.[0-9]\).*$/\1/i') + if [[ "$version" != "$VERSION" ]]; then + echo "Version mismatch, expected $VERSION, got $version" + exit 1 + fi +} + +create_kafka_topic() { + "$KAFKA_PATH"/bin/kafka-topics.sh \ + --create \ + --topic backwards_compat_test_kafka_source --bootstrap-server localhost:29092 +} + +insert_json_kafka() { + local JSON=$1 + echo "$JSON" | "$KAFKA_PATH"/bin/kafka-console-producer.sh \ + --topic backwards_compat_test_kafka_source \ + --bootstrap-server localhost:29092 +} + +seed_json_kafka() { + insert_json_kafka '{"timestamp": "2023-07-28 07:11:00", "user_id": 1, "page_id": 1, "action": "gtrgretrg"}' + insert_json_kafka '{"timestamp": "2023-07-28 07:11:00", "user_id": 2, "page_id": 1, "action": "fsdfgerrg"}' + insert_json_kafka '{"timestamp": "2023-07-28 07:11:00", "user_id": 3, "page_id": 1, "action": "sdfergtth"}' + insert_json_kafka '{"timestamp": "2023-07-28 06:54:00", "user_id": 4, "page_id": 2, "action": "erwerhghj"}' + insert_json_kafka '{"timestamp": "2023-07-28 06:54:00", "user_id": 5, "page_id": 2, "action": "kiku7ikkk"}' + insert_json_kafka '{"timestamp": "2023-07-28 06:54:00", "user_id": 6, "page_id": 3, "action": "6786745ge"}' + insert_json_kafka '{"timestamp": "2023-07-28 06:54:00", "user_id": 7, "page_id": 3, "action": "fgbgfnyyy"}' + insert_json_kafka '{"timestamp": "2023-07-28 06:54:00", "user_id": 8, "page_id": 4, "action": "werwerwwe"}' + insert_json_kafka '{"timestamp": "2023-07-28 06:54:00", "user_id": 9, "page_id": 4, "action": "yjtyjtyyy"}' +} + +################################### Entry Points + +# Get $OLD_VERSION and $NEW_VERSION for Risingwave +get_rw_versions() { + # For backwards compat test we assume we are testing the latest version of RW (i.e. latest main commit) + # against the Nth latest release candidate, where N > 1. N can be larger, + # in case some old cluster did not upgrade. + local VERSION_OFFSET=4 + + # First we obtain a list of versions from git branch names. + # Then we normalize them to semver format (MAJOR.MINOR.PATCH). + echo "--- git branch origin output" + git branch -r | grep origin + + echo "--- VERSION BRANCHES" + local branches=$(git branch -r | grep -E "^ origin\/v[0-9]*\.[0-9]*.*-rc" | tr -d ' ' | sed -E 's/origin\/v([0-9]*\.[0-9])\-rc/\1.0/' | tr -d '\-vrcorigin\/' | tr -d ' ') + echo "$branches" + + # Then we sort them in descending order. + echo "--- VERSIONS" + local sorted_versions=$(echo -e "$branches" | sort -t '.' -n) + echo "$sorted_versions" + + # Then we take the Nth latest version. + # We set $OLD_VERSION to this. + OLD_VERSION=$(echo -e "$sorted_versions" | tail -n $VERSION_OFFSET | head -1) + + # Next, for $NEW_VERSION we just scrape it from `workspace.package.version`. + NEW_VERSION=$(cat Cargo.toml | grep "\[workspace\.package\]" -A 5 | sed -n 's/version = \"\([0-9]*\.[0-9]*\.[0-9]*\).*/\1/p' | tr -d ' ') + + # Then we assert that `$OLD_VERSION` < `$NEW_VERSION`. + local TOP=$(echo -e "$OLD_VERSION\n$NEW_VERSION" | sort -t '.' -n | tail -1) + if [[ "$TOP" != "$NEW_VERSION" ]] + then + echo "ERROR: $OLD_VERSION > $NEW_VERSION" + exit 1 + else + echo "OLD_VERSION: $OLD_VERSION" + echo "NEW_VERSION: $NEW_VERSION" + fi +} + +# Setup table and materialized view. +# Run updates and deletes on the table. +# Get the results. +# TODO: Run nexmark, tpch queries +# TODO(kwannoel): use sqllogictest. +seed_old_cluster() { + # Caller should make sure the test env has these. + # They are called here because the current tests + # may not be backwards compatible, so we need to call + # them in old cluster environment. + cp -r e2e_test/streaming/nexmark $TEST_DIR + cp -r e2e_test/nexmark/* $TEST_DIR/nexmark + + cp -r e2e_test/batch/tpch $TEST_DIR + cp -r e2e_test/tpch/* $TEST_DIR/tpch + + ./risedev clean-data + ./risedev d full-without-monitoring && rm .risingwave/log/* + + check_version "$OLD_VERSION" + + echo "--- BASIC TEST: Seeding old cluster with data" + sqllogictest -d dev -h localhost -p 4566 "$TEST_DIR/basic/seed.slt" + + echo "--- BASIC TEST: Validating old cluster" + sqllogictest -d dev -h localhost -p 4566 "$TEST_DIR/basic/validate_original.slt" + + echo "--- NEXMARK TEST: Seeding old cluster with data" + sqllogictest -d dev -h localhost -p 4566 "$TEST_DIR/nexmark-backwards-compat/seed.slt" + + echo "--- NEXMARK TEST: Validating old cluster" + sqllogictest -d dev -h localhost -p 4566 "$TEST_DIR/nexmark-backwards-compat/validate_original.slt" + + echo "--- TPCH TEST: Seeding old cluster with data" + sqllogictest -d dev -h localhost -p 4566 "$TEST_DIR/tpch-backwards-compat/seed.slt" + + echo "--- TPCH TEST: Validating old cluster" + sqllogictest -d dev -h localhost -p 4566 "$TEST_DIR/tpch-backwards-compat/validate_original.slt" + + echo "--- KAFKA TEST: Seeding old cluster with data" + create_kafka_topic + seed_json_kafka + sqllogictest -d dev -h localhost -p 4566 "$TEST_DIR/kafka/seed.slt" + + echo "--- KAFKA TEST: wait 5s for kafka to process data" + sleep 5 + + echo "--- KAFKA TEST: Validating old cluster" + sqllogictest -d dev -h localhost -p 4566 "$TEST_DIR/kafka/validate_original.slt" + + echo "--- Killing cluster" + kill_cluster + echo "--- Killed cluster" +} + +validate_new_cluster() { + echo "--- Start cluster on latest" + ./risedev d full-without-monitoring + + echo "--- Wait ${RECOVERY_DURATION}s for Recovery on Old Cluster Data" + sleep $RECOVERY_DURATION + + check_version "$NEW_VERSION" + + echo "--- BASIC TEST: Validating new cluster" + sqllogictest -d dev -h localhost -p 4566 "$TEST_DIR/basic/validate_restart.slt" + + echo "--- NEXMARK TEST: Validating new cluster" + sqllogictest -d dev -h localhost -p 4566 "$TEST_DIR/nexmark-backwards-compat/validate_restart.slt" + + echo "--- TPCH TEST: Validating new cluster" + sqllogictest -d dev -h localhost -p 4566 "$TEST_DIR/tpch-backwards-compat/validate_restart.slt" + + echo "--- KAFKA TEST: Seeding new cluster with data" + seed_json_kafka + + echo "--- KAFKA TEST: Validating new cluster" + sqllogictest -d dev -h localhost -p 4566 "$TEST_DIR/kafka/validate_restart.slt" + + kill_cluster +} \ No newline at end of file diff --git a/backwards-compat-tests/slt/basic/seed.slt b/backwards-compat-tests/slt/basic/seed.slt new file mode 100644 index 0000000000000..57b19a4885e7f --- /dev/null +++ b/backwards-compat-tests/slt/basic/seed.slt @@ -0,0 +1,14 @@ +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +statement ok +CREATE TABLE t(v1 int primary key, v2 int); + +statement ok +INSERT INTO t SELECT a AS v1, a * 2 AS v2 FROM generate_series(1, 10000) AS s(a); + +statement ok +CREATE MATERIALIZED VIEW m as SELECT * from t; + +statement ok +UPDATE t SET v2 = v2 + 1 WHERE v1 >= 1 AND v1 <= 5000; \ No newline at end of file diff --git a/backwards-compat-tests/slt/basic/validate_original.slt b/backwards-compat-tests/slt/basic/validate_original.slt new file mode 100644 index 0000000000000..7df2a71b1eb71 --- /dev/null +++ b/backwards-compat-tests/slt/basic/validate_original.slt @@ -0,0 +1,19 @@ +query I +SELECT count(*) FROM t; +---- +10000 + +query I +SELECT sum(v1), sum(v2) FROM t; +---- +50005000 100015000 + +query I +SELECT count(*) FROM m; +---- +10000 + +query I +SELECT sum(v1), sum(v2) FROM m; +---- +50005000 100015000 diff --git a/backwards-compat-tests/slt/basic/validate_restart.slt b/backwards-compat-tests/slt/basic/validate_restart.slt new file mode 100644 index 0000000000000..1a731813855a9 --- /dev/null +++ b/backwards-compat-tests/slt/basic/validate_restart.slt @@ -0,0 +1,41 @@ +# Rerun Original validation +query I +SELECT count(*) FROM t; +---- +10000 + +query I +SELECT sum(v1), sum(v2) FROM t; +---- +50005000 100015000 + +query I +SELECT count(*) FROM m; +---- +10000 + +query I +SELECT sum(v1), sum(v2) FROM m; +---- +50005000 100015000 + +# Test updates and deletes + +statement ok +SET RW_IMPLICIT_FLUSH=true; + +statement ok +UPDATE t SET v2 = v2 + 1 WHERE v1 >= 1 AND v1 <= 5000; + +statement ok +DELETE FROM t WHERE v1 > 5000; + +query I +SELECT COUNT(*) FROM t; +---- +5000 + +query I +SELECT SUM(v2) FROM t; +---- +25015000 \ No newline at end of file diff --git a/backwards-compat-tests/slt/kafka/seed.slt b/backwards-compat-tests/slt/kafka/seed.slt new file mode 100644 index 0000000000000..3840ce0c96b15 --- /dev/null +++ b/backwards-compat-tests/slt/kafka/seed.slt @@ -0,0 +1,19 @@ +statement ok +CREATE SOURCE IF NOT EXISTS kafka_source +( + action varchar, + user_id integer, + obj_id integer, + name varchar, + page_id integer, + age integer +) +WITH ( + connector='kafka', + topic='backwards_compat_test_kafka_source', + properties.bootstrap.server='localhost:29092', + scan.startup.mode='earliest', +) FORMAT PLAIN ENCODE JSON; + +statement ok +CREATE MATERIALIZED VIEW kafka_mv1 as SELECT * FROM kafka_source; diff --git a/backwards-compat-tests/slt/kafka/validate_original.slt b/backwards-compat-tests/slt/kafka/validate_original.slt new file mode 100644 index 0000000000000..02fd973c25fff --- /dev/null +++ b/backwards-compat-tests/slt/kafka/validate_original.slt @@ -0,0 +1,12 @@ +query I rowsort +SELECT * FROM kafka_mv1; +---- +6786745ge 6 NULL NULL 3 NULL +erwerhghj 4 NULL NULL 2 NULL +fgbgfnyyy 7 NULL NULL 3 NULL +fsdfgerrg 2 NULL NULL 1 NULL +gtrgretrg 1 NULL NULL 1 NULL +kiku7ikkk 5 NULL NULL 2 NULL +sdfergtth 3 NULL NULL 1 NULL +werwerwwe 8 NULL NULL 4 NULL +yjtyjtyyy 9 NULL NULL 4 NULL \ No newline at end of file diff --git a/backwards-compat-tests/slt/kafka/validate_restart.slt b/backwards-compat-tests/slt/kafka/validate_restart.slt new file mode 100644 index 0000000000000..7058b118f4d20 --- /dev/null +++ b/backwards-compat-tests/slt/kafka/validate_restart.slt @@ -0,0 +1,52 @@ +# create a new mv on source, it should retrieve all records +# and match mv1. +statement ok +CREATE MATERIALIZED VIEW kafka_mv2 as SELECT * FROM kafka_source; + +sleep 5s + +query I rowsort +SELECT * FROM kafka_mv2; +---- +6786745ge 6 NULL NULL 3 NULL +6786745ge 6 NULL NULL 3 NULL +erwerhghj 4 NULL NULL 2 NULL +erwerhghj 4 NULL NULL 2 NULL +fgbgfnyyy 7 NULL NULL 3 NULL +fgbgfnyyy 7 NULL NULL 3 NULL +fsdfgerrg 2 NULL NULL 1 NULL +fsdfgerrg 2 NULL NULL 1 NULL +gtrgretrg 1 NULL NULL 1 NULL +gtrgretrg 1 NULL NULL 1 NULL +kiku7ikkk 5 NULL NULL 2 NULL +kiku7ikkk 5 NULL NULL 2 NULL +sdfergtth 3 NULL NULL 1 NULL +sdfergtth 3 NULL NULL 1 NULL +werwerwwe 8 NULL NULL 4 NULL +werwerwwe 8 NULL NULL 4 NULL +yjtyjtyyy 9 NULL NULL 4 NULL +yjtyjtyyy 9 NULL NULL 4 NULL + +# MV1 should also have new records +query I rowsort +SELECT * FROM kafka_mv1; +---- +6786745ge 6 NULL NULL 3 NULL +6786745ge 6 NULL NULL 3 NULL +erwerhghj 4 NULL NULL 2 NULL +erwerhghj 4 NULL NULL 2 NULL +fgbgfnyyy 7 NULL NULL 3 NULL +fgbgfnyyy 7 NULL NULL 3 NULL +fsdfgerrg 2 NULL NULL 1 NULL +fsdfgerrg 2 NULL NULL 1 NULL +gtrgretrg 1 NULL NULL 1 NULL +gtrgretrg 1 NULL NULL 1 NULL +kiku7ikkk 5 NULL NULL 2 NULL +kiku7ikkk 5 NULL NULL 2 NULL +sdfergtth 3 NULL NULL 1 NULL +sdfergtth 3 NULL NULL 1 NULL +werwerwwe 8 NULL NULL 4 NULL +werwerwwe 8 NULL NULL 4 NULL +yjtyjtyyy 9 NULL NULL 4 NULL +yjtyjtyyy 9 NULL NULL 4 NULL + diff --git a/backwards-compat-tests/slt/nexmark-backwards-compat/delete.slt b/backwards-compat-tests/slt/nexmark-backwards-compat/delete.slt new file mode 100644 index 0000000000000..09fc6387ad65f --- /dev/null +++ b/backwards-compat-tests/slt/nexmark-backwards-compat/delete.slt @@ -0,0 +1,8 @@ +statement ok +DELETE FROM person; + +statement ok +DELETE FROM auction; + +statement ok +DELETE FROM bid; diff --git a/backwards-compat-tests/slt/nexmark-backwards-compat/insert.slt b/backwards-compat-tests/slt/nexmark-backwards-compat/insert.slt new file mode 100644 index 0000000000000..27201dcc49a21 --- /dev/null +++ b/backwards-compat-tests/slt/nexmark-backwards-compat/insert.slt @@ -0,0 +1,3 @@ +include ../nexmark/insert_person.slt.part +include ../nexmark/insert_auction.slt.part +include ../nexmark/insert_bid.slt.part diff --git a/backwards-compat-tests/slt/nexmark-backwards-compat/seed.slt b/backwards-compat-tests/slt/nexmark-backwards-compat/seed.slt new file mode 100644 index 0000000000000..bb86131e6a71a --- /dev/null +++ b/backwards-compat-tests/slt/nexmark-backwards-compat/seed.slt @@ -0,0 +1,9 @@ +include ../nexmark/create_tables.slt.part + +# First, insert the data into the tables +include ../nexmark/insert_person.slt.part +include ../nexmark/insert_auction.slt.part +include ../nexmark/insert_bid.slt.part + +# Then, create materialized views based on the historical data (snapshot) +include ../nexmark/create_views.slt.part diff --git a/backwards-compat-tests/slt/nexmark-backwards-compat/validate_original.slt b/backwards-compat-tests/slt/nexmark-backwards-compat/validate_original.slt new file mode 100644 index 0000000000000..7e4dcdc800ee9 --- /dev/null +++ b/backwards-compat-tests/slt/nexmark-backwards-compat/validate_original.slt @@ -0,0 +1 @@ +include ../nexmark/test_mv_result.slt.part \ No newline at end of file diff --git a/backwards-compat-tests/slt/nexmark-backwards-compat/validate_restart.slt b/backwards-compat-tests/slt/nexmark-backwards-compat/validate_restart.slt new file mode 100644 index 0000000000000..093f88f4fdbbe --- /dev/null +++ b/backwards-compat-tests/slt/nexmark-backwards-compat/validate_restart.slt @@ -0,0 +1,11 @@ +include ../nexmark/test_mv_result.slt.part + +include ./delete.slt.part +include ../nexmark/insert_person.slt.part +include ../nexmark/insert_auction.slt.part +include ../nexmark/insert_bid.slt.part +include ../nexmark/test_mv_result.slt.part + +include ../nexmark/drop_views.slt.part +include ../nexmark/drop_tables.slt.part + diff --git a/backwards-compat-tests/slt/tpch-backwards-compat/delete.slt b/backwards-compat-tests/slt/tpch-backwards-compat/delete.slt new file mode 100644 index 0000000000000..e2a9b5e7b32c2 --- /dev/null +++ b/backwards-compat-tests/slt/tpch-backwards-compat/delete.slt @@ -0,0 +1,23 @@ +statement ok +DELETE FROM supplier; + +statement ok +DELETE FROM part; + +statement ok +DELETE FROM partsupp; + +statement ok +DELETE FROM customer; + +statement ok +DELETE FROM orders; + +statement ok +DELETE FROM lineitem; + +statement ok +DELETE FROM nation; + +statement ok +DELETE FROM region; \ No newline at end of file diff --git a/backwards-compat-tests/slt/tpch-backwards-compat/insert.slt b/backwards-compat-tests/slt/tpch-backwards-compat/insert.slt new file mode 100644 index 0000000000000..c87003aab1e4b --- /dev/null +++ b/backwards-compat-tests/slt/tpch-backwards-compat/insert.slt @@ -0,0 +1,8 @@ +include ../tpch/insert_customer.slt.part +include ../tpch/insert_lineitem.slt.part +include ../tpch/insert_nation.slt.part +include ../tpch/insert_orders.slt.part +include ../tpch/insert_part.slt.part +include ../tpch/insert_partsupp.slt.part +include ../tpch/insert_supplier.slt.part +include ../tpch/insert_region.slt.part diff --git a/backwards-compat-tests/slt/tpch-backwards-compat/seed.slt b/backwards-compat-tests/slt/tpch-backwards-compat/seed.slt new file mode 100644 index 0000000000000..b56cefeab247e --- /dev/null +++ b/backwards-compat-tests/slt/tpch-backwards-compat/seed.slt @@ -0,0 +1,19 @@ +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +statement ok +SET QUERY_MODE TO distributed; + +statement ok +SET CREATE_COMPACTION_GROUP_FOR_MV TO true; + +include ../tpch/create_tables.slt.part + +include ../tpch/insert_customer.slt.part +include ../tpch/insert_lineitem.slt.part +include ../tpch/insert_nation.slt.part +include ../tpch/insert_orders.slt.part +include ../tpch/insert_part.slt.part +include ../tpch/insert_partsupp.slt.part +include ../tpch/insert_supplier.slt.part +include ../tpch/insert_region.slt.part diff --git a/backwards-compat-tests/slt/tpch-backwards-compat/validate_original.slt b/backwards-compat-tests/slt/tpch-backwards-compat/validate_original.slt new file mode 100644 index 0000000000000..801d8574bc267 --- /dev/null +++ b/backwards-compat-tests/slt/tpch-backwards-compat/validate_original.slt @@ -0,0 +1,22 @@ +include ../tpch/q1.slt.part +include ../tpch/q2.slt.part +include ../tpch/q3.slt.part +include ../tpch/q4.slt.part +include ../tpch/q5.slt.part +include ../tpch/q6.slt.part +include ../tpch/q7.slt.part +include ../tpch/q8.slt.part +include ../tpch/q9.slt.part +include ../tpch/q10.slt.part +include ../tpch/q11.slt.part +include ../tpch/q12.slt.part +include ../tpch/q13.slt.part +include ../tpch/q14.slt.part +include ../tpch/q15.slt.part +include ../tpch/q16.slt.part +include ../tpch/q17.slt.part +include ../tpch/q18.slt.part +include ../tpch/q19.slt.part +include ../tpch/q20.slt.part +include ../tpch/q21.slt.part +include ../tpch/q22.slt.part \ No newline at end of file diff --git a/backwards-compat-tests/slt/tpch-backwards-compat/validate_restart.slt b/backwards-compat-tests/slt/tpch-backwards-compat/validate_restart.slt new file mode 100644 index 0000000000000..7c7334cc222d3 --- /dev/null +++ b/backwards-compat-tests/slt/tpch-backwards-compat/validate_restart.slt @@ -0,0 +1,50 @@ +include ../tpch/q1.slt.part +include ../tpch/q2.slt.part +include ../tpch/q3.slt.part +include ../tpch/q4.slt.part +include ../tpch/q5.slt.part +include ../tpch/q6.slt.part +include ../tpch/q7.slt.part +include ../tpch/q8.slt.part +include ../tpch/q9.slt.part +include ../tpch/q10.slt.part +include ../tpch/q11.slt.part +include ../tpch/q12.slt.part +include ../tpch/q13.slt.part +include ../tpch/q14.slt.part +include ../tpch/q15.slt.part +include ../tpch/q16.slt.part +include ../tpch/q17.slt.part +include ../tpch/q18.slt.part +include ../tpch/q19.slt.part +include ../tpch/q20.slt.part +include ../tpch/q21.slt.part +include ../tpch/q22.slt.part + +# Test deletes and updates should work as per normal. +include ./delete.slt.part +include ./insert.slt.part +include ../tpch/q1.slt.part +include ../tpch/q2.slt.part +include ../tpch/q3.slt.part +include ../tpch/q4.slt.part +include ../tpch/q5.slt.part +include ../tpch/q6.slt.part +include ../tpch/q7.slt.part +include ../tpch/q8.slt.part +include ../tpch/q9.slt.part +include ../tpch/q10.slt.part +include ../tpch/q11.slt.part +include ../tpch/q12.slt.part +include ../tpch/q13.slt.part +include ../tpch/q14.slt.part +include ../tpch/q15.slt.part +include ../tpch/q16.slt.part +include ../tpch/q17.slt.part +include ../tpch/q18.slt.part +include ../tpch/q19.slt.part +include ../tpch/q20.slt.part +include ../tpch/q21.slt.part +include ../tpch/q22.slt.part + +include ../tpch/drop_tables.slt.part \ No newline at end of file diff --git a/ci/scripts/backwards-compat-test.sh b/ci/scripts/backwards-compat-test.sh index cbbc87cb6c08a..1c1b4d388faa9 100755 --- a/ci/scripts/backwards-compat-test.sh +++ b/ci/scripts/backwards-compat-test.sh @@ -23,128 +23,18 @@ done shift $((OPTIND -1)) # profile is either ci-dev or ci-release -if [[ "$profile" != "ci-dev" ]] && [[ "$profile" != "ci-release" ]]; then +if [[ "$profile" == "ci-dev" ]]; then + echo "Running in ci-dev mode" +elif [[ "$profile" == "ci-release" ]]; then + echo "Running in ci-release mode" +else echo "Invalid option: profile must be either ci-dev or ci-release" 1>&2 exit 1 fi -################################### ENVIRONMENT VARIABLES +source backwards-compat-tests/scripts/utils.sh -LOG_DIR=.risingwave/log -mkdir -p "$LOG_DIR" - -QUERY_LOG_FILE="$LOG_DIR/query.log" - -# TODO(kwannoel): automatically derive this by: -# 1. Fetching major version. -# 2. Find the earliest minor version of that major version. -TAG=v0.18.0 -# Duration to wait for recovery (seconds) -RECOVERY_DURATION=20 - -################################### TEST UTILIIES - -assert_not_empty() { - set +e - if [[ $(wc -l < "$1" | sed 's/^ *//g') -gt 1 ]]; then - echo "assert_not_empty PASSED for $1" - else - echo "assert_not_empty FAILED for $1" - buildkite-agent artifact upload "$1" - exit 1 - fi - set -e -} - -assert_eq() { - set +e - if [[ -z $(diff "$1" "$2") ]]; then - echo "assert_eq PASSED for $1 and $2" - else - echo "FAILED" - buildkite-agent artifact upload "$1" - buildkite-agent artifact upload "$2" - exit 1 - fi - set -e -} - -################################### QUERIES - -run_sql () { - psql -h localhost -p 4566 -d dev -U root -c "$@" -} - -seed_table() { - START="$1" - END="$2" - for i in $(seq "$START" "$END") - do - run_sql "INSERT into t values ($i, $i);" 1>$QUERY_LOG_FILE 2>&1 - done - run_sql "flush;" -} - -random_delete() { - START=$1 - END=$2 - COUNT=$3 - for i in $(seq 1 "$COUNT") - do - run_sql "DELETE FROM t WHERE v1 = $(("$RANDOM" % END));" 1>$QUERY_LOG_FILE 2>&1 - done - run_sql "flush;" -} - -random_update() { - START=$1 - END=$2 - COUNT=$3 - for _i in $(seq 1 "$COUNT") - do - run_sql "UPDATE t SET v2 = v2 + 1 WHERE v1 = $(("$RANDOM" % END));" 1>$QUERY_LOG_FILE 2>&1 - done - run_sql "flush;" -} - -# Setup table and materialized view. -# Run updates and deletes on the table. -# Get the results. -# TODO: Run nexmark, tpch queries -run_sql_old_cluster() { - run_sql "CREATE TABLE t(v1 int primary key, v2 int);" - - seed_table 1 10000 - - run_sql "CREATE MATERIALIZED VIEW m as SELECT * from t;" & - CREATE_MV_PID=$! - - seed_table 10001 20000 - - random_update 1 20000 1000 - - random_delete 1 20000 1000 - - wait $CREATE_MV_PID - - run_sql "CREATE MATERIALIZED VIEW m2 as SELECT v1, sum(v2) FROM m GROUP BY v1;" - - run_sql "select * from m ORDER BY v1;" > BEFORE_1 - run_sql "select * from m2 ORDER BY v1;" > BEFORE_2 -} - -# Just check if the results are the same as old cluster. -run_sql_new_cluster() { - run_sql "SELECT * from m ORDER BY v1;" > AFTER_1 - run_sql "select * from m2 ORDER BY v1;" > AFTER_2 -} - -run_updates_and_deletes_new_cluster() { - random_update 1 20000 1000 - random_delete 1 20000 1000 -} - -################################### CLUSTER CONFIGURATION +################################### Main configure_rw() { echo "--- Setting up cluster config" @@ -157,6 +47,8 @@ full-without-monitoring: - use: compute-node - use: frontend - use: compactor + - use: zookeeper + - use: kafka EOF cat < risedev-components.user.env @@ -164,15 +56,7 @@ RISEDEV_CONFIGURED=true ENABLE_MINIO=true ENABLE_ETCD=true -# FIXME: Don't use kafka for now, -# Until 1.0, then we can re-enable it... -# This is because previous versions of risedev-tool (from previous releases) -# fetch kafka from clcdn.apache.org which only maintains the latest few -# versions of kafka. -# This comment belongs to a PR for the release of 1.0. -# In this PR, we also change the source of kafka bin to downloads.apache.org, -# which maintain old versions of kafka (until 2012). -# ENABLE_KAFKA=true +ENABLE_KAFKA=true # Fetch risingwave binary from release. ENABLE_BUILD_RUST=false @@ -180,85 +64,47 @@ ENABLE_BUILD_RUST=false # Ensure it will link the all-in-one binary from our release. ENABLE_ALL_IN_ONE=true -# ENABLE_RELEASE_PROFILE=true +# Even if CI is release profile, we won't ever +# build the binaries from scratch. +# So we just use target/debug for simplicity. +ENABLE_RELEASE_PROFILE=false EOF } -configure_latest_rw() { -cat < risedev-profiles.user.yml -full-without-monitoring: - steps: - - use: minio - - use: etcd - - use: meta-node - - use: compute-node - - use: frontend - - use: compactor -EOF +setup_old_cluster() { + echo "--- Build risedev for $OLD_VERSION, it may not be backwards compatible" + git config --global --add safe.directory /risingwave + git checkout "v${OLD_VERSION}-rc" + cargo build -p risedev + OLD_URL=https://github.com/risingwavelabs/risingwave/releases/download/v${OLD_VERSION}/risingwave-v${OLD_VERSION}-x86_64-unknown-linux.tar.gz + wget $OLD_URL + tar -xvf risingwave-v${OLD_VERSION}-x86_64-unknown-linux.tar.gz + mv risingwave target/debug/risingwave + + echo "--- Start cluster on tag $OLD_VERSION" + git config --global --add safe.directory /risingwave } -echo "--- Configuring RW" -configure_rw - -echo "--- Build risedev for $TAG, it may not be backwards compatible" -git config --global --add safe.directory /risingwave -git checkout "${TAG}-rc" -cargo build -p risedev - -echo "--- Setup old release $TAG" -wget "https://github.com/risingwavelabs/risingwave/releases/download/${TAG}/risingwave-${TAG}-x86_64-unknown-linux.tar.gz" -tar -xvf risingwave-${TAG}-x86_64-unknown-linux.tar.gz -mkdir -p target/debug -cp risingwave target/debug/risingwave - -echo "--- Teardown any old cluster" -set +e -./risedev down -set -e - -echo "--- Start cluster on tag $TAG" -git config --global --add safe.directory /risingwave -# NOTE(kwannoel): We use this config because kafka encounters errors upon cluster restart, -# If previous kafka topics and partitions were not removed. -./risedev d full-without-monitoring && rm .risingwave/log/* -pushd .risingwave/log/ -buildkite-agent artifact upload "./*.log" -popd - -# TODO(kwannoel): Run nexmark queries + tpch queries. -# TODO(kwannoel): Refactor this into a rust binary + test files for better maintainability. -echo "--- Running Queries Old Cluster @ $TAG" -run_sql_old_cluster - -echo "--- Kill cluster on tag $TAG" -./risedev k - -echo "--- Setup Risingwave @ $RW_COMMIT" -download_and_prepare_rw $profile common - -echo "--- Start cluster on latest" -configure_rw -./risedev d full-without-monitoring - -echo "--- Wait ${RECOVERY_DURATION}s for Recovery on Old Cluster Data" -sleep $RECOVERY_DURATION - -echo "--- Running Queries New Cluster" -run_sql_new_cluster - -echo "--- Sanity Checks" -echo "AFTER_1" -cat AFTER_1 | tail -n 100 -echo "AFTER_2" -cat AFTER_2 | tail -n 100 +setup_new_cluster() { + echo "--- Setup Risingwave @ $RW_COMMIT" + git checkout - + download_and_prepare_rw $profile common + # Make sure we always start w/o old config + rm -r .risingwave/config +} -echo "--- Comparing results" -assert_eq BEFORE_1 AFTER_1 -assert_eq BEFORE_2 AFTER_2 -assert_not_empty BEFORE_1 -assert_not_empty BEFORE_2 -assert_not_empty AFTER_1 -assert_not_empty AFTER_2 +main() { + set -euo pipefail + # Make sure we have all the branches + git fetch --all + get_rw_versions + setup_old_cluster + configure_rw + seed_old_cluster "$OLD_VERSION" + + setup_new_cluster + configure_rw + validate_new_cluster "$NEW_VERSION" +} -echo "--- Running Updates and Deletes on new cluster should not fail" -run_updates_and_deletes_new_cluster \ No newline at end of file +main \ No newline at end of file diff --git a/ci/workflows/pull-request.yml b/ci/workflows/pull-request.yml index 9d1b564f191da..0e4ded02791ec 100644 --- a/ci/workflows/pull-request.yml +++ b/ci/workflows/pull-request.yml @@ -422,7 +422,7 @@ steps: - "build" plugins: - docker-compose#v4.9.0: - run: ci-flamegraph-env + run: rw-build-env config: ci/docker-compose.yml mount-buildkite-agent: true - ./ci/plugins/upload-failure-logs diff --git a/clippy.toml b/clippy.toml index 465ccb68ced30..bcc3c789ae35a 100644 --- a/clippy.toml +++ b/clippy.toml @@ -8,6 +8,11 @@ disallowed-methods = [ { path = "num_traits::sign::Signed::is_positive", reason = "This returns true for 0.0 but false for 0." }, { path = "num_traits::sign::Signed::is_negative", reason = "This returns true for -0.0 but false for 0." }, { path = "num_traits::sign::Signed::signum", reason = "This returns 1.0 for 0.0 but 0 for 0." }, + { path = "speedate::DateTime::parse_str", reason = "Please use `parse_str_rfc3339` instead." }, + { path = "speedate::DateTime::parse_bytes", reason = "Please use `parse_bytes_rfc3339` instead." }, + { path = "speedate::DateTime::parse_bytes_with_config", reason = "Please use `parse_bytes_rfc3339_with_config` instead." }, + { path = "speedate::Date::parse_str", reason = "Please use `parse_str_rfc3339` instead." }, + { path = "speedate::Date::parse_bytes", reason = "Please use `parse_bytes_rfc3339` instead." }, ] disallowed-types = [ { path = "num_traits::AsPrimitive", reason = "Please use `From` or `TryFrom` with `OrderedFloat` instead." }, diff --git a/docs/developer-guide.md b/docs/developer-guide.md index 584f65d1d535e..e256d1a5ebd83 100644 --- a/docs/developer-guide.md +++ b/docs/developer-guide.md @@ -472,6 +472,18 @@ You may use that to reproduce it in your local environment. For example: MADSIM_TEST_SEED=4 ./risedev sit-test test_backfill_with_upstream_and_snapshot_read ``` +### Backwards Compatibility tests + +This tests backwards compatibility between the earliest minor version +and latest minor version of Risingwave (e.g. 1.0.0 vs 1.1.0). + +You can run it locally with: +```bash +./risedev backwards-compat-test +``` + +In CI, you can make sure the PR runs it by adding the label `ci/run-backwards-compat-tests`. + ## Miscellaneous checks For shell code, please run: diff --git a/e2e_test/batch/functions/array_concat.slt.part b/e2e_test/batch/functions/array_concat.slt.part index b2cee32de28f5..7c853262c587d 100644 --- a/e2e_test/batch/functions/array_concat.slt.part +++ b/e2e_test/batch/functions/array_concat.slt.part @@ -664,17 +664,17 @@ select array_prepend(1::real, array[1]::real[]); {1,1} query T -select array['2020-01-02 12:34:56 -11:00'::timestamp with time zone::varchar]::timestamp[] || '2020-01-01 12:34:56'::timestamp::date; +select array['2020-01-02 12:34:56 -11:00'::timestamp with time zone]::timestamp[] || '2020-01-01 12:34:56'::timestamp::date; ---- {"2020-01-02 23:34:56","2020-01-01 00:00:00"} query T -select array_append(array['2020-01-02 12:34:56 -11:00'::timestamp with time zone::varchar]::timestamp[], '2020-01-01 12:34:56'::timestamp::date); +select array_append(array['2020-01-02 12:34:56 -11:00'::timestamp with time zone]::timestamp[], '2020-01-01 12:34:56'::timestamp::date); ---- {"2020-01-02 23:34:56","2020-01-01 00:00:00"} query T -select array_prepend('2020-01-01 12:34:56'::timestamp::date, array['2020-01-02 12:34:56 -11:00'::timestamp with time zone::varchar]::timestamp[]); +select array_prepend('2020-01-01 12:34:56'::timestamp::date, array['2020-01-02 12:34:56 -11:00'::timestamp with time zone]::timestamp[]); ---- {"2020-01-01 00:00:00","2020-01-02 23:34:56"} diff --git a/e2e_test/batch/functions/array_max.slt.part b/e2e_test/batch/functions/array_max.slt.part new file mode 100644 index 0000000000000..54a0e8cb45d19 --- /dev/null +++ b/e2e_test/batch/functions/array_max.slt.part @@ -0,0 +1,72 @@ +query I +select array_max(array[1, 2, 3]); +---- +3 + +query I +select array_max(array[2, 3, 5, 2, 4]); +---- +5 + +query I +select array_max(array[114514, 114513]); +---- +114514 + +query I +select array_max(array['a', 'b', 'c', 'a']); +---- +c + +query I +select array_max(array['e💩a', 'f🤔️b', 'c🥵c', 'd🥳d', 'e💩e']); +---- +f🤔️b + +query I +select array_max(array['2c😅🤔😅️c2', '114🥵514', '30🤣🥳03', '5🥵💩💩🥵5']); +---- +5🥵💩💩🥵5 + +query error invalid digit found in string +select array_max(array['a', 'b', 'c', 114514]); + +query error invalid digit found in string +select array_max(array[114514, 'a', 'b', 'c']); + +# i32::MIN & i32::MIN - 1 & i32::MAX +query I +select array_max(array[-2147483648, 2147483647, -2147483649]); +---- +2147483647 + +# i64::MIN & i64::MIN - 1 & i64::MAX +query I +select array_max(array[-9223372036854775808, 9223372036854775807, -9223372036854775809]); +---- +9223372036854775807 + +query I +select array_max(array['a', '', 'c']); +---- +c + +query I +select array_max(array[3.14, 1.14, 1.14514]); +---- +3.14 + +query I +select array_max(array[3.1415926, 191.14, 114514, 1313.1414]); +---- +114514 + +query I +select array_max(array[1e-4, 1.14514e5, 1.14514e-5]); +---- +114514 + +query I +select array_max(array[date'2002-10-30', date'2023-09-06', date'2017-06-18']); +---- +2023-09-06 \ No newline at end of file diff --git a/e2e_test/batch/functions/array_min.slt.part b/e2e_test/batch/functions/array_min.slt.part new file mode 100644 index 0000000000000..0a252465c58d7 --- /dev/null +++ b/e2e_test/batch/functions/array_min.slt.part @@ -0,0 +1,93 @@ +query I +select array_min(array[1, 2, 3]); +---- +1 + +query I +select array_min(array[2, 3, 5, 2, 4]); +---- +2 + +query I +select array_min(array[114514, 123456]); +---- +114514 + +query I +select array_min(array['a', 'b', 'c', 'a']); +---- +a + +query I +select array_min(array['e💩a', 'f🤔️e', 'c🥵c', 'g🥳g', 'e💩e']); +---- +c🥵c + +query I +select array_min(array['901😅🤔😅️109', '114🥵514', '3🤣🥳3', '5🥵💩💩🥵5']); +---- +114🥵514 + +query error invalid digit found in string +select array_min(array['a', 'b', 'c', 114514]); + +query error invalid digit found in string +select array_min(array[114514, 'a', 'b', 'c']); + +# i32::MIN & i32::MIN - 1 & i32::MAX +query I +select array_min(array[-2147483648, 2147483647, -2147483649]); +---- +-2147483649 + +# i64::MIN & i64::MIN - 1 & i64::MAX +query I +select array_min(array[-9223372036854775808, 9223372036854775807, -9223372036854775809]); +---- +-9223372036854775809 + +query I +select array_min(array['a', '', 'c']); +---- +(empty) + +query I +select array_min(array[3.14, 1.14, 1.14514]); +---- +1.14 + +query I +select array_min(array[3.1415926, 191.14, 114514, 1313.1414]); +---- +3.1415926 + +query I +select array_min(array[1e-4, 1.14514e5, 1.14514e-5]); +---- +0.0000114514 + +query I +select array_min(array[date'2002-10-30', date'2023-09-06', date'2017-06-18']); +---- +2002-10-30 + +query I +select array_min( + array[ + '2002-10-30 00:00:00'::timestamp, + '2023-09-06 13:10:00'::timestamp, + '2017-06-18 12:00:00'::timestamp + ] +); +---- +2002-10-30 00:00:00 + +query I +select array_min(array['\xDE'::bytea, '\xDF'::bytea, '\xDC'::bytea]); +---- +\xdc + +query I +select array_min(array[NULL, 'a', 'b']); +---- +a \ No newline at end of file diff --git a/e2e_test/batch/types/timestamptz_utc.slt.part b/e2e_test/batch/types/timestamptz_utc.slt.part index a75529b30b1cb..5a0fe6ebd13cc 100644 --- a/e2e_test/batch/types/timestamptz_utc.slt.part +++ b/e2e_test/batch/types/timestamptz_utc.slt.part @@ -42,6 +42,19 @@ select '2022-10-01T12:00:00Z'::timestamp with time zone; ---- 2022-10-01 12:00:00+00:00 +query T +select '2023-11-05 01:40-07:00'::timestamptz; +---- +2023-11-05 08:40:00+00:00 + +query T +select '2023-11-05 01:40-08:00'::timestamptz; +---- +2023-11-05 09:40:00+00:00 + +statement error +select '0'::timestamptz; + query T select '2022-10-01 12:00:00+01:00'::timestamp with time zone BETWEEN '2022-10-01T10:59:59Z' AND '2022-10-01T11:00:01Z'; ---- diff --git a/e2e_test/source/cdc/cdc.check.slt b/e2e_test/source/cdc/cdc.check.slt index 876d41d9adcbd..f9feed9429d2c 100644 --- a/e2e_test/source/cdc/cdc.check.slt +++ b/e2e_test/source/cdc/cdc.check.slt @@ -46,3 +46,8 @@ query I select count(*) from person_rw; ---- 3 + +query I +select count(*) from tt3_rw; +---- +2 diff --git a/e2e_test/source/cdc/cdc.load.slt b/e2e_test/source/cdc/cdc.load.slt index 39e99b39df4fb..4cfacf120648e 100644 --- a/e2e_test/source/cdc/cdc.load.slt +++ b/e2e_test/source/cdc/cdc.load.slt @@ -110,6 +110,38 @@ create table orders_2 ( server.id = '5088' ); +statement error +create table tt3_rw ( + v1 int, + v2 timestamp, + PRIMARY KEY (v1) +) with ( + connector = 'mysql-cdc', + hostname = 'mysql', + port = '3306', + username = 'root', + password = '123456', + database.name = 'my@db', + table.name = 'tt3', + server.id = '5089' +); + +statement ok +create table tt3_rw ( + v1 int, + v2 timestamptz, + PRIMARY KEY (v1) +) with ( + connector = 'mysql-cdc', + hostname = 'mysql', + port = '3306', + username = 'root', + password = '123456', + database.name = 'my@db', + table.name = 'tt3', + server.id = '5089' +); + # Some columns missing and reordered (postgres-cdc) statement ok create table shipments_2 ( diff --git a/e2e_test/source/cdc/mysql_cdc.sql b/e2e_test/source/cdc/mysql_cdc.sql index 1e8ca93ddf864..89e5274ac3ee1 100644 --- a/e2e_test/source/cdc/mysql_cdc.sql +++ b/e2e_test/source/cdc/mysql_cdc.sql @@ -51,3 +51,7 @@ VALUES (1,1,'no'), CREATE USER 'dbz'@'%' IDENTIFIED BY '123456'; GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'dbz'@'%'; + +CREATE TABLE tt3 (v1 int primary key, v2 timestamp); +INSERT INTO tt3 VALUES (1, '2020-07-30 10:08:22'); +INSERT INTO tt3 VALUES (2, '2020-07-31 10:09:22'); diff --git a/e2e_test/streaming/watermark.slt b/e2e_test/streaming/watermark.slt index d177d258b7c0c..66d3a360fcb70 100644 --- a/e2e_test/streaming/watermark.slt +++ b/e2e_test/streaming/watermark.slt @@ -12,7 +12,7 @@ statement ok create materialized view mv as select * from t emit on window close; statement ok -insert into t values ('2023-05-06 16:51:00', 1); +insert into t values ('2023-05-06 16:51:00', 1), ('2023-05-06 16:51:00', 2), ('2023-05-06 16:51:00', 3); statement ok insert into t values ('2023-05-06 16:56:01', 1); @@ -25,6 +25,8 @@ query TI select * from mv; ---- 2023-05-06 16:51:00 1 +2023-05-06 16:51:00 2 +2023-05-06 16:51:00 3 statement ok drop materialized view mv; diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java index 3155e1848446a..54094bc21862d 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java @@ -230,6 +230,8 @@ private boolean isDataTypeCompatible(String mysqlDataType, Data.DataType.TypeNam return val == Data.DataType.TypeName.DECIMAL_VALUE; case "varchar": return val == Data.DataType.TypeName.VARCHAR_VALUE; + case "timestamp": + return val == Data.DataType.TypeName.TIMESTAMPTZ_VALUE; default: return true; // true for other uncovered types } diff --git a/proto/expr.proto b/proto/expr.proto index 1b3aeff6480ff..9bd5af893cb8b 100644 --- a/proto/expr.proto +++ b/proto/expr.proto @@ -197,6 +197,8 @@ message ExprNode { ARRAY_REPLACE = 543; ARRAY_DIMS = 544; ARRAY_TRANSFORM = 545; + ARRAY_MIN = 546; + ARRAY_MAX = 547; // Int256 functions HEX_TO_INT256 = 560; diff --git a/proto/meta.proto b/proto/meta.proto index b9701de9f05c3..53cf83a28fb2d 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -130,6 +130,30 @@ message FlushResponse { hummock.HummockSnapshot snapshot = 2; } +// The reason why the data sources in the cluster are paused. +enum PausedReason { + PAUSED_REASON_UNSPECIFIED = 0; + // The cluster is paused due to configuration change, e.g. altering table schema and scaling. + PAUSED_REASON_CONFIG_CHANGE = 1; + // The cluster is paused due to manual operation, e.g. `risectl` command or the + // `pause_on_next_bootstrap` system variable. + PAUSED_REASON_MANUAL = 2; +} + +message PauseRequest {} + +message PauseResponse { + optional PausedReason prev = 1; + optional PausedReason curr = 2; +} + +message ResumeRequest {} + +message ResumeResponse { + optional PausedReason prev = 1; + optional PausedReason curr = 2; +} + message CancelCreatingJobsRequest { message CreatingJobInfo { uint32 database_id = 1; @@ -215,6 +239,8 @@ message ListActorStatesResponse { service StreamManagerService { rpc Flush(FlushRequest) returns (FlushResponse); + rpc Pause(PauseRequest) returns (PauseResponse); + rpc Resume(ResumeRequest) returns (ResumeResponse); rpc CancelCreatingJobs(CancelCreatingJobsRequest) returns (CancelCreatingJobsResponse); rpc ListTableFragments(ListTableFragmentsRequest) returns (ListTableFragmentsResponse); rpc ListTableFragmentStates(ListTableFragmentStatesRequest) returns (ListTableFragmentStatesResponse); @@ -389,14 +415,6 @@ service NotificationService { rpc Subscribe(SubscribeRequest) returns (stream SubscribeResponse); } -message PauseRequest {} - -message PauseResponse {} - -message ResumeRequest {} - -message ResumeResponse {} - message GetClusterInfoRequest {} message GetClusterInfoResponse { @@ -455,9 +473,6 @@ message GetReschedulePlanResponse { } service ScaleService { - // TODO(Kexiang): delete them when config change interface is finished - rpc Pause(PauseRequest) returns (PauseResponse); - rpc Resume(ResumeRequest) returns (ResumeResponse); rpc GetClusterInfo(GetClusterInfoRequest) returns (GetClusterInfoResponse); rpc Reschedule(RescheduleRequest) returns (RescheduleResponse); rpc GetReschedulePlan(GetReschedulePlanRequest) returns (GetReschedulePlanResponse); diff --git a/src/common/src/cast/mod.rs b/src/common/src/cast/mod.rs index fdf3c7e598da2..82c69984ec0ea 100644 --- a/src/common/src/cast/mod.rs +++ b/src/common/src/cast/mod.rs @@ -43,7 +43,7 @@ pub fn str_to_timestamp(elem: &str) -> Result { #[inline] pub fn parse_naive_date(s: &str) -> Result { - let res = SpeedDate::parse_str(s).map_err(|_| PARSE_ERROR_STR_TO_DATE.to_string())?; + let res = SpeedDate::parse_str_rfc3339(s).map_err(|_| PARSE_ERROR_STR_TO_DATE.to_string())?; Ok(Date::from_ymd_uncheck(res.year as i32, res.month as u32, res.day as u32).0) } @@ -63,7 +63,10 @@ pub fn parse_naive_time(s: &str) -> Result { #[inline] pub fn parse_naive_datetime(s: &str) -> Result { - if let Ok(res) = SpeedDateTime::parse_str(s) { + if let Ok(res) = SpeedDateTime::parse_str_rfc3339(s) { + if res.time.tz_offset.is_some() { + return Err(PARSE_ERROR_STR_TO_TIMESTAMP.into()); + } Ok(Date::from_ymd_uncheck( res.date.year as i32, res.date.month as u32, @@ -77,7 +80,8 @@ pub fn parse_naive_datetime(s: &str) -> Result { ) .0) } else { - let res = SpeedDate::parse_str(s).map_err(|_| PARSE_ERROR_STR_TO_TIMESTAMP.to_string())?; + let res = SpeedDate::parse_str_rfc3339(s) + .map_err(|_| PARSE_ERROR_STR_TO_TIMESTAMP.to_string())?; Ok( Date::from_ymd_uncheck(res.year as i32, res.month as u32, res.day as u32) .and_hms_micro_uncheck(0, 0, 0, 0) @@ -238,7 +242,7 @@ mod tests { str_to_timestamp("1999-01-08 04:02").unwrap(); str_to_timestamp("1999-01-08 04:05:06").unwrap(); assert_eq!( - str_to_timestamp("2022-08-03T10:34:02Z").unwrap(), + str_to_timestamp("2022-08-03T10:34:02").unwrap(), str_to_timestamp("2022-08-03 10:34:02").unwrap() ); str_to_date("1999-01-08").unwrap(); diff --git a/src/common/src/types/mod.rs b/src/common/src/types/mod.rs index 7737d76cd48fc..d0e155f784546 100644 --- a/src/common/src/types/mod.rs +++ b/src/common/src/types/mod.rs @@ -375,6 +375,18 @@ impl DataType { } } + /// Returns the inner type of a list type. + /// + /// # Panics + /// + /// Panics if the type is not a list type. + pub fn as_list(&self) -> &DataType { + match self { + DataType::List(t) => t, + _ => panic!("expect list type"), + } + } + /// WARNING: Currently this should only be used in `WatermarkFilterExecutor`. Please be careful /// if you want to use this. pub fn min_value(&self) -> ScalarImpl { diff --git a/src/common/src/types/timestamptz.rs b/src/common/src/types/timestamptz.rs index 0d9af9a5e3d3f..1f9b962c9d376 100644 --- a/src/common/src/types/timestamptz.rs +++ b/src/common/src/types/timestamptz.rs @@ -16,7 +16,7 @@ use std::io::Write; use std::str::FromStr; use bytes::{Bytes, BytesMut}; -use chrono::{DateTime, TimeZone, Utc}; +use chrono::{TimeZone, Utc}; use chrono_tz::Tz; use postgres_types::ToSql; use serde::{Deserialize, Serialize}; @@ -148,8 +148,32 @@ impl FromStr for Timestamptz { "Can't cast string to timestamp with time zone (expected format is YYYY-MM-DD HH:MM:SS[.D+{up to 6 digits}] followed by +hh:mm or literal Z)" , "\nFor example: '2021-04-01 00:00:00+00:00'" ); - let ret = s.parse::>().map_err(|_| ERROR_MSG)?; - Ok(Timestamptz(ret.timestamp_micros())) + // Try `speedate` first + // * It is also used by `str_to_{date,time,timestamp}` + // * It can parse without seconds `2006-01-02 15:04-07:00` + let ret = match speedate::DateTime::parse_str_rfc3339(s) { + Ok(r) => r, + Err(_) => { + // Supplement with `chrono` for existing cases: + // * Extra space before offset `2006-01-02 15:04:05 -07:00` + return s + .parse::>() + .map(|t| Timestamptz(t.timestamp_micros())) + .map_err(|_| ERROR_MSG); + } + }; + if ret.time.tz_offset.is_none() { + return Err(ERROR_MSG); + } + if ret.date.year < 1600 { + return Err("parsing timestamptz with year < 1600 unsupported"); + } + Ok(Timestamptz( + ret.timestamp_tz() + .checked_mul(1000000) + .and_then(|us| us.checked_add(ret.time.microsecond.into())) + .ok_or(ERROR_MSG)?, + )) } } diff --git a/src/connector/src/common.rs b/src/connector/src/common.rs index 15e9612450ad5..306d79c3f5231 100644 --- a/src/connector/src/common.rs +++ b/src/connector/src/common.rs @@ -17,6 +17,7 @@ use std::collections::HashMap; use std::time::Duration; use anyhow::Ok; +use async_nats::jetstream::consumer::DeliverPolicy; use async_nats::jetstream::{self}; use aws_sdk_kinesis::Client as KinesisClient; use clickhouse::Client; @@ -386,6 +387,39 @@ impl NatsCommon { Ok(subscription) } + pub(crate) async fn build_consumer( + &self, + split_id: i32, + start_sequence: Option, + ) -> anyhow::Result< + async_nats::jetstream::consumer::Consumer, + > { + let context = self.build_context().await?; + let stream = self.build_or_get_stream(context.clone()).await?; + let name = format!("risingwave-consumer-{}-{}", self.subject, split_id); + let mut config = jetstream::consumer::pull::Config { + ack_policy: jetstream::consumer::AckPolicy::None, + ..Default::default() + }; + match start_sequence { + Some(v) => { + let consumer = stream + .get_or_create_consumer(&name, { + config.deliver_policy = DeliverPolicy::ByStartSequence { + start_sequence: v + 1, + }; + config + }) + .await?; + Ok(consumer) + } + None => { + let consumer = stream.get_or_create_consumer(&name, config).await?; + Ok(consumer) + } + } + } + pub(crate) async fn build_or_get_stream( &self, jetstream: jetstream::Context, diff --git a/src/connector/src/parser/debezium/simd_json_parser.rs b/src/connector/src/parser/debezium/simd_json_parser.rs index 42c3e82c65e35..0bfd69a7bb6fe 100644 --- a/src/connector/src/parser/debezium/simd_json_parser.rs +++ b/src/connector/src/parser/debezium/simd_json_parser.rs @@ -298,7 +298,7 @@ mod tests { SourceColumnDesc::simple("O_DATE", DataType::Date, ColumnId::from(8)), SourceColumnDesc::simple("O_TIME", DataType::Time, ColumnId::from(9)), SourceColumnDesc::simple("O_DATETIME", DataType::Timestamp, ColumnId::from(10)), - SourceColumnDesc::simple("O_TIMESTAMP", DataType::Timestamp, ColumnId::from(11)), + SourceColumnDesc::simple("O_TIMESTAMP", DataType::Timestamptz, ColumnId::from(11)), SourceColumnDesc::simple("O_JSON", DataType::Jsonb, ColumnId::from(12)), ] } @@ -333,9 +333,9 @@ mod tests { assert!(row[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new( "1970-01-01T00:00:00".parse().unwrap() ))))); - assert!(row[11].eq(&Some(ScalarImpl::Timestamp(Timestamp::new( - "1970-01-01T00:00:01".parse().unwrap() - ))))); + assert!(row[11].eq(&Some(ScalarImpl::Timestamptz( + "1970-01-01T00:00:01Z".parse().unwrap() + )))); assert_json_eq(&row[12], "{\"k1\": \"v1\", \"k2\": 11}"); } @@ -368,9 +368,9 @@ mod tests { assert!(row[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new( "1970-01-01T00:00:00".parse().unwrap() ))))); - assert!(row[11].eq(&Some(ScalarImpl::Timestamp(Timestamp::new( - "1970-01-01T00:00:01".parse().unwrap() - ))))); + assert!(row[11].eq(&Some(ScalarImpl::Timestamptz( + "1970-01-01T00:00:01Z".parse().unwrap() + )))); assert_json_eq(&row[12], "{\"k1\": \"v1\", \"k2\": 11}"); } @@ -404,9 +404,9 @@ mod tests { assert!(row[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new( "5138-11-16T09:46:39".parse().unwrap() ))))); - assert!(row[11].eq(&Some(ScalarImpl::Timestamp(Timestamp::new( - "2038-01-09T03:14:07".parse().unwrap() - ))))); + assert!(row[11].eq(&Some(ScalarImpl::Timestamptz( + "2038-01-09T03:14:07Z".parse().unwrap() + )))); assert_json_eq(&row[12], "{\"k1\":\"v1_updated\",\"k2\":33}"); } @@ -441,9 +441,9 @@ mod tests { assert!(row[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new( "5138-11-16T09:46:39".parse().unwrap() ))))); - assert!(row[11].eq(&Some(ScalarImpl::Timestamp(Timestamp::new( - "2038-01-09T03:14:07".parse().unwrap() - ))))); + assert!(row[11].eq(&Some(ScalarImpl::Timestamptz( + "2038-01-09T03:14:07Z".parse().unwrap() + )))); assert_json_eq(&row[12], "{\"k1\": \"v1_updated\", \"k2\": 33}"); } diff --git a/src/connector/src/source/nats/enumerator/mod.rs b/src/connector/src/source/nats/enumerator/mod.rs index 36c215fca6abf..88384bfb685e6 100644 --- a/src/connector/src/source/nats/enumerator/mod.rs +++ b/src/connector/src/source/nats/enumerator/mod.rs @@ -44,7 +44,8 @@ impl SplitEnumerator for NatsSplitEnumerator { // TODO: to simplify the logic, return 1 split for first version let nats_split = NatsSplit { subject: self.subject.clone(), - split_num: 1, + split_num: 0, // be the same as `from_nats_jetstream_message` + start_sequence: None, }; Ok(vec![nats_split]) diff --git a/src/connector/src/source/nats/source/message.rs b/src/connector/src/source/nats/source/message.rs index db0b5349d693b..afb3029d3b917 100644 --- a/src/connector/src/source/nats/source/message.rs +++ b/src/connector/src/source/nats/source/message.rs @@ -12,21 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - use async_nats; use crate::source::base::SourceMessage; use crate::source::SourceMeta; impl SourceMessage { - pub fn from_nats_message(message: async_nats::Message) -> Self { + pub fn from_nats_jetstream_message(message: async_nats::jetstream::message::Message) -> Self { SourceMessage { key: None, - payload: Some(message.payload.to_vec()), - // Nats message doesn't have offset - offset: "".to_string(), - split_id: Arc::from(""), + payload: Some(message.message.payload.to_vec()), + // For nats jetstream, use sequence id as offset + offset: message.info().unwrap().stream_sequence.to_string(), + split_id: "0".into(), meta: SourceMeta::Empty, } } diff --git a/src/connector/src/source/nats/source/reader.rs b/src/connector/src/source/nats/source/reader.rs index 49e48fc949b6d..c0070a16c1392 100644 --- a/src/connector/src/source/nats/source/reader.rs +++ b/src/connector/src/source/nats/source/reader.rs @@ -13,19 +13,21 @@ // limitations under the License. use anyhow::Result; +use async_nats::jetstream::consumer; use async_trait::async_trait; use futures::StreamExt; use futures_async_stream::try_stream; use crate::parser::ParserConfig; use crate::source::common::{into_chunk_stream, CommonSplitReader}; +use crate::source::nats::split::NatsSplit; use crate::source::nats::NatsProperties; use crate::source::{ BoxSourceWithStateStream, Column, SourceContextRef, SourceMessage, SplitImpl, SplitReader, }; pub struct NatsSplitReader { - subscriber: async_nats::Subscriber, + consumer: consumer::Consumer, properties: NatsProperties, parser_config: ParserConfig, source_ctx: SourceContextRef, @@ -44,9 +46,16 @@ impl SplitReader for NatsSplitReader { ) -> Result { // TODO: to simplify the logic, return 1 split for first version assert!(splits.len() == 1); - let subscriber = properties.common.build_subscriber().await?; + let splits = splits + .into_iter() + .map(|split| split.into_nats().unwrap()) + .collect::>(); + let consumer = properties + .common + .build_consumer(0, splits[0].start_sequence) + .await?; Ok(Self { - subscriber, + consumer, properties, parser_config, source_ctx, @@ -64,11 +73,12 @@ impl CommonSplitReader for NatsSplitReader { #[try_stream(ok = Vec, error = anyhow::Error)] async fn into_data_stream(self) { let capacity = self.source_ctx.source_ctrl_opts.chunk_size; + let messages = self.consumer.messages().await?; #[for_await] - for msgs in self.subscriber.ready_chunks(capacity) { + for msgs in messages.ready_chunks(capacity) { let mut msg_vec = Vec::with_capacity(capacity); for msg in msgs { - msg_vec.push(SourceMessage::from_nats_message(msg)); + msg_vec.push(SourceMessage::from_nats_jetstream_message(msg?)); } yield msg_vec; } diff --git a/src/connector/src/source/nats/split.rs b/src/connector/src/source/nats/split.rs index 565f1bc914032..f0fcfaff35481 100644 --- a/src/connector/src/source/nats/split.rs +++ b/src/connector/src/source/nats/split.rs @@ -25,7 +25,7 @@ pub struct NatsSplit { // TODO: to simplify the logic, return 1 split for first version. May use parallelism in // future. pub(crate) split_num: i32, - // nats does not provide offset + pub(crate) start_sequence: Option, } impl SplitMetaData for NatsSplit { @@ -44,11 +44,16 @@ impl SplitMetaData for NatsSplit { } impl NatsSplit { - pub fn new(subject: String, split_num: i32) -> Self { - Self { subject, split_num } + pub fn new(subject: String, split_num: i32, start_sequence: Option) -> Self { + Self { + subject, + split_num, + start_sequence, + } } - pub fn update_with_offset(&mut self, _start_offset: String) -> anyhow::Result<()> { + pub fn update_with_offset(&mut self, start_sequence: String) -> anyhow::Result<()> { + self.start_sequence = Some(start_sequence.as_str().parse::().unwrap()); Ok(()) } } diff --git a/src/ctl/src/cmd_impl/meta/pause_resume.rs b/src/ctl/src/cmd_impl/meta/pause_resume.rs index a75b653fa5bc4..5026d11b5b0e6 100644 --- a/src/ctl/src/cmd_impl/meta/pause_resume.rs +++ b/src/ctl/src/cmd_impl/meta/pause_resume.rs @@ -12,14 +12,30 @@ // See the License for the specific language governing permissions and // limitations under the License. +use risingwave_pb::meta::PausedReason; + use crate::CtlContext; +fn desc(reason: PausedReason) -> &'static str { + // Method on optional enums derived from `prost` will use `Unspecified` if unset. So we treat + // `Unspecified` as not paused here. + match reason { + PausedReason::Unspecified => "not paused", + PausedReason::ConfigChange => "paused due to configuration change", + PausedReason::Manual => "paused manually", + } +} + pub async fn pause(context: &CtlContext) -> anyhow::Result<()> { let meta_client = context.meta_client().await?; - meta_client.pause().await?; + let response = meta_client.pause().await?; - println!("Paused"); + println!( + "Done.\nPrevious: {}\nCurrent: {}", + desc(response.prev()), + desc(response.curr()) + ); Ok(()) } @@ -27,9 +43,13 @@ pub async fn pause(context: &CtlContext) -> anyhow::Result<()> { pub async fn resume(context: &CtlContext) -> anyhow::Result<()> { let meta_client = context.meta_client().await?; - meta_client.resume().await?; + let response = meta_client.resume().await?; - println!("Resumed"); + println!( + "Done.\nPrevious: {}\nCurrent: {}", + desc(response.prev()), + desc(response.curr()) + ); Ok(()) } diff --git a/src/expr/src/sig/func.rs b/src/expr/src/sig/func.rs index d84a065ad095a..5dca4da2f4486 100644 --- a/src/expr/src/sig/func.rs +++ b/src/expr/src/sig/func.rs @@ -199,6 +199,12 @@ mod tests { ArrayAccess: [ "array_access(list, int32) -> boolean/int16/int32/int64/int256/float32/float64/decimal/serial/date/time/timestamp/timestamptz/interval/varchar/bytea/jsonb/list/struct", ], + ArrayMin: [ + "array_min(list) -> bytea/varchar/timestamptz/timestamp/time/date/int256/serial/decimal/float32/float64/int16/int32/int64", + ], + ArrayMax: [ + "array_max(list) -> bytea/varchar/timestamptz/timestamp/time/date/int256/serial/decimal/float32/float64/int16/int32/int64", + ], } "#]]; expected.assert_debug_eq(&duplicated); diff --git a/src/expr/src/vector_op/array_access.rs b/src/expr/src/vector_op/array_access.rs index 57fe3f29feb3f..40c4568c7d467 100644 --- a/src/expr/src/vector_op/array_access.rs +++ b/src/expr/src/vector_op/array_access.rs @@ -24,7 +24,8 @@ pub fn array_access(list: ListRef<'_>, index: i32) -> Result any")] supports +/// In this way we could avoid manual macro expansion +#[function("array_min(list) -> *int")] +#[function("array_min(list) -> *float")] +#[function("array_min(list) -> decimal")] +#[function("array_min(list) -> serial")] +#[function("array_min(list) -> int256")] +#[function("array_min(list) -> date")] +#[function("array_min(list) -> time")] +#[function("array_min(list) -> timestamp")] +#[function("array_min(list) -> timestamptz")] +#[function("array_min(list) -> varchar")] +#[function("array_min(list) -> bytea")] +pub fn array_min(list: ListRef<'_>) -> Result> { + let min_value = list.iter().flatten().map(DefaultOrdered).min(); + match min_value.map(|v| v.0).to_owned_datum() { + Some(s) => Ok(Some(s.try_into()?)), + None => Ok(None), + } +} + +#[function("array_max(list) -> *int")] +#[function("array_max(list) -> *float")] +#[function("array_max(list) -> decimal")] +#[function("array_max(list) -> serial")] +#[function("array_max(list) -> int256")] +#[function("array_max(list) -> date")] +#[function("array_max(list) -> time")] +#[function("array_max(list) -> timestamp")] +#[function("array_max(list) -> timestamptz")] +#[function("array_max(list) -> varchar")] +#[function("array_max(list) -> bytea")] +pub fn array_max(list: ListRef<'_>) -> Result> { + let max_value = list.iter().flatten().map(DefaultOrdered).max(); + match max_value.map(|v| v.0).to_owned_datum() { + Some(s) => Ok(Some(s.try_into()?)), + None => Ok(None), + } +} diff --git a/src/expr/src/vector_op/mod.rs b/src/expr/src/vector_op/mod.rs index 4bc147cf3caec..f4aaa375f8f76 100644 --- a/src/expr/src/vector_op/mod.rs +++ b/src/expr/src/vector_op/mod.rs @@ -16,6 +16,7 @@ pub mod arithmetic_op; pub mod array_access; pub mod array_distinct; pub mod array_length; +pub mod array_min_max; pub mod array_positions; pub mod array_range_access; pub mod array_remove; diff --git a/src/expr/src/vector_op/timestamptz.rs b/src/expr/src/vector_op/timestamptz.rs index ca24200300244..716a521f742e4 100644 --- a/src/expr/src/vector_op/timestamptz.rs +++ b/src/expr/src/vector_op/timestamptz.rs @@ -276,17 +276,13 @@ mod tests { #[test] fn test_timestamptz_to_and_from_string() { - let str1 = "0001-11-15 15:35:40.999999+08:00"; + let str1 = "1600-11-15 15:35:40.999999+08:00"; let timestamptz1 = str_to_timestamptz(str1, "UTC").unwrap(); - assert_eq!(timestamptz1.timestamp_micros(), -62108094259000001); + assert_eq!(timestamptz1.timestamp_micros(), -11648507059000001); let mut writer = String::new(); timestamptz_to_string(timestamptz1, "UTC", &mut writer).unwrap(); - assert_eq!(writer, "0001-11-15 07:35:40.999999+00:00"); - - let mut writer = String::new(); - timestamptz_to_string(timestamptz1, "UTC", &mut writer).unwrap(); - assert_eq!(writer, "0001-11-15 07:35:40.999999+00:00"); + assert_eq!(writer, "1600-11-15 07:35:40.999999+00:00"); let str2 = "1969-12-31 23:59:59.999999+00:00"; let timestamptz2 = str_to_timestamptz(str2, "UTC").unwrap(); diff --git a/src/frontend/src/binder/expr/function.rs b/src/frontend/src/binder/expr/function.rs index c505aaa18d2b2..4cb703906bf8c 100644 --- a/src/frontend/src/binder/expr/function.rs +++ b/src/frontend/src/binder/expr/function.rs @@ -788,10 +788,12 @@ impl Binder { ("array_prepend", raw_call(ExprType::ArrayPrepend)), ("array_to_string", raw_call(ExprType::ArrayToString)), ("array_distinct", raw_call(ExprType::ArrayDistinct)), + ("array_min", raw_call(ExprType::ArrayMin)), ("array_length", raw_call(ExprType::ArrayLength)), ("cardinality", raw_call(ExprType::Cardinality)), ("array_remove", raw_call(ExprType::ArrayRemove)), ("array_replace", raw_call(ExprType::ArrayReplace)), + ("array_max", raw_call(ExprType::ArrayMax)), ("array_position", raw_call(ExprType::ArrayPosition)), ("array_positions", raw_call(ExprType::ArrayPositions)), ("trim_array", raw_call(ExprType::TrimArray)), diff --git a/src/frontend/src/expr/pure.rs b/src/frontend/src/expr/pure.rs index 4316223ec07ae..bb2e1d12d8f87 100644 --- a/src/frontend/src/expr/pure.rs +++ b/src/frontend/src/expr/pure.rs @@ -153,10 +153,12 @@ impl ExprVisitor for ImpureAnalyzer { | expr_node::Type::Row | expr_node::Type::ArrayToString | expr_node::Type::ArrayCat + | expr_node::Type::ArrayMax | expr_node::Type::ArrayAppend | expr_node::Type::ArrayPrepend | expr_node::Type::FormatType | expr_node::Type::ArrayDistinct + | expr_node::Type::ArrayMin | expr_node::Type::ArrayDims | expr_node::Type::ArrayLength | expr_node::Type::Cardinality diff --git a/src/frontend/src/expr/type_inference/func.rs b/src/frontend/src/expr/type_inference/func.rs index 8cccde3b251de..1febc46788af5 100644 --- a/src/frontend/src/expr/type_inference/func.rs +++ b/src/frontend/src/expr/type_inference/func.rs @@ -595,6 +595,12 @@ fn infer_type_for_special( Ok(Some(inputs[0].return_type())) } + ExprType::ArrayMin => { + ensure_arity!("array_min", | inputs | == 1); + inputs[0].ensure_array_type()?; + + Ok(Some(inputs[0].return_type().as_list().clone())) + } ExprType::ArrayDims => { ensure_arity!("array_dims", | inputs | == 1); inputs[0].ensure_array_type()?; @@ -607,6 +613,12 @@ fn infer_type_for_special( } Ok(Some(DataType::Varchar)) } + ExprType::ArrayMax => { + ensure_arity!("array_max", | inputs | == 1); + inputs[0].ensure_array_type()?; + + Ok(Some(inputs[0].return_type().as_list().clone())) + } ExprType::StringToArray => { ensure_arity!("string_to_array", 2 <= | inputs | <= 3); diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 869e8935fa7ec..3f75c8d5f2257 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -21,6 +21,7 @@ use risingwave_common::catalog::TableId; use risingwave_common::hash::ActorMapping; use risingwave_connector::source::SplitImpl; use risingwave_hummock_sdk::HummockEpoch; +use risingwave_pb::meta::PausedReason; use risingwave_pb::source::{ConnectorSplit, ConnectorSplits}; use risingwave_pb::stream_plan::barrier::{BarrierKind, Mutation}; use risingwave_pb::stream_plan::update_mutation::*; @@ -36,7 +37,7 @@ use super::info::BarrierActorInfo; use super::trace::TracedEpoch; use crate::barrier::CommandChanges; use crate::manager::{FragmentManagerRef, WorkerId}; -use crate::model::{ActorId, DispatcherId, FragmentId, PausedReason, TableFragments}; +use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments}; use crate::stream::{build_actor_connector_splits, SourceManagerRef, SplitAssignment}; use crate::MetaResult; diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 6c4ad5fc61813..f341cdd02497d 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -31,6 +31,7 @@ use risingwave_hummock_sdk::{ExtendedSstableInfo, HummockSstableObjectId}; use risingwave_pb::ddl_service::DdlProgress; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::table_fragments::actor_status::ActorState; +use risingwave_pb::meta::PausedReason; use risingwave_pb::stream_plan::barrier::BarrierKind; use risingwave_pb::stream_plan::Barrier; use risingwave_pb::stream_service::{ @@ -48,6 +49,7 @@ use self::command::CommandContext; use self::info::BarrierActorInfo; use self::notifier::Notifier; use self::progress::TrackingCommand; +use crate::barrier::notifier::BarrierInfo; use crate::barrier::progress::CreateMviewProgressTracker; use crate::barrier::BarrierEpochState::{Completed, InFlight}; use crate::hummock::HummockManagerRef; @@ -56,7 +58,7 @@ use crate::manager::{ CatalogManagerRef, ClusterManagerRef, FragmentManagerRef, LocalNotification, MetaSrvEnv, WorkerId, }; -use crate::model::{ActorId, BarrierManagerState, PausedReason}; +use crate::model::{ActorId, BarrierManagerState}; use crate::rpc::metrics::MetaMetrics; use crate::stream::SourceManagerRef; use crate::{MetaError, MetaResult}; @@ -671,7 +673,7 @@ impl GlobalBarrierManager { let Scheduled { command, - notifiers, + mut notifiers, send_latency_timer, checkpoint, span, @@ -695,25 +697,37 @@ impl GlobalBarrierManager { self.fragment_manager.clone(), self.env.stream_client_pool_ref(), info, - prev_epoch, - curr_epoch, + prev_epoch.clone(), + curr_epoch.clone(), state.paused_reason(), command, kind, self.source_manager.clone(), span.clone(), )); - let mut notifiers = notifiers; - notifiers.iter_mut().for_each(Notifier::notify_to_send); + send_latency_timer.observe_duration(); - checkpoint_control.enqueue_command(command_ctx.clone(), notifiers); self.inject_barrier(command_ctx.clone(), barrier_complete_tx) .instrument(span) .await; + // Notify about the injection. + let prev_paused_reason = state.paused_reason(); + let curr_paused_reason = command_ctx.next_paused_reason(); + + let info = BarrierInfo { + prev_epoch: prev_epoch.value(), + curr_epoch: curr_epoch.value(), + prev_paused_reason, + curr_paused_reason, + }; + notifiers.iter_mut().for_each(|n| n.notify_injected(info)); + // Update the paused state after the barrier is injected. - state.set_paused_reason(command_ctx.next_paused_reason()); + state.set_paused_reason(curr_paused_reason); + // Record the in-flight barrier. + checkpoint_control.enqueue_command(command_ctx.clone(), notifiers); } /// Inject a barrier to all CNs and spawn a task to collect it diff --git a/src/meta/src/barrier/notifier.rs b/src/meta/src/barrier/notifier.rs index 640886079854c..88acd9cd3dd7a 100644 --- a/src/meta/src/barrier/notifier.rs +++ b/src/meta/src/barrier/notifier.rs @@ -12,15 +12,27 @@ // See the License for the specific language governing permissions and // limitations under the License. +use risingwave_common::util::epoch::Epoch; +use risingwave_pb::meta::PausedReason; use tokio::sync::oneshot; use crate::{MetaError, MetaResult}; +/// The barrier info sent back to the caller when a barrier is injected. +#[derive(Debug, Clone, Copy)] +pub struct BarrierInfo { + pub prev_epoch: Epoch, + pub curr_epoch: Epoch, + + pub prev_paused_reason: Option, + pub curr_paused_reason: Option, +} + /// Used for notifying the status of a scheduled command/barrier. #[derive(Debug, Default)] pub(super) struct Notifier { - /// Get notified when scheduled barrier is about to send. - pub to_send: Option>, + /// Get notified when scheduled barrier is injected to compute nodes. + pub injected: Option>, /// Get notified when scheduled barrier is collected or failed. pub collected: Option>>, @@ -30,10 +42,10 @@ pub(super) struct Notifier { } impl Notifier { - /// Notify when we are about to send a barrier. - pub fn notify_to_send(&mut self) { - if let Some(tx) = self.to_send.take() { - tx.send(()).ok(); + /// Notify when we have injected a barrier to compute nodes. + pub fn notify_injected(&mut self, info: BarrierInfo) { + if let Some(tx) = self.injected.take() { + tx.send(info).ok(); } } diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index aeb62851179d2..bce901cd6f459 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -19,6 +19,7 @@ use std::time::{Duration, Instant}; use futures::future::try_join_all; use itertools::Itertools; use risingwave_pb::common::ActorInfo; +use risingwave_pb::meta::PausedReason; use risingwave_pb::stream_plan::barrier::{BarrierKind, Mutation}; use risingwave_pb::stream_plan::AddMutation; use risingwave_pb::stream_service::{ @@ -33,7 +34,7 @@ use crate::barrier::command::CommandContext; use crate::barrier::info::BarrierActorInfo; use crate::barrier::{CheckpointControl, Command, GlobalBarrierManager}; use crate::manager::WorkerId; -use crate::model::{BarrierManagerState, MigrationPlan, PausedReason}; +use crate::model::{BarrierManagerState, MigrationPlan}; use crate::stream::build_actor_connector_splits; use crate::MetaResult; diff --git a/src/meta/src/barrier/schedule.rs b/src/meta/src/barrier/schedule.rs index 0d51ff11972e6..7c9fefd15606b 100644 --- a/src/meta/src/barrier/schedule.rs +++ b/src/meta/src/barrier/schedule.rs @@ -21,12 +21,12 @@ use std::time::Instant; use anyhow::anyhow; use risingwave_common::catalog::TableId; use risingwave_pb::hummock::HummockSnapshot; +use risingwave_pb::meta::PausedReason; use tokio::sync::{oneshot, watch, RwLock}; -use super::notifier::Notifier; +use super::notifier::{BarrierInfo, Notifier}; use super::{Command, Scheduled}; use crate::hummock::HummockManagerRef; -use crate::model::PausedReason; use crate::rpc::metrics::MetaMetrics; use crate::{MetaError, MetaResult}; @@ -237,42 +237,41 @@ impl BarrierScheduler { /// Run multiple commands and return when they're all completely finished. It's ensured that /// multiple commands are executed continuously. /// + /// Returns the barrier info of each command. + /// /// TODO: atomicity of multiple commands is not guaranteed. - pub async fn run_multiple_commands(&self, commands: Vec) -> MetaResult<()> { - struct Context { - collect_rx: oneshot::Receiver>, - finish_rx: oneshot::Receiver<()>, - } - + async fn run_multiple_commands(&self, commands: Vec) -> MetaResult> { let mut contexts = Vec::with_capacity(commands.len()); let mut scheduleds = Vec::with_capacity(commands.len()); for command in commands { + let (injected_tx, injected_rx) = oneshot::channel(); let (collect_tx, collect_rx) = oneshot::channel(); let (finish_tx, finish_rx) = oneshot::channel(); - contexts.push(Context { - collect_rx, - finish_rx, - }); + contexts.push((injected_rx, collect_rx, finish_rx)); scheduleds.push(self.inner.new_scheduled( command.need_checkpoint(), command, once(Notifier { + injected: Some(injected_tx), collected: Some(collect_tx), finished: Some(finish_tx), - ..Default::default() }), )); } self.push(scheduleds).await?; - for Context { - collect_rx, - finish_rx, - } in contexts - { + let mut infos = Vec::with_capacity(contexts.len()); + + for (injected_rx, collect_rx, finish_rx) in contexts { + // Wait for this command to be injected, and record the result. + let info = injected_rx + .await + .map_err(|e| anyhow!("failed to inject barrier: {}", e))?; + infos.push(info); + // Throw the error if it occurs when collecting this barrier. collect_rx .await @@ -284,23 +283,33 @@ impl BarrierScheduler { .map_err(|e| anyhow!("failed to finish command: {}", e))?; } - Ok(()) + Ok(infos) } /// Run a command with a `Pause` command before and `Resume` command after it. Used for /// configuration change. - pub async fn run_config_change_command_with_pause(&self, command: Command) -> MetaResult<()> { + /// + /// Returns the barrier info of the actual command. + pub async fn run_config_change_command_with_pause( + &self, + command: Command, + ) -> MetaResult { self.run_multiple_commands(vec![ Command::pause(PausedReason::ConfigChange), command, Command::resume(PausedReason::ConfigChange), ]) .await + .map(|i| i[1]) } /// Run a command and return when it's completely finished. - pub async fn run_command(&self, command: Command) -> MetaResult<()> { - self.run_multiple_commands(vec![command]).await + /// + /// Returns the barrier info of the actual command. + pub async fn run_command(&self, command: Command) -> MetaResult { + self.run_multiple_commands(vec![command]) + .await + .map(|i| i[0]) } /// Flush means waiting for the next barrier to collect. diff --git a/src/meta/src/hummock/compaction/level_selector.rs b/src/meta/src/hummock/compaction/level_selector.rs index f6565b89dba3d..b6a3fc1a57b5c 100644 --- a/src/meta/src/hummock/compaction/level_selector.rs +++ b/src/meta/src/hummock/compaction/level_selector.rs @@ -26,7 +26,7 @@ use risingwave_pb::hummock::hummock_version::Levels; use risingwave_pb::hummock::{compact_task, CompactionConfig, LevelType}; use super::picker::{ - SpaceReclaimCompactionPicker, SpaceReclaimPickerState, TtlPickerState, + CompactionTaskValidator, SpaceReclaimCompactionPicker, SpaceReclaimPickerState, TtlPickerState, TtlReclaimCompactionPicker, }; use super::{ @@ -95,14 +95,19 @@ impl DynamicLevelSelectorCore { select_level: usize, target_level: usize, overlap_strategy: Arc, + compaction_task_validator: Arc, ) -> Box { if select_level == 0 { if target_level == 0 { - Box::new(TierCompactionPicker::new(self.config.clone())) + Box::new(TierCompactionPicker::new_with_validator( + self.config.clone(), + compaction_task_validator, + )) } else { - Box::new(LevelCompactionPicker::new( + Box::new(LevelCompactionPicker::new_with_validator( target_level, self.config.clone(), + compaction_task_validator, )) } } else { @@ -374,6 +379,11 @@ impl LevelSelector for DynamicLevelSelector { let overlap_strategy = create_overlap_strategy(compaction_group.compaction_config.compaction_mode()); let ctx = dynamic_level_core.get_priority_levels(levels, level_handlers); + + // TODO: Determine which rule to enable by write limit + let compaction_task_validator = Arc::new(CompactionTaskValidator::new( + compaction_group.compaction_config.clone(), + )); for (score, select_level, target_level) in ctx.score_levels { if score <= SCORE_BASE { return None; @@ -382,6 +392,7 @@ impl LevelSelector for DynamicLevelSelector { select_level, target_level, overlap_strategy.clone(), + compaction_task_validator.clone(), ); let mut stats = LocalPickerStatistic::default(); if let Some(ret) = picker.pick_compaction(levels, level_handlers, &mut stats) { diff --git a/src/meta/src/hummock/compaction/mod.rs b/src/meta/src/hummock/compaction/mod.rs index 97443f3bc7a96..a9f8d95457ccf 100644 --- a/src/meta/src/hummock/compaction/mod.rs +++ b/src/meta/src/hummock/compaction/mod.rs @@ -247,25 +247,25 @@ impl LocalSelectorStatistic { metrics .compact_skip_frequency .with_label_values(&[level_label.as_str(), "write-amp"]) - .inc_by(stats.skip_by_write_amp_limit); + .inc(); } if stats.skip_by_count_limit > 0 { metrics .compact_skip_frequency .with_label_values(&[level_label.as_str(), "count"]) - .inc_by(stats.skip_by_count_limit); + .inc(); } if stats.skip_by_pending_files > 0 { metrics .compact_skip_frequency .with_label_values(&[level_label.as_str(), "pending-files"]) - .inc_by(stats.skip_by_pending_files); + .inc(); } if stats.skip_by_overlapping > 0 { metrics .compact_skip_frequency .with_label_values(&[level_label.as_str(), "overlapping"]) - .inc_by(stats.skip_by_overlapping); + .inc(); } metrics .compact_skip_frequency diff --git a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs index a15fab694ee86..c8025a9e99ac9 100644 --- a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs @@ -20,7 +20,10 @@ use risingwave_pb::hummock::hummock_version::Levels; use risingwave_pb::hummock::{CompactionConfig, InputLevel, Level, LevelType, OverlappingLevel}; use super::min_overlap_compaction_picker::NonOverlapSubLevelPicker; -use super::{CompactionInput, CompactionPicker, LocalPickerStatistic}; +use super::{ + CompactionInput, CompactionPicker, CompactionTaskValidator, LocalPickerStatistic, + ValidationRuleType, +}; use crate::hummock::compaction::create_overlap_strategy; use crate::hummock::compaction::picker::TrivialMovePicker; use crate::hummock::level_handler::LevelHandler; @@ -28,6 +31,7 @@ use crate::hummock::level_handler::LevelHandler; pub struct LevelCompactionPicker { target_level: usize, config: Arc, + compaction_task_validator: Arc, } impl CompactionPicker for LevelCompactionPicker { @@ -84,13 +88,27 @@ impl CompactionPicker for LevelCompactionPicker { } impl LevelCompactionPicker { + #[cfg(test)] pub fn new(target_level: usize, config: Arc) -> LevelCompactionPicker { LevelCompactionPicker { target_level, + compaction_task_validator: Arc::new(CompactionTaskValidator::new(config.clone())), config, } } + pub fn new_with_validator( + target_level: usize, + config: Arc, + compaction_task_validator: Arc, + ) -> LevelCompactionPicker { + LevelCompactionPicker { + target_level, + config, + compaction_task_validator, + } + } + fn pick_base_trivial_move( &self, l0: &OverlappingLevel, @@ -117,10 +135,10 @@ impl LevelCompactionPicker { level_handlers: &[LevelHandler], stats: &mut LocalPickerStatistic, ) -> Option { + // TODO: remove this let l0_size = l0.total_file_size - level_handlers[0].get_pending_file_size(); let base_level_size = target_level.total_file_size - level_handlers[target_level.level_idx as usize].get_pending_file_size(); - if l0_size < base_level_size { stats.skip_by_write_amp_limit += 1; return None; @@ -157,7 +175,7 @@ impl LevelCompactionPicker { let mut skip_by_pending = false; let mut input_levels = vec![]; - let mut min_write_amp_meet = false; + for input in l0_select_tables_vec { let l0_select_tables = input .sstable_infos @@ -184,16 +202,6 @@ impl LevelCompactionPicker { continue; } - // The size of target level may be too large, we shall skip this compact task and wait - // the data in base level compact to lower level. - if target_level_size > self.config.max_compaction_bytes && strict_check { - continue; - } - - if input.total_file_size >= target_level_size { - min_write_amp_meet = true; - } - input_levels.push((input, target_level_size, target_level_ssts)); } @@ -204,20 +212,7 @@ impl LevelCompactionPicker { return None; } - if !min_write_amp_meet && strict_check { - // If the write-amplification of all candidate task are large, we may hope to wait base - // level compact more data to lower level. But if we skip all task, I'm - // afraid the data will be blocked in level0 and will be never compacted to base level. - // So we only allow one task exceed write-amplification-limit running in - // level0 to base-level. - return None; - } - for (input, target_file_size, target_level_files) in input_levels { - if min_write_amp_meet && input.total_file_size < target_file_size { - continue; - } - let mut select_level_inputs = input .sstable_infos .into_iter() @@ -228,18 +223,33 @@ impl LevelCompactionPicker { }) .collect_vec(); select_level_inputs.reverse(); + let target_file_count = target_level_files.len(); select_level_inputs.push(InputLevel { level_idx: target_level.level_idx, level_type: target_level.level_type, table_infos: target_level_files, }); - return Some(CompactionInput { + + let result = CompactionInput { input_levels: select_level_inputs, target_level: self.target_level, - target_sub_level_id: 0, - }); + select_input_size: input.total_file_size, + target_input_size: target_file_size, + total_file_count: (input.total_file_count + target_file_count) as u64, + ..Default::default() + }; + + if !self.compaction_task_validator.valid_compact_task( + &result, + ValidationRuleType::ToBase, + stats, + ) && strict_check + { + continue; + } + + return Some(result); } - stats.skip_by_write_amp_limit += 1; None } @@ -267,8 +277,6 @@ impl LevelCompactionPicker { self.config.sub_level_max_compaction_bytes, ); - let tier_sub_level_compact_level_count = - self.config.level0_sub_level_compact_level_count as usize; let non_overlap_sub_level_picker = NonOverlapSubLevelPicker::new( self.config.sub_level_max_compaction_bytes / 2, max_compaction_bytes, @@ -284,18 +292,10 @@ impl LevelCompactionPicker { continue; } - let mut skip_by_write_amp = false; - // Limit the number of selection levels for the non-overlapping - // sub_level at least level0_sub_level_compact_level_count - for (plan_index, input) in l0_select_tables_vec.into_iter().enumerate() { - if plan_index == 0 - && input.sstable_infos.len() - < self.config.level0_sub_level_compact_level_count as usize - { - // first plan level count smaller than limit - break; - } - + let validator = CompactionTaskValidator::new(self.config.clone()); + let mut select_input_size = 0; + let mut total_file_count = 0; + for input in l0_select_tables_vec { let mut max_level_size = 0; for level_select_table in &input.sstable_infos { let level_select_size = level_select_table @@ -306,22 +306,6 @@ impl LevelCompactionPicker { max_level_size = std::cmp::max(max_level_size, level_select_size); } - // This limitation would keep our write-amplification no more than - // ln(max_compaction_bytes/flush_level_bytes) / - // ln(self.config.level0_sub_level_compact_level_count/2) Here we only use half - // of level0_sub_level_compact_level_count just for convenient. - let is_write_amp_large = - max_level_size * self.config.level0_sub_level_compact_level_count as u64 / 2 - >= input.total_file_size; - - if (is_write_amp_large - || input.sstable_infos.len() < tier_sub_level_compact_level_count) - && input.total_file_count < self.config.level0_max_compact_file_number as usize - { - skip_by_write_amp = true; - continue; - } - let mut select_level_inputs = Vec::with_capacity(input.sstable_infos.len()); for level_select_sst in input.sstable_infos { if level_select_sst.is_empty() { @@ -332,17 +316,25 @@ impl LevelCompactionPicker { level_type: LevelType::Nonoverlapping as i32, table_infos: level_select_sst, }); + + select_input_size += input.total_file_size; + total_file_count += input.total_file_count; } select_level_inputs.reverse(); - return Some(CompactionInput { + + let result = CompactionInput { input_levels: select_level_inputs, - target_level: 0, target_sub_level_id: level.sub_level_id, - }); - } + select_input_size, + total_file_count: total_file_count as u64, + ..Default::default() + }; - if skip_by_write_amp { - stats.skip_by_write_amp_limit += 1; + if !validator.valid_compact_task(&result, ValidationRuleType::Intra, stats) { + continue; + } + + return Some(result); } } @@ -402,6 +394,7 @@ impl LevelCompactionPicker { target_level_idx -= 1; } + let select_input_size = select_sst.file_size; let input_levels = vec![ InputLevel { level_idx: 0, @@ -418,6 +411,9 @@ impl LevelCompactionPicker { input_levels, target_level: 0, target_sub_level_id: l0.sub_levels[target_level_idx].sub_level_id, + select_input_size, + total_file_count: 1, + ..Default::default() }); } None @@ -901,6 +897,7 @@ pub mod tests { }], target_level: 1, target_sub_level_id: pending_level.sub_level_id, + ..Default::default() }; assert!(!levels_handler[0].is_level_pending_compact(&pending_level)); tier_task_input.add_pending_task(1, &mut levels_handler); @@ -918,7 +915,6 @@ pub mod tests { // But stopped by pending sub-level when trying to include more sub-levels. let mut picker = LevelCompactionPicker::new(1, config.clone()); let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats); - assert!(ret.is_none()); // Free the pending sub-level. @@ -964,7 +960,6 @@ pub mod tests { let ret = picker .pick_compaction(&levels, &levels_handler, &mut local_stats) .unwrap(); - // println!("ret.input_levels: {:?}", ret.input_levels); // 1. trivial_move assert_eq!(2, ret.input_levels.len()); assert!(ret.input_levels[1].table_infos.is_empty()); @@ -974,7 +969,6 @@ pub mod tests { let ret = picker .pick_compaction(&levels, &levels_handler, &mut local_stats) .unwrap(); - println!("ret.input_levels: {:?}", ret.input_levels); assert_eq!(3, ret.input_levels.len()); assert_eq!(6, ret.input_levels[0].table_infos[0].sst_id); } diff --git a/src/meta/src/hummock/compaction/picker/compaction_task_validator.rs b/src/meta/src/hummock/compaction/picker/compaction_task_validator.rs new file mode 100644 index 0000000000000..4de77467205f7 --- /dev/null +++ b/src/meta/src/hummock/compaction/picker/compaction_task_validator.rs @@ -0,0 +1,208 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::sync::Arc; + +use risingwave_pb::hummock::CompactionConfig; + +use super::{CompactionInput, LocalPickerStatistic, MAX_COMPACT_LEVEL_COUNT}; + +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum ValidationRuleType { + Tier = 0, + Intra = 1, + ToBase = 2, +} + +pub struct CompactionTaskValidator { + validation_rules: HashMap>, +} + +impl CompactionTaskValidator { + pub fn new(config: Arc) -> Self { + let mut validation_rules: HashMap< + ValidationRuleType, + Box, + > = HashMap::default(); + + validation_rules.insert( + ValidationRuleType::Tier, + Box::new(TierCompactionTaskValidationRule { + config: config.clone(), + enable: true, + }), + ); + + validation_rules.insert( + ValidationRuleType::Intra, + Box::new(IntraCompactionTaskValidationRule { + config: config.clone(), + enable: true, + }), + ); + + validation_rules.insert( + ValidationRuleType::ToBase, + Box::new(BaseCompactionTaskValidationRule { + config, + enable: true, + }), + ); + + CompactionTaskValidator { validation_rules } + } + + pub fn valid_compact_task( + &self, + input: &CompactionInput, + picker_type: ValidationRuleType, + stats: &mut LocalPickerStatistic, + ) -> bool { + self.validation_rules + .get(&picker_type) + .unwrap() + .validate(input, stats) + } +} + +pub trait CompactionTaskValidationRule { + fn validate(&self, input: &CompactionInput, stats: &mut LocalPickerStatistic) -> bool; +} + +struct TierCompactionTaskValidationRule { + config: Arc, + enable: bool, +} + +impl CompactionTaskValidationRule for TierCompactionTaskValidationRule { + fn validate(&self, input: &CompactionInput, stats: &mut LocalPickerStatistic) -> bool { + if !self.enable { + return true; + } + + // so the design here wants to merge multiple overlapping-levels in one compaction + let max_compaction_bytes = std::cmp::min( + self.config.max_compaction_bytes, + self.config.sub_level_max_compaction_bytes + * self.config.level0_overlapping_sub_level_compact_level_count as u64, + ); + + // Limit sstable file count to avoid using too much memory. + let overlapping_max_compact_file_numer = std::cmp::min( + self.config.level0_max_compact_file_number, + MAX_COMPACT_LEVEL_COUNT as u64, + ); + + let waiting_enough_files = { + if input.select_input_size > max_compaction_bytes { + false + } else { + input.total_file_count <= overlapping_max_compact_file_numer + } + }; + + // If waiting_enough_files is not satisfied, we will raise the priority of the number of + // levels to ensure that we can merge as many sub_levels as possible + let tier_sub_level_compact_level_count = + self.config.level0_overlapping_sub_level_compact_level_count as usize; + if input.input_levels.len() < tier_sub_level_compact_level_count && waiting_enough_files { + stats.skip_by_count_limit += 1; + return false; + } + + true + } +} + +struct IntraCompactionTaskValidationRule { + config: Arc, + enable: bool, +} + +impl CompactionTaskValidationRule for IntraCompactionTaskValidationRule { + fn validate(&self, input: &CompactionInput, stats: &mut LocalPickerStatistic) -> bool { + if !self.enable { + return true; + } + + let intra_sub_level_compact_level_count = + self.config.level0_sub_level_compact_level_count as usize; + + if input.input_levels.len() < intra_sub_level_compact_level_count { + return false; + } + + let mut max_level_size = 0; + for select_level in &input.input_levels { + let level_select_size = select_level + .table_infos + .iter() + .map(|sst| sst.file_size) + .sum::(); + + max_level_size = std::cmp::max(max_level_size, level_select_size); + } + + // This limitation would keep our write-amplification no more than + // ln(max_compaction_bytes/flush_level_bytes) / + // ln(self.config.level0_sub_level_compact_level_count/2) Here we only use half + // of level0_sub_level_compact_level_count just for convenient. + let is_write_amp_large = + max_level_size * self.config.level0_sub_level_compact_level_count as u64 / 2 + >= input.select_input_size; + + if is_write_amp_large && input.total_file_count < self.config.level0_max_compact_file_number + { + stats.skip_by_write_amp_limit += 1; + return false; + } + + if input.input_levels.len() < intra_sub_level_compact_level_count + && input.total_file_count < self.config.level0_max_compact_file_number + { + stats.skip_by_count_limit += 1; + return false; + } + + true + } +} + +struct BaseCompactionTaskValidationRule { + config: Arc, + enable: bool, +} + +impl CompactionTaskValidationRule for BaseCompactionTaskValidationRule { + fn validate(&self, input: &CompactionInput, stats: &mut LocalPickerStatistic) -> bool { + if !self.enable { + return true; + } + + // The size of target level may be too large, we shall skip this compact task and wait + // the data in base level compact to lower level. + if input.target_input_size > self.config.max_compaction_bytes { + stats.skip_by_count_limit += 1; + return false; + } + + if input.select_input_size < input.target_input_size { + stats.skip_by_write_amp_limit += 1; + return false; + } + + true + } +} diff --git a/src/meta/src/hummock/compaction/picker/manual_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/manual_compaction_picker.rs index a6942b2e4d680..e8f8c908d0fd3 100644 --- a/src/meta/src/hummock/compaction/picker/manual_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/manual_compaction_picker.rs @@ -101,6 +101,7 @@ impl ManualCompactionPicker { input_levels, target_level: 0, target_sub_level_id: sub_level_id, + ..Default::default() }) } @@ -170,6 +171,7 @@ impl ManualCompactionPicker { input_levels, target_level: self.target_level, target_sub_level_id: 0, + ..Default::default() }) } @@ -301,6 +303,9 @@ impl CompactionPicker for ManualCompactionPicker { } Some(CompactionInput { + select_input_size: select_input_ssts.iter().map(|sst| sst.file_size).sum(), + target_input_size: target_input_ssts.iter().map(|sst| sst.file_size).sum(), + total_file_count: (select_input_ssts.len() + target_input_ssts.len()) as u64, input_levels: vec![ InputLevel { level_idx: level as u32, @@ -314,7 +319,7 @@ impl CompactionPicker for ManualCompactionPicker { }, ], target_level, - target_sub_level_id: 0, + ..Default::default() }) } } diff --git a/src/meta/src/hummock/compaction/picker/min_overlap_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/min_overlap_compaction_picker.rs index b489ec37987b8..efacf94d7dd93 100644 --- a/src/meta/src/hummock/compaction/picker/min_overlap_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/min_overlap_compaction_picker.rs @@ -20,10 +20,9 @@ use risingwave_hummock_sdk::prost_key_range::KeyRangeExt; use risingwave_pb::hummock::hummock_version::Levels; use risingwave_pb::hummock::{InputLevel, Level, LevelType, SstableInfo}; -use super::{CompactionInput, CompactionPicker, LocalPickerStatistic}; +use super::{CompactionInput, CompactionPicker, LocalPickerStatistic, MAX_COMPACT_LEVEL_COUNT}; use crate::hummock::compaction::overlap_strategy::OverlapStrategy; use crate::hummock::level_handler::LevelHandler; -pub const MAX_LEVEL_COUNT: usize = 42; pub struct MinOverlappingPicker { level: usize, @@ -130,6 +129,9 @@ impl CompactionPicker for MinOverlappingPicker { return None; } Some(CompactionInput { + select_input_size: select_input_ssts.iter().map(|sst| sst.file_size).sum(), + target_input_size: target_input_ssts.iter().map(|sst| sst.file_size).sum(), + total_file_count: (select_input_ssts.len() + target_input_ssts.len()) as u64, input_levels: vec![ InputLevel { level_idx: self.level as u32, @@ -143,7 +145,7 @@ impl CompactionPicker for MinOverlappingPicker { }, ], target_level: self.target_level, - target_sub_level_id: 0, + ..Default::default() }) } } @@ -299,7 +301,7 @@ impl NonOverlapSubLevelPicker { .iter() .filter(|ssts| !ssts.is_empty()) .count() - > MAX_LEVEL_COUNT + > MAX_COMPACT_LEVEL_COUNT { break; } diff --git a/src/meta/src/hummock/compaction/picker/mod.rs b/src/meta/src/hummock/compaction/picker/mod.rs index 86f3736288be6..04e0550b8413c 100644 --- a/src/meta/src/hummock/compaction/picker/mod.rs +++ b/src/meta/src/hummock/compaction/picker/mod.rs @@ -21,7 +21,10 @@ mod tombstone_reclaim_compaction_picker; mod trivial_move_compaction_picker; mod ttl_reclaim_compaction_picker; +mod compaction_task_validator; + pub use base_level_compaction_picker::LevelCompactionPicker; +pub use compaction_task_validator::{CompactionTaskValidator, ValidationRuleType}; pub use manual_compaction_picker::ManualCompactionPicker; pub use min_overlap_compaction_picker::MinOverlappingPicker; use risingwave_pb::hummock::hummock_version::Levels; @@ -36,6 +39,8 @@ pub use ttl_reclaim_compaction_picker::{TtlPickerState, TtlReclaimCompactionPick use crate::hummock::level_handler::LevelHandler; +pub const MAX_COMPACT_LEVEL_COUNT: usize = 42; + #[derive(Default)] pub struct LocalPickerStatistic { pub skip_by_write_amp_limit: u64, @@ -43,10 +48,15 @@ pub struct LocalPickerStatistic { pub skip_by_pending_files: u64, pub skip_by_overlapping: u64, } + +#[derive(Default)] pub struct CompactionInput { pub input_levels: Vec, pub target_level: usize, pub target_sub_level_id: u64, + pub select_input_size: u64, + pub target_input_size: u64, + pub total_file_count: u64, } impl CompactionInput { diff --git a/src/meta/src/hummock/compaction/picker/space_reclaim_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/space_reclaim_compaction_picker.rs index 4371729db8d9f..95fd5d2d52353 100644 --- a/src/meta/src/hummock/compaction/picker/space_reclaim_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/space_reclaim_compaction_picker.rs @@ -80,6 +80,8 @@ impl SpaceReclaimCompactionPicker { } if !select_input_ssts.is_empty() { return Some(CompactionInput { + select_input_size: select_input_ssts.iter().map(|sst| sst.file_size).sum(), + total_file_count: select_input_ssts.len() as u64, input_levels: vec![ InputLevel { level_idx: level.level_idx, @@ -94,6 +96,7 @@ impl SpaceReclaimCompactionPicker { ], target_level: level.level_idx as usize, target_sub_level_id: level.sub_level_id, + ..Default::default() }); } } @@ -135,6 +138,8 @@ impl SpaceReclaimCompactionPicker { // turn to next_round if !select_input_ssts.is_empty() { return Some(CompactionInput { + select_input_size: select_input_ssts.iter().map(|sst| sst.file_size).sum(), + total_file_count: select_input_ssts.len() as u64, input_levels: vec![ InputLevel { level_idx: state.last_level as u32, @@ -148,7 +153,7 @@ impl SpaceReclaimCompactionPicker { }, ], target_level: state.last_level, - target_sub_level_id: 0, + ..Default::default() }); } state.last_level += 1; diff --git a/src/meta/src/hummock/compaction/picker/tier_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/tier_compaction_picker.rs index 6ebb2fed50364..99b17694f528e 100644 --- a/src/meta/src/hummock/compaction/picker/tier_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/tier_compaction_picker.rs @@ -19,17 +19,35 @@ use risingwave_hummock_sdk::prost_key_range::KeyRangeExt; use risingwave_pb::hummock::hummock_version::Levels; use risingwave_pb::hummock::{CompactionConfig, InputLevel, LevelType, OverlappingLevel}; -use super::{CompactionInput, CompactionPicker, LocalPickerStatistic}; -use crate::hummock::compaction::picker::min_overlap_compaction_picker::MAX_LEVEL_COUNT; +use super::{ + CompactionInput, CompactionPicker, CompactionTaskValidator, LocalPickerStatistic, + ValidationRuleType, +}; +use crate::hummock::compaction::picker::MAX_COMPACT_LEVEL_COUNT; use crate::hummock::level_handler::LevelHandler; pub struct TierCompactionPicker { config: Arc, + compaction_task_validator: Arc, } impl TierCompactionPicker { + #[cfg(test)] pub fn new(config: Arc) -> TierCompactionPicker { - TierCompactionPicker { config } + TierCompactionPicker { + compaction_task_validator: Arc::new(CompactionTaskValidator::new(config.clone())), + config, + } + } + + pub fn new_with_validator( + config: Arc, + compaction_task_validator: Arc, + ) -> TierCompactionPicker { + TierCompactionPicker { + config, + compaction_task_validator, + } } fn pick_overlapping_level( @@ -66,9 +84,16 @@ impl TierCompactionPicker { if can_concat(&input_level.table_infos) { return Some(CompactionInput { + select_input_size: input_level + .table_infos + .iter() + .map(|sst| sst.file_size) + .sum(), + total_file_count: input_level.table_infos.len() as u64, input_levels: vec![input_level], target_level: 0, target_sub_level_id: level.sub_level_id, + ..Default::default() }); } @@ -87,29 +112,15 @@ impl TierCompactionPicker { // Limit sstable file count to avoid using too much memory. let overlapping_max_compact_file_numer = std::cmp::min( self.config.level0_max_compact_file_number, - MAX_LEVEL_COUNT as u64, + MAX_COMPACT_LEVEL_COUNT as u64, ); - let mut waiting_enough_files = { - if compaction_bytes > max_compaction_bytes { - false - } else { - compact_file_count <= overlapping_max_compact_file_numer - } - }; for other in &l0.sub_levels[idx + 1..] { if compaction_bytes > max_compaction_bytes { - waiting_enough_files = false; break; } if compact_file_count > overlapping_max_compact_file_numer { - waiting_enough_files = false; - break; - } - - if other.level_type() != LevelType::Overlapping { - waiting_enough_files = false; break; } @@ -126,24 +137,26 @@ impl TierCompactionPicker { }); } - // If waiting_enough_files is not satisfied, we will raise the priority of the number of - // levels to ensure that we can merge as many sub_levels as possible - let tier_sub_level_compact_level_count = - self.config.level0_overlapping_sub_level_compact_level_count as usize; - if select_level_inputs.len() < tier_sub_level_compact_level_count - && waiting_enough_files - { - stats.skip_by_count_limit += 1; - continue; - } - select_level_inputs.reverse(); - return Some(CompactionInput { + let result = CompactionInput { input_levels: select_level_inputs, target_level: 0, target_sub_level_id: level.sub_level_id, - }); + select_input_size: compaction_bytes, + target_input_size: 0, + total_file_count: compact_file_count, + }; + + if !self.compaction_task_validator.valid_compact_task( + &result, + ValidationRuleType::Tier, + stats, + ) { + continue; + } + + return Some(result); } None } diff --git a/src/meta/src/hummock/compaction/picker/tombstone_reclaim_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/tombstone_reclaim_compaction_picker.rs index 97d8fa995d8b1..994bfbc5ea557 100644 --- a/src/meta/src/hummock/compaction/picker/tombstone_reclaim_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/tombstone_reclaim_compaction_picker.rs @@ -117,6 +117,14 @@ impl TombstoneReclaimCompactionPicker { } }; return Some(CompactionInput { + select_input_size: select_input_ssts.iter().map(|sst| sst.file_size).sum(), + target_input_size: target_level + .table_infos + .iter() + .map(|sst| sst.file_size) + .sum(), + total_file_count: (select_input_ssts.len() + target_level.table_infos.len()) + as u64, target_level: target_level.level_idx as usize, input_levels: vec![ InputLevel { @@ -126,7 +134,7 @@ impl TombstoneReclaimCompactionPicker { }, target_level, ], - target_sub_level_id: 0, + ..Default::default() }); } state.last_level += 1; diff --git a/src/meta/src/hummock/compaction/picker/trivial_move_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/trivial_move_compaction_picker.rs index 89f794e04efb2..4bfbca0c5fb59 100644 --- a/src/meta/src/hummock/compaction/picker/trivial_move_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/trivial_move_compaction_picker.rs @@ -79,6 +79,8 @@ impl TrivialMovePicker { self.pick_trivial_move_sst(select_tables, target_tables, level_handlers, stats) { return Some(CompactionInput { + select_input_size: trivial_move_sst.file_size, + total_file_count: 1, input_levels: vec![ InputLevel { level_idx: self.level as u32, @@ -92,7 +94,7 @@ impl TrivialMovePicker { }, ], target_level: self.target_level, - target_sub_level_id: 0, + ..Default::default() }); } diff --git a/src/meta/src/hummock/compaction/picker/ttl_reclaim_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/ttl_reclaim_compaction_picker.rs index df833f7a14c05..a822d33db3cfa 100644 --- a/src/meta/src/hummock/compaction/picker/ttl_reclaim_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/ttl_reclaim_compaction_picker.rs @@ -199,6 +199,8 @@ impl TtlReclaimCompactionPicker { }); Some(CompactionInput { + select_input_size: select_input_ssts.iter().map(|sst| sst.file_size).sum(), + total_file_count: select_input_ssts.len() as _, input_levels: vec![ InputLevel { level_idx: reclaimed_level.level_idx, @@ -212,7 +214,7 @@ impl TtlReclaimCompactionPicker { }, ], target_level: reclaimed_level.level_idx as usize, - target_sub_level_id: 0, + ..Default::default() }) } } diff --git a/src/meta/src/model/barrier.rs b/src/meta/src/model/barrier.rs index a2315c9aa7ed8..e146dd47489ec 100644 --- a/src/meta/src/model/barrier.rs +++ b/src/meta/src/model/barrier.rs @@ -12,17 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::barrier::TracedEpoch; +use risingwave_pb::meta::PausedReason; -/// The reason why the data sources in the cluster are paused. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum PausedReason { - /// The cluster is paused due to configuration change, e.g. altering table schema and scaling. - ConfigChange, - /// The cluster is paused due to manual operation, e.g. `risectl` command or the - /// `pause_on_next_bootstrap` system variable. - Manual, -} +use crate::barrier::TracedEpoch; /// `BarrierManagerState` defines the necessary state of `GlobalBarrierManager`. pub struct BarrierManagerState { diff --git a/src/meta/src/rpc/server.rs b/src/meta/src/rpc/server.rs index 14994867f980e..e0e0472109286 100644 --- a/src/meta/src/rpc/server.rs +++ b/src/meta/src/rpc/server.rs @@ -539,7 +539,6 @@ pub async fn start_service_as_election_leader( let user_srv = UserServiceImpl::new(env.clone(), catalog_manager.clone()); let scale_srv = ScaleServiceImpl::new( - barrier_scheduler.clone(), fragment_manager.clone(), cluster_manager.clone(), source_manager, diff --git a/src/meta/src/rpc/service/scale_service.rs b/src/meta/src/rpc/service/scale_service.rs index bb5752ff9137c..f231ea5f4955d 100644 --- a/src/meta/src/rpc/service/scale_service.rs +++ b/src/meta/src/rpc/service/scale_service.rs @@ -16,21 +16,19 @@ use risingwave_pb::common::WorkerType; use risingwave_pb::meta::scale_service_server::ScaleService; use risingwave_pb::meta::{ GetClusterInfoRequest, GetClusterInfoResponse, GetReschedulePlanRequest, - GetReschedulePlanResponse, PauseRequest, PauseResponse, Reschedule, RescheduleRequest, - RescheduleResponse, ResumeRequest, ResumeResponse, + GetReschedulePlanResponse, Reschedule, RescheduleRequest, RescheduleResponse, }; use risingwave_pb::source::{ConnectorSplit, ConnectorSplits}; use tonic::{Request, Response, Status}; -use crate::barrier::{BarrierManagerRef, BarrierScheduler, Command}; +use crate::barrier::BarrierManagerRef; use crate::manager::{CatalogManagerRef, ClusterManagerRef, FragmentManagerRef}; -use crate::model::{MetadataModel, PausedReason}; +use crate::model::MetadataModel; use crate::stream::{ GlobalStreamManagerRef, ParallelUnitReschedule, RescheduleOptions, SourceManagerRef, }; pub struct ScaleServiceImpl { - barrier_scheduler: BarrierScheduler, fragment_manager: FragmentManagerRef, cluster_manager: ClusterManagerRef, source_manager: SourceManagerRef, @@ -41,7 +39,6 @@ pub struct ScaleServiceImpl { impl ScaleServiceImpl { pub fn new( - barrier_scheduler: BarrierScheduler, fragment_manager: FragmentManagerRef, cluster_manager: ClusterManagerRef, source_manager: SourceManagerRef, @@ -50,7 +47,6 @@ impl ScaleServiceImpl { barrier_manager: BarrierManagerRef, ) -> Self { Self { - barrier_scheduler, fragment_manager, cluster_manager, source_manager, @@ -63,26 +59,6 @@ impl ScaleServiceImpl { #[async_trait::async_trait] impl ScaleService for ScaleServiceImpl { - #[cfg_attr(coverage, no_coverage)] - async fn pause(&self, _: Request) -> Result, Status> { - // TODO: move this out of the scale service, as scaling actually executes `pause` and - // `resume` with `PausedReason::ConfigChange`. - self.barrier_scheduler - .run_command(Command::pause(PausedReason::Manual)) - .await?; - Ok(Response::new(PauseResponse {})) - } - - #[cfg_attr(coverage, no_coverage)] - async fn resume(&self, _: Request) -> Result, Status> { - // TODO: move this out of the scale service, as scaling actually executes `pause` and - // `resume` with `PausedReason::ConfigChange`. - self.barrier_scheduler - .run_command(Command::resume(PausedReason::Manual)) - .await?; - Ok(Response::new(ResumeResponse {})) - } - #[cfg_attr(coverage, no_coverage)] async fn get_cluster_info( &self, diff --git a/src/meta/src/rpc/service/stream_service.rs b/src/meta/src/rpc/service/stream_service.rs index 0125b53d84a17..b2ed1ec916b08 100644 --- a/src/meta/src/rpc/service/stream_service.rs +++ b/src/meta/src/rpc/service/stream_service.rs @@ -24,7 +24,7 @@ use risingwave_pb::meta::stream_manager_service_server::StreamManagerService; use risingwave_pb::meta::*; use tonic::{Request, Response, Status}; -use crate::barrier::BarrierScheduler; +use crate::barrier::{BarrierScheduler, Command}; use crate::manager::{CatalogManagerRef, FragmentManagerRef, MetaSrvEnv}; use crate::stream::GlobalStreamManagerRef; @@ -71,6 +71,30 @@ impl StreamManagerService for StreamServiceImpl { })) } + #[cfg_attr(coverage, no_coverage)] + async fn pause(&self, _: Request) -> Result, Status> { + let i = self + .barrier_scheduler + .run_command(Command::pause(PausedReason::Manual)) + .await?; + Ok(Response::new(PauseResponse { + prev: i.prev_paused_reason.map(Into::into), + curr: i.curr_paused_reason.map(Into::into), + })) + } + + #[cfg_attr(coverage, no_coverage)] + async fn resume(&self, _: Request) -> Result, Status> { + let i = self + .barrier_scheduler + .run_command(Command::resume(PausedReason::Manual)) + .await?; + Ok(Response::new(ResumeResponse { + prev: i.prev_paused_reason.map(Into::into), + curr: i.curr_paused_reason.map(Into::into), + })) + } + async fn cancel_creating_jobs( &self, request: Request, diff --git a/src/prost/build.rs b/src/prost/build.rs index a0d3f64c9a455..172f9c0731a6d 100644 --- a/src/prost/build.rs +++ b/src/prost/build.rs @@ -74,6 +74,10 @@ fn main() -> Result<(), Box> { .type_attribute(".", "#[derive(prost_helpers::AnyPB)]") .type_attribute("node_body", "#[derive(::enum_as_inner::EnumAsInner)]") .type_attribute("rex_node", "#[derive(::enum_as_inner::EnumAsInner)]") + .type_attribute( + "meta.PausedReason", + "#[derive(::enum_as_inner::EnumAsInner)]", + ) .type_attribute( "stream_plan.Barrier.BarrierKind", "#[derive(::enum_as_inner::EnumAsInner)]", diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 46ec4671556e6..3179fd500e598 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -735,16 +735,16 @@ impl MetaClient { Ok(resp.states) } - pub async fn pause(&self) -> Result<()> { + pub async fn pause(&self) -> Result { let request = PauseRequest {}; - let _resp = self.inner.pause(request).await?; - Ok(()) + let resp = self.inner.pause(request).await?; + Ok(resp) } - pub async fn resume(&self) -> Result<()> { + pub async fn resume(&self) -> Result { let request = ResumeRequest {}; - let _resp = self.inner.resume(request).await?; - Ok(()) + let resp = self.inner.resume(request).await?; + Ok(resp) } pub async fn get_cluster_info(&self) -> Result { @@ -1647,6 +1647,8 @@ macro_rules! for_all_meta_rpc { ,{ cluster_client, list_all_nodes, ListAllNodesRequest, ListAllNodesResponse } ,{ heartbeat_client, heartbeat, HeartbeatRequest, HeartbeatResponse } ,{ stream_client, flush, FlushRequest, FlushResponse } + ,{ stream_client, pause, PauseRequest, PauseResponse } + ,{ stream_client, resume, ResumeRequest, ResumeResponse } ,{ stream_client, cancel_creating_jobs, CancelCreatingJobsRequest, CancelCreatingJobsResponse } ,{ stream_client, list_table_fragments, ListTableFragmentsRequest, ListTableFragmentsResponse } ,{ stream_client, list_table_fragment_states, ListTableFragmentStatesRequest, ListTableFragmentStatesResponse } @@ -1712,8 +1714,6 @@ macro_rules! for_all_meta_rpc { ,{ user_client, drop_user, DropUserRequest, DropUserResponse } ,{ user_client, grant_privilege, GrantPrivilegeRequest, GrantPrivilegeResponse } ,{ user_client, revoke_privilege, RevokePrivilegeRequest, RevokePrivilegeResponse } - ,{ scale_client, pause, PauseRequest, PauseResponse } - ,{ scale_client, resume, ResumeRequest, ResumeResponse } ,{ scale_client, get_cluster_info, GetClusterInfoRequest, GetClusterInfoResponse } ,{ scale_client, reschedule, RescheduleRequest, RescheduleResponse } ,{ scale_client, get_reschedule_plan, GetReschedulePlanRequest, GetReschedulePlanResponse } diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index 7e8bf9c1042a6..84dc34ada3b64 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -549,7 +549,7 @@ impl Parser { })); let token = self.next_token(); - let expr = match token.token { + let expr = match token.token.clone() { Token::Word(w) => match w.keyword { Keyword::TRUE | Keyword::FALSE | Keyword::NULL => { self.prev_token(); @@ -593,7 +593,7 @@ impl Parser { }) } k if keywords::RESERVED_FOR_COLUMN_OR_TABLE_NAME.contains(&k) => { - parser_err!(format!("syntax error at or near \"{w}\"")) + parser_err!(format!("syntax error at or near {token}")) } // Here `w` is a word, check if it's a part of a multi-part // identifier, a function call, or a simple identifier: @@ -1232,7 +1232,7 @@ impl Parser { // for keyword 'array' self.prev_token(); } - parser_err!(format!("syntax error at or near '{}'", self.peek_token()))? + parser_err!(format!("syntax error at or near {}", self.peek_token()))? } else { Ok(()) } @@ -3435,10 +3435,10 @@ impl Parser { /// Parse a simple one-word identifier (possibly quoted, possibly a non-reserved keyword) pub fn parse_identifier_non_reserved(&mut self) -> Result { let token = self.next_token(); - match token.token { + match token.token.clone() { Token::Word(w) => { match keywords::RESERVED_FOR_COLUMN_OR_TABLE_NAME.contains(&w.keyword) { - true => parser_err!(format!("syntax error at or near \"{w}\"")), + true => parser_err!(format!("syntax error at or near {token}")), false => Ok(w.to_ident()?), } } diff --git a/src/sqlparser/tests/sqlparser_common.rs b/src/sqlparser/tests/sqlparser_common.rs index ce3ec095ce283..0fc2f3c2530f7 100644 --- a/src/sqlparser/tests/sqlparser_common.rs +++ b/src/sqlparser/tests/sqlparser_common.rs @@ -253,7 +253,8 @@ fn parse_select_all() { #[test] fn parse_select_all_distinct() { let result = parse_sql_statements("SELECT ALL DISTINCT name FROM customer"); - assert!(format!("{}", result.unwrap_err()).contains("syntax error at or near \"DISTINCT\"")); + assert!(format!("{}", result.unwrap_err()) + .contains("syntax error at or near DISTINCT at line:1, column:20")); } #[test] diff --git a/src/sqlparser/tests/sqlparser_postgres.rs b/src/sqlparser/tests/sqlparser_postgres.rs index 4e45f93a8c8ad..d94f1b06b166b 100644 --- a/src/sqlparser/tests/sqlparser_postgres.rs +++ b/src/sqlparser/tests/sqlparser_postgres.rs @@ -1047,7 +1047,7 @@ fn parse_array() { assert_eq!( parse_sql_statements(sql), Err(ParserError::ParserError( - "syntax error at or near '[ at line:1, column:28'".to_string() + "syntax error at or near [ at line:1, column:28".to_string() )) ); @@ -1055,7 +1055,7 @@ fn parse_array() { assert_eq!( parse_sql_statements(sql), Err(ParserError::ParserError( - "syntax error at or near '[ at line:1, column:24'".to_string() + "syntax error at or near [ at line:1, column:24".to_string() )) ); @@ -1063,7 +1063,7 @@ fn parse_array() { assert_eq!( parse_sql_statements(sql), Err(ParserError::ParserError( - "syntax error at or near 'ARRAY at line:1, column:27'".to_string() + "syntax error at or near ARRAY at line:1, column:27".to_string() )) ); @@ -1071,7 +1071,7 @@ fn parse_array() { assert_eq!( parse_sql_statements(sql), Err(ParserError::ParserError( - "syntax error at or near 'ARRAY at line:1, column:23'".to_string() + "syntax error at or near ARRAY at line:1, column:23".to_string() )) ); diff --git a/src/sqlparser/tests/testdata/create.yaml b/src/sqlparser/tests/testdata/create.yaml index ab5822fc2bc09..92bdabc83048c 100644 --- a/src/sqlparser/tests/testdata/create.yaml +++ b/src/sqlparser/tests/testdata/create.yaml @@ -35,7 +35,7 @@ - input: CREATE TABLE T (a STRUCT) formatted_sql: CREATE TABLE T (a STRUCT) - input: CREATE TABLE T (FULL INT) - error_msg: 'sql parser error: syntax error at or near "FULL"' + error_msg: 'sql parser error: syntax error at or near FULL at line:1, column:21' - input: CREATE TABLE T ("FULL" INT) formatted_sql: CREATE TABLE T ("FULL" INT) - input: CREATE USER user WITH SUPERUSER CREATEDB PASSWORD 'password' diff --git a/src/sqlparser/tests/testdata/select.yaml b/src/sqlparser/tests/testdata/select.yaml index b98b9b6ff4fb2..bbbb5a72bbdab 100644 --- a/src/sqlparser/tests/testdata/select.yaml +++ b/src/sqlparser/tests/testdata/select.yaml @@ -71,9 +71,9 @@ sql parser error: Expected ), found: minutes at line:1, column:62 Near "(t, x, interval '10'" - input: SELECT 1, FROM t - error_msg: 'sql parser error: syntax error at or near "FROM"' + error_msg: 'sql parser error: syntax error at or near FROM at line:1, column:15' - input: SELECT 1, WHERE true - error_msg: 'sql parser error: syntax error at or near "WHERE"' + error_msg: 'sql parser error: syntax error at or near WHERE at line:1, column:16' - input: SELECT timestamp with time zone '2022-10-01 12:00:00Z' AT TIME ZONE 'US/Pacific' formatted_sql: SELECT TIMESTAMP WITH TIME ZONE '2022-10-01 12:00:00Z' AT TIME ZONE 'US/Pacific' formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(AtTimeZone { timestamp: TypedString { data_type: Timestamp(true), value: "2022-10-01 12:00:00Z" }, time_zone: "US/Pacific" })], from: [], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })' diff --git a/src/stream/src/executor/sort_buffer.rs b/src/stream/src/executor/sort_buffer.rs index e85ecffedeccb..2a4cd4bf6ad78 100644 --- a/src/stream/src/executor/sort_buffer.rs +++ b/src/stream/src/executor/sort_buffer.rs @@ -141,19 +141,19 @@ impl SortBuffer { watermark: ScalarImpl, buffer_table: &'a mut StateTable, ) { - let mut last_timestamp = None; + let mut last_table_pk = None; loop { if !self.cache.is_synced() { // Refill the cache, then consume from the cache, to ensure strong row ordering // and prefetch for the next watermark. - self.refill_cache(last_timestamp.take(), buffer_table) + self.refill_cache(last_table_pk.take(), buffer_table) .await?; } #[for_await] for res in self.consume_from_cache(watermark.as_scalar_ref_impl()) { - let ((timestamp_val, _), row) = res?; - last_timestamp = Some(timestamp_val.into_inner()); + let row = res?; + last_table_pk = Some((&row).project(buffer_table.pk_indices()).into_owned_row()); yield row; } @@ -169,7 +169,7 @@ impl SortBuffer { buffer_table.update_watermark(watermark, true); } - #[try_stream(ok = (CacheKey, OwnedRow), error = StreamExecutorError)] + #[try_stream(ok = OwnedRow, error = StreamExecutorError)] async fn consume_from_cache<'a>(&'a mut self, watermark: ScalarRefImpl<'a>) { while self.cache.is_synced() { let Some(key) = self.cache.first_key_value().map(|(k, _)| k.clone()) else { @@ -177,7 +177,7 @@ impl SortBuffer { }; if key.0.as_scalar_ref_impl().default_cmp(&watermark).is_lt() { let row = self.cache.delete(&key).unwrap(); - yield (key, row); + yield row; } else { break; } @@ -187,15 +187,14 @@ impl SortBuffer { /// Clear the cache and refill it with the current content of the buffer table. pub async fn refill_cache( &mut self, - last_timestamp: Option, + last_table_pk: Option, buffer_table: &StateTable, ) -> StreamExecutorResult<()> { let mut filler = self.cache.begin_syncing(); let pk_range = ( - last_timestamp - .as_ref() - .map(|v| Bound::Excluded([Some(v.as_scalar_ref_impl())])) + last_table_pk + .map(Bound::Excluded) .unwrap_or(Bound::Unbounded), Bound::::Unbounded, ); diff --git a/src/tests/e2e_extended_mode/src/test.rs b/src/tests/e2e_extended_mode/src/test.rs index b8058105eb52f..46f6bff8aec05 100644 --- a/src/tests/e2e_extended_mode/src/test.rs +++ b/src/tests/e2e_extended_mode/src/test.rs @@ -187,7 +187,7 @@ impl TestSuite { ); } - let timestamptz = DateTime::::from_utc( + let timestamptz = DateTime::::from_naive_utc_and_offset( NaiveDate::from_ymd_opt(2022, 1, 1) .unwrap() .and_hms_opt(10, 0, 0)