From 57733ab06be5c5a140258c5c7393bcc1a4024306 Mon Sep 17 00:00:00 2001 From: xxchan Date: Mon, 22 Apr 2024 15:00:02 +0800 Subject: [PATCH] update tests Signed-off-by: xxchan --- Makefile.toml | 40 +++- e2e_test/source_inline/commands.toml | 35 ++-- .../source_inline/kafka/consumer_group.mjs | 13 +- .../source_inline/kafka/internal_table.mjs | 65 ++++++ .../source_inline/kafka/shared_source.slt | 188 +++++++++++++++--- src/common/src/catalog/internal_table.rs | 4 +- 6 files changed, 286 insertions(+), 59 deletions(-) create mode 100755 e2e_test/source_inline/kafka/internal_table.mjs diff --git a/Makefile.toml b/Makefile.toml index 6db1dbb604b12..91cfe6f9b760d 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -131,8 +131,7 @@ rm -rf "${PREFIX_PROFILING}" [tasks.reset-rw] category = "RiseDev - Start/Stop" description = "Clean all data in the default database dev of the running RisingWave" -dependencies = ["check-risedev-env-file"] -env_files = ["${PREFIX_CONFIG}/risedev-env"] +dependencies = ["check-and-load-risedev-env-file"] script = ''' #!/usr/bin/env bash psql -h $RISEDEV_RW_FRONTEND_LISTEN_ADDRESS -p $RISEDEV_RW_FRONTEND_PORT -U root -d dev -c "CREATE DATABASE risedev_tmp;" @@ -568,11 +567,16 @@ if [ ! -f "${RC_ENV_FILE}" ]; then fi ''' +[tasks.check-and-load-risedev-env-file] +private = true +category = "RiseDev - Prepare" +dependencies = ["check-risedev-env-file"] +env_files = ["${PREFIX_CONFIG}/risedev-env"] + [tasks.psql-env] category = "RiseDev - Start/Stop" description = "Dump env configuration for psql" -dependencies = ["check-risedev-env-file"] -env_files = ["${PREFIX_CONFIG}/risedev-env"] +dependencies = ["check-and-load-risedev-env-file"] script = ''' #!/usr/bin/env bash cat < "${PREFIX_CONFIG}/psql-env" @@ -590,8 +594,7 @@ echo " $(tput setaf 4)source ${PREFIX_CONFIG}/psql-env$(tput sgr0)" [tasks.psql] category = "RiseDev - Start/Stop" description = "Run local psql client with default connection parameters. You can pass extra arguments to psql." 
-dependencies = ["check-risedev-env-file"] -env_files = ["${PREFIX_CONFIG}/risedev-env"] +dependencies = ["check-and-load-risedev-env-file"] script = ''' #!/usr/bin/env bash psql -h $RISEDEV_RW_FRONTEND_LISTEN_ADDRESS -p $RISEDEV_RW_FRONTEND_PORT -U root -d dev "$@" @@ -600,8 +603,7 @@ psql -h $RISEDEV_RW_FRONTEND_LISTEN_ADDRESS -p $RISEDEV_RW_FRONTEND_PORT -U root [tasks.ctl] category = "RiseDev - Start/Stop" description = "Start RiseCtl" -dependencies = ["check-risedev-env-file"] -env_files = ["${PREFIX_CONFIG}/risedev-env"] +dependencies = ["check-and-load-risedev-env-file"] script = ''' #!/usr/bin/env bash cargo run -p risingwave_cmd_all --profile "${RISINGWAVE_BUILD_PROFILE}" -- ctl "$@" @@ -1313,12 +1315,18 @@ category = "RiseDev - Test - SQLLogicTest" install_crate = { version = "0.20.1", crate_name = "sqllogictest-bin", binary = "sqllogictest", test_arg = [ "--help", ], install_command = "binstall" } -dependencies = ["check-risedev-env-file"] -env_files = ["${PREFIX_CONFIG}/risedev-env"] +dependencies = ["check-and-load-risedev-env-file"] command = "sqllogictest" args = ["${@}"] description = "🌟 Run SQLLogicTest" +[tasks.slt-clean] +category = "RiseDev - Test - SQLLogicTest" +dependencies = ["clean-kafka", "reset-rw"] +description = "Run SQLLogicTest with a clean environment" +run_task = "slt" +args = ["${@}"] + [tasks.slt-streaming] category = "RiseDev - Test - SQLLogicTest" extend = "slt" @@ -1433,3 +1441,15 @@ script = """ vars = dump_variables echo ${vars} """ + +[tasks.show-risedev-env] +description = "Show risedev-env environment variables" +dependencies = ["check-risedev-env-file"] +script = ''' +#!/usr/bin/env bash +set -euo pipefail + +cat ${PREFIX_CONFIG}/risedev-env +echo "Hint: To load the environment variables into the shell, you may run:" +echo "$(tput setaf 4)\set -a; source ${PREFIX_CONFIG}/risedev-env; set +a$(tput sgr0)" +''' diff --git a/e2e_test/source_inline/commands.toml b/e2e_test/source_inline/commands.toml index 8af865099ac7c..9eb7d7b4db4f8 100644 --- a/e2e_test/source_inline/commands.toml +++ b/e2e_test/source_inline/commands.toml @@ -1,9 +1,5 @@ # This file contains commands used by the tests. -[tasks.source-test-hook] -private = true -dependencies = ["check-risedev-env-file"] -env_files = ["${PREFIX_CONFIG}/risedev-env"] # Note about the Kafka CLI tooling: # - Built-in Kafka console tools: @@ -16,10 +12,10 @@ env_files = ["${PREFIX_CONFIG}/risedev-env"] # - rpk: # Golang based. # Style example: RPK_BROKERS=localhost:9092 rpk topic create t -[tasks.kafka-hook] +[tasks.check-kafka] private = true description = "Check if Kafka is started by RiseDev" -dependencies = ["source-test-hook"] +dependencies = ["check-and-load-risedev-env-file"] script = ''' #!/usr/bin/env sh set -e @@ -38,13 +34,22 @@ fi [tasks.clean-kafka] category = "RiseDev - Test - Source Test - Kafka" description = "Delete all kafka topics." -dependencies = ["kafka-hook"] -command = "rpk" -args = ["topic", "delete", "-r", "*"] +dependencies = ["check-and-load-risedev-env-file"] +script = ''' +#!/usr/bin/env sh +set -e + +if [ -n "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" ]; then + echo "Deleting all Kafka topics..." + rpk topic delete -r "*" +else + echo "No Kafka to clean." 
+fi +''' [tasks.kafka-topics] category = "RiseDev - Test - Source Test - Kafka" -dependencies = ["kafka-hook"] +dependencies = ["check-kafka"] script = """ #!/usr/bin/env sh set -e @@ -53,7 +58,7 @@ ${PREFIX_BIN}/kafka/bin/kafka-topics.sh --bootstrap-server ${RISEDEV_KAFKA_BOOTS [tasks.kafka-produce] category = "RiseDev - Test - Source Test - Kafka" -dependencies = ["kafka-hook"] +dependencies = ["check-kafka"] script = """ #!/usr/bin/env sh set -e @@ -62,7 +67,7 @@ ${PREFIX_BIN}/kafka/bin/kafka-console-producer.sh --bootstrap-server ${RISEDEV_K [tasks.kafka-consume] category = "RiseDev - Test - Source Test - Kafka" -dependencies = ["kafka-hook"] +dependencies = ["check-kafka"] script = """ #!/usr/bin/env sh set -e @@ -71,7 +76,7 @@ ${PREFIX_BIN}/kafka/bin/kafka-console-consumer.sh --bootstrap-server ${RISEDEV_K [tasks.kafka-consumer-groups] category = "RiseDev - Test - Source Test - Kafka" -dependencies = ["kafka-hook"] +dependencies = ["check-kafka"] script = """ #!/usr/bin/env sh set -e @@ -81,7 +86,7 @@ ${PREFIX_BIN}/kafka/bin/kafka-consumer-groups.sh --bootstrap-server ${RISEDEV_KA # rpk tools [tasks.rpk] category = "RiseDev - Test - Source Test - Kafka" -dependencies = ["kafka-hook"] +dependencies = ["check-kafka"] # check https://docs.redpanda.com/current/reference/rpk/rpk-x-options/ or rpk -X help/list for options script = """ #!/usr/bin/env sh @@ -98,7 +103,7 @@ rpk "$@" [tasks.redpanda-console] category = "RiseDev - Test - Source Test - Kafka" description = "Start Redpanda console (Kafka GUI) at localhost:8080." -dependencies = ["kafka-hook"] +dependencies = ["check-kafka"] script = ''' #!/usr/bin/env sh set -e diff --git a/e2e_test/source_inline/kafka/consumer_group.mjs b/e2e_test/source_inline/kafka/consumer_group.mjs index 12e906631319a..1766a903b09f5 100755 --- a/e2e_test/source_inline/kafka/consumer_group.mjs +++ b/e2e_test/source_inline/kafka/consumer_group.mjs @@ -3,11 +3,8 @@ // zx: A tool for writing better scripts // https://google.github.io/zx/ -const { - mv: mv, - _: _command, -} = minimist(process.argv.slice(3), { - string: ["mv", "topic"], +const { mv: mv, _: _command } = minimist(process.argv.slice(3), { + string: ["mv"], _: ["list-members", "list-lags"], }); const command = _command[0]; @@ -62,7 +59,7 @@ async function list_consumer_group_members(fragment_id) { const groups = await list_consumer_groups(fragment_id); return Promise.all( groups.map(async (group_name) => { - return (await describe_consumer_group(group_name))["MEMBERS"] + return (await describe_consumer_group(group_name))["MEMBERS"]; }) ); } @@ -71,14 +68,14 @@ async function list_consumer_group_lags(fragment_id) { const groups = await list_consumer_groups(fragment_id); return Promise.all( groups.map(async (group_name) => { - return (await describe_consumer_group(group_name))["TOTAL-LAG"] + return (await describe_consumer_group(group_name))["TOTAL-LAG"]; }) ); } const fragment_id = await get_fragment_id_of_mv(mv); if (command == "list-groups") { - echo`${(await list_consumer_groups(fragment_id))}`; + echo`${await list_consumer_groups(fragment_id)}`; } else if (command == "list-members") { echo`${await list_consumer_group_members(fragment_id)}`; } else if (command == "list-lags") { diff --git a/e2e_test/source_inline/kafka/internal_table.mjs b/e2e_test/source_inline/kafka/internal_table.mjs new file mode 100755 index 0000000000000..d5b052b9722f4 --- /dev/null +++ b/e2e_test/source_inline/kafka/internal_table.mjs @@ -0,0 +1,65 @@ +#!/usr/bin/env zx + +// zx: A tool for writing better scripts +// 
https://google.github.io/zx/
+
+const {
+  name: job_name,
+  type: table_type,
+  _: _command,
+} = minimist(process.argv.slice(3), {
+  string: ["name", "type"],
+});
+
+// Run the query via psql and return the rows as an array of CSV strings.
+async function psql(query) {
+  return (
+    await $`
+psql -h $RISEDEV_RW_FRONTEND_LISTEN_ADDRESS -p $RISEDEV_RW_FRONTEND_PORT -U root -d dev \
+--csv --tuples-only -c ${query}
+`
+  )
+    .toString()
+    .trim()
+    .split("\n")
+    .filter((line) => line.trim() != "");
+}
+
+// If `table_type` is null, return all internal tables for the job.
+// If `job_name` is null, return all jobs' internal tables.
+async function select_internal_table(job_name, table_type) {
+  // Note: if we have `t1` and `t1_balabala`, the latter will also be matched 😄.
+  const internal_tables = await psql(
+    `select name from rw_internal_tables where name like '__internal_${job_name}_%_${table_type}_%'`
+  );
+  if (internal_tables.length == 0) {
+    throw new Error(
+      `No internal tables found for the pattern '__internal_${job_name}_%_${table_type}_%'`
+    );
+  }
+
+  const res = new Map(
+    await Promise.all(
+      internal_tables.map(async (t) => {
+        let rows = await psql(`select * from ${t}`);
+        if (rows.length == 0) {
+          rows = ["(empty)"];
+        }
+        return [t, rows];
+      })
+    )
+  );
+  return res;
+}
+
+const tables = await select_internal_table(job_name, table_type);
+if (tables.size > 1) {
+  for (const [table_name, rows] of tables) {
+    console.log(`Table: ${table_name}`);
+    console.log(`${rows.join("\n")}`);
+  }
+} else {
+  for (const [_table_name, rows] of tables) {
+    console.log(`${rows.join("\n")}`);
+  }
+}
diff --git a/e2e_test/source_inline/kafka/shared_source.slt b/e2e_test/source_inline/kafka/shared_source.slt
index 0bf8944306709..7bb6ce64c72cf 100644
--- a/e2e_test/source_inline/kafka/shared_source.slt
+++ b/e2e_test/source_inline/kafka/shared_source.slt
@@ -8,10 +8,11 @@ rpk topic create shared_source -p 4
 system ok
 cat << EOF | rpk topic produce shared_source -f "%p %v\\n" -p 0
-0 {"v1": 1, "v2": "1"}
-1 {"v1": 2, "v2": "22"}
-2 {"v1": 3, "v2": "333"}
-3 {"v1": 4, "v2": "4444"}
+0 {"v1": 1, "v2": "a"}
+1 {"v1": 2, "v2": "b"}
+2 {"v1": 3, "v2": "c"}
+3 {"v1": 4, "v2": "d"}
+EOF
 
 statement ok
 create source s0 (v1 int, v2 varchar) with (
@@ -20,9 +21,50 @@ create source s0 (v1 int, v2 varchar) with (
   scan.startup.mode = 'earliest'
 ) FORMAT PLAIN ENCODE JSON;
 
+query I
+select count(*) from rw_internal_tables where name like '%s0%';
+----
+1
+
+sleep 1s
+
+# Ingestion does not start, even after the sleep.
+system ok
+./e2e_test/source_inline/kafka/internal_table.mjs --name s0 --type source
+----
+(empty)
+
+
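+# Note: a shared source does not ingest anything until a downstream job is
+# created on it, which is why the state table above is still empty; creating
+# the materialized view below is what kicks off ingestion.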
 statement ok
 create materialized view mv_1 as select * from s0;
 
+# Wait enough time to ensure SourceExecutor consumes all Kafka data.
+sleep 1s
+
+# Ingestion started
+system ok
+./e2e_test/source_inline/kafka/internal_table.mjs --name s0 --type source
+----
+0,"{""split_info"": {""partition"": 0, ""start_offset"": 0, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
+1,"{""split_info"": {""partition"": 1, ""start_offset"": 0, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
+2,"{""split_info"": {""partition"": 2, ""start_offset"": 0, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
+3,"{""split_info"": {""partition"": 3, ""start_offset"": 0, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
+
+
+# The result is non-deterministic:
+# If the upstream row comes before the backfill row, it will be ignored, and the result state is Backfilling.
+# If the upstream row comes after the backfill row, the result state is Finished.
+# Uncomment below and run manually to see the result.
+
+# system ok
+# ./e2e_test/source_inline/kafka/internal_table.mjs --name mv_1 --type sourcebackfill
+# ----
+# 0,"{""Backfilling"": ""0""}"
+# 1,"{""Backfilling"": ""0""}"
+# 2,"{""Backfilling"": ""0""}"
+# 3,"{""Backfilling"": ""0""}"
+
+
 # This does not affect the behavior of CREATE MATERIALIZED VIEW below. It also uses the shared source, and creates SourceBackfillExecutor.
 statement ok
 SET rw_enable_shared_source TO false;
 
@@ -30,39 +72,137 @@ SET rw_enable_shared_source TO false;
 statement ok
 create materialized view mv_2 as select * from s0;
 
-statement ok
-flush;
-
-# Wait enough time to ensure SourceExecutor consumes all Kafka data.
 sleep 1s
 
 query IT rowsort
 select v1, v2 from s0;
 ----
-1 1
-2 22
-3 333
-4 4444
+1 a
+2 b
+3 c
+4 d
 
 query IT rowsort
 select v1, v2 from mv_1;
 ----
-1 1
-2 22
-3 333
-4 4444
+1 a
+2 b
+3 c
+4 d
 
 query IT rowsort
 select v1, v2 from mv_2;
 ----
-1 1
-2 22
-3 333
-4 4444
-
-# TODO: add more data to the topic and re-check the data. Currently there's no good test infra to do this...
-# To test the correctness of source backfill, we might need to keep producing data during an interval, to let it go
-# through the backfill stage to the forward stage.
+1 a
+2 b
+3 c
+4 d
+
+system ok
+cat << EOF | rpk topic produce shared_source -f "%p %v\\n" -p 0
+0 {"v1": 1, "v2": "aa"}
+1 {"v1": 2, "v2": "bb"}
+2 {"v1": 3, "v2": "cc"}
+3 {"v1": 4, "v2": "dd"}
+EOF
+
+sleep 1s
+
+query IT rowsort
+select v1, v2 from s0;
+----
+1 a
+1 aa
+2 b
+2 bb
+3 c
+3 cc
+4 d
+4 dd
+
+query IT rowsort
+select v1, v2 from mv_1;
+----
+1 a
+1 aa
+2 b
+2 bb
+3 c
+3 cc
+4 d
+4 dd
+
+
+# start_offset changed to 1
+system ok
+./e2e_test/source_inline/kafka/internal_table.mjs --name s0 --type source
+----
+0,"{""split_info"": {""partition"": 0, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
+1,"{""split_info"": {""partition"": 1, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
+2,"{""split_info"": {""partition"": 2, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
+3,"{""split_info"": {""partition"": 3, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
+
+
+# Same as above, the result is still non-deterministic: some partitions may be "{""Backfilling"": ""1""}".
+# Uncomment below and run manually to see the result.
+ +# system ok +# ./e2e_test/source_inline/kafka/internal_table.mjs --name mv_1 --type sourcebackfill +# ---- +# 0,"{""Finished""}" +# 1,"{""Finished""}" +# 2,"{""Finished""}" +# 3,"{""Finished""}" + + +system ok +for i in {0..9}; do +cat < String { format!( "__internal_{}_{}_{}_{}", - mview_name, + job_name, fragment_id, table_type.to_lowercase(), table_id
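
Note on the naming contract exercised above: the Rust change renames the first
argument of the internal-table name, which is rendered as
__internal_{job_name}_{fragment_id}_{table_type}_{table_id}, and the new
internal_table.mjs looks such tables up with a SQL LIKE pattern. Below is a
minimal sketch of that round trip, in the same zx-style JavaScript as the new
helper; the fragment id 3 and table id 42 are made-up values, not taken from a
real catalog:

    // Hypothetical IDs, for illustration only.
    const job_name = "mv_1";
    const fragment_id = 3; // assumed
    const table_type = "sourcebackfill"; // already lowercase, like the --type flag
    const table_id = 42; // assumed
    // Mirrors format!("__internal_{}_{}_{}_{}", job_name, fragment_id, table_type.to_lowercase(), table_id).
    const name = `__internal_${job_name}_${fragment_id}_${table_type}_${table_id}`;
    console.log(name); // __internal_mv_1_3_sourcebackfill_42
    // internal_table.mjs then queries:
    //   select name from rw_internal_tables where name like '__internal_mv_1_%_sourcebackfill_%'
    // Caveat, as noted in the script: a job named mv_1_foo matches the same pattern.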