diff --git a/Makefile.toml b/Makefile.toml index 504ff88a33d5..9f896adc00b0 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -1308,7 +1308,6 @@ echo "All processes has exited." """ [tasks.slt] -env = { SLT_HOST = "${RISEDEV_RW_FRONTEND_LISTEN_ADDRESS}", SLT_PORT = "${RISEDEV_RW_FRONTEND_PORT}", SLT_DB = "dev" } category = "RiseDev - Test - SQLLogicTest" install_crate = { version = "0.20.1", crate_name = "sqllogictest-bin", binary = "sqllogictest", test_arg = [ "--help", @@ -1319,6 +1318,19 @@ command = "sqllogictest" args = ["${@}"] description = "🌟 Run SQLLogicTest" +[tasks.slt.env] +SLT_HOST = "${RISEDEV_RW_FRONTEND_LISTEN_ADDRESS}" +SLT_PORT = "${RISEDEV_RW_FRONTEND_PORT}" +SLT_DB = "dev" +PATH = "${PWD}/e2e_test/commands:${PATH}" + +[tasks.slt-clean] +category = "RiseDev - Test - SQLLogicTest" +dependencies = ["clean-kafka", "reset-rw"] +description = "Run SQLLogicTest with a clean environment" +run_task = "slt" +args = ["${@}"] + [tasks.slt-streaming] category = "RiseDev - Test - SQLLogicTest" extend = "slt" @@ -1433,3 +1445,14 @@ script = """ vars = dump_variables echo ${vars} """ + +[tasks.show-risedev-env] +description = "Show risedev-env environment variables" +dependencies = ["check-risedev-env-file"] +script = ''' +#!/usr/bin/env bash +set -euo pipefail +cat ${PREFIX_CONFIG}/risedev-env +echo "Hint: To load the environment variables into the shell, you may run:" +echo "$(tput setaf 4)\set -a; source ${PREFIX_CONFIG}/risedev-env; set +a$(tput sgr0)" +''' diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index 8a683f56b855..5b776d91bd35 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -31,6 +31,12 @@ buildkite-agent artifact download risingwave-connector.tar.gz ./ mkdir ./connector-node tar xf ./risingwave-connector.tar.gz -C ./connector-node +echo "--- e2e, inline test" +risedev ci-start ci-inline-source-test +risedev slt './e2e_test/source_inline/**/*.slt' +echo "--- Kill cluster" +risedev ci-kill + echo "--- Prepare data" cp src/connector/src/test_data/simple-schema.avsc ./avro-simple-schema.avsc cp src/connector/src/test_data/complex-schema.avsc ./avro-complex-schema.avsc @@ -82,15 +88,6 @@ echo "--- cdc share source test" export MYSQL_HOST=mysql MYSQL_TCP_PORT=3306 MYSQL_PWD=123456 risedev slt './e2e_test/source/cdc/cdc.share_stream.slt' -# create a share source and check whether heartbeat message is received -risedev slt './e2e_test/source/cdc/cdc.create_source_job.slt' -table_id=$(psql -U root -h localhost -p 4566 -d dev -t -c "select id from rw_internal_tables where name like '%mysql_source%';" | xargs); -table_count=$(psql -U root -h localhost -p 4566 -d dev -t -c "select count(*) from rw_table(${table_id}, public);" | xargs); -if [ "$table_count" -eq 0 ]; then - echo "ERROR: internal table of cdc share source is empty!" - exit 1 -fi - echo "--- mysql & postgres load and check" risedev slt './e2e_test/source/cdc/cdc.load.slt' # wait for cdc loading @@ -168,9 +165,6 @@ echo "--- e2e, kafka alter source again" ./scripts/source/prepare_data_after_alter.sh 3 risedev slt './e2e_test/source/basic/alter/kafka_after_new_data_2.slt' -echo "--- e2e, inline test" -risedev slt './e2e_test/source_inline/**/*.slt' - echo "--- Run CH-benCHmark" risedev slt './e2e_test/ch_benchmark/batch/ch_benchmark.slt' risedev slt './e2e_test/ch_benchmark/streaming/*.slt' diff --git a/e2e_test/commands/README.md b/e2e_test/commands/README.md new file mode 100644 index 000000000000..56f51159d844 --- /dev/null +++ b/e2e_test/commands/README.md @@ -0,0 +1,8 @@ +# "built-in" Sqllogictest system commands + +Scripts (executables) in this directory are expected to be used as `system` commands in sqllogictests. You can use any language like bash, python, zx. + +They will be loaded in `PATH` by `risedev slt`, and thus function as kind of "built-in" commands. + +Only general commands should be put here. +If the script is ad-hoc (only used for one test), it's better to put next to the test file. diff --git a/e2e_test/commands/internal_table.mjs b/e2e_test/commands/internal_table.mjs new file mode 100755 index 000000000000..b51cc0313dca --- /dev/null +++ b/e2e_test/commands/internal_table.mjs @@ -0,0 +1,67 @@ +#!/usr/bin/env zx + +// zx: A tool for writing better scripts +// https://google.github.io/zx/ + +const { + name: job_name, + type: table_type, + count: count, +} = minimist(process.argv.slice(3), { + string: ["name", "type"], + boolean: ["count"], +}); + +// Return an array of CSV string +async function psql(query) { + return ( + await $` +psql -h $RISEDEV_RW_FRONTEND_LISTEN_ADDRESS -p $RISEDEV_RW_FRONTEND_PORT -U root -d dev \ +--csv --tuples-only -c ${query} +` + ) + .toString() + .trim() + .split("\n") + .filter((line) => line.trim() != ""); +} + +// If `table_type` is null, return all internal tables for the job. +// If `job_name` is null, return all jobs' internal tables. +async function select_internal_table(job_name, table_type) { + // Note: if we have `t1`, and `t1_balabala`, the latter one will also be matched 😄. + const internal_tables = await psql( + `select name from rw_internal_tables where name like '__internal_${job_name}_%_${table_type}_%'` + ); + if (internal_tables.length == 0) { + throw new Error( + `No internal tables found for the pattern '__internal_${job_name}_%_${table_type}_%'` + ); + } + + const res = new Map( + await Promise.all( + internal_tables.map(async (t) => { + let rows = await psql(`select * from ${t}`); + return [t, rows]; + }) + ) + ); + return res; +} + +const tables = await select_internal_table(job_name, table_type); +for (const [table_name, rows] of tables) { + if (tables.size > 1) { + console.log(`Table: ${table_name}`); + } + if (count) { + console.log(`count: ${rows.length}`); + } else { + if (rows.length == 0) { + console.log("(empty)"); + } else { + console.log(`${rows.join("\n")}`); + } + } +} diff --git a/e2e_test/source/cdc/cdc.create_source_job.slt b/e2e_test/source/cdc/cdc.create_source_job.slt deleted file mode 100644 index 1f2300087110..000000000000 --- a/e2e_test/source/cdc/cdc.create_source_job.slt +++ /dev/null @@ -1,15 +0,0 @@ -control substitution on - -# create a cdc source job, which format fixed to `FORMAT PLAIN ENCODE JSON` -statement ok -create source mysql_source with ( - connector = 'mysql-cdc', - hostname = '${MYSQL_HOST:localhost}', - port = '${MYSQL_TCP_PORT:8306}', - username = 'rwcdc', - password = '${MYSQL_PWD:}', - database.name = 'mytest', - server.id = '5001' -); - -sleep 2s diff --git a/e2e_test/source/cdc/mysql_cdc.sql b/e2e_test/source/cdc/mysql_cdc.sql index 0ac47fc7fbd1..2c53e5774816 100644 --- a/e2e_test/source/cdc/mysql_cdc.sql +++ b/e2e_test/source/cdc/mysql_cdc.sql @@ -49,9 +49,11 @@ VALUES (1,1,'no'), (3,3,'no'), (4,4,'no'); +-- This user is for non-shared CDC CREATE USER 'dbz'@'%' IDENTIFIED BY '123456'; GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'dbz'@'%'; +-- This user is for shared CDC CREATE USER 'rwcdc'@'%' IDENTIFIED BY '123456'; GRANT SELECT, RELOAD, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'rwcdc'@'%'; diff --git a/e2e_test/source/cdc_inline/mysql/mysql_create_drop.slt b/e2e_test/source/cdc_inline/mysql/mysql_create_drop.slt deleted file mode 100644 index 59f7a4538ef2..000000000000 --- a/e2e_test/source/cdc_inline/mysql/mysql_create_drop.slt +++ /dev/null @@ -1,133 +0,0 @@ -# create and drop CDC mysql tables concurrently - -control substitution on - -statement ok -ALTER SYSTEM SET max_concurrent_creating_streaming_jobs TO 1; - -system ok -mysql --protocol=tcp -u root -e " - DROP DATABASE IF EXISTS testdb1; CREATE DATABASE testdb1; - USE testdb1; - CREATE TABLE tt1 (v1 int primary key, v2 timestamp); - INSERT INTO tt1 VALUES (1, '2023-10-23 10:00:00'); - CREATE TABLE tt2 (v1 int primary key, v2 timestamp); - INSERT INTO tt2 VALUES (2, '2023-10-23 11:00:00'); - CREATE TABLE tt3 (v1 int primary key, v2 timestamp); - INSERT INTO tt3 VALUES (3, '2023-10-23 12:00:00'); - CREATE TABLE tt4 (v1 int primary key, v2 timestamp); - INSERT INTO tt4 VALUES (4, '2023-10-23 13:00:00'); - CREATE TABLE tt5 (v1 int primary key, v2 timestamp); - INSERT INTO tt5 VALUES (5, '2023-10-23 14:00:00');" - -statement ok -create table tt1 (v1 int, - v2 timestamptz, - PRIMARY KEY (v1) -) with ( - connector = 'mysql-cdc', - hostname = '${MYSQL_HOST:localhost}', - port = '${MYSQL_TCP_PORT:8306}', - username = 'dbz', - password = '${MYSQL_PWD:}', - database.name = 'testdb1', - table.name = 'tt1', -); - -statement ok -create table tt2 (v1 int, - v2 timestamptz, - PRIMARY KEY (v1) -) with ( - connector = 'mysql-cdc', - hostname = '${MYSQL_HOST:localhost}', - port = '${MYSQL_TCP_PORT:8306}', - username = 'dbz', - password = '${MYSQL_PWD:}', - database.name = 'testdb1', - table.name = 'tt2', -); - -statement ok -create table tt3 (v1 int, - v2 timestamptz, - PRIMARY KEY (v1) -) with ( - connector = 'mysql-cdc', - hostname = '${MYSQL_HOST:localhost}', - port = '${MYSQL_TCP_PORT:8306}', - username = 'dbz', - password = '${MYSQL_PWD:}', - database.name = 'testdb1', - table.name = 'tt3', -); - -statement ok -create table tt4 (v1 int, - v2 timestamptz, - PRIMARY KEY (v1) -) with ( - connector = 'mysql-cdc', - hostname = '${MYSQL_HOST:localhost}', - port = '${MYSQL_TCP_PORT:8306}', - username = 'dbz', - password = '${MYSQL_PWD:}', - database.name = 'testdb1', - table.name = 'tt4', -); - -statement ok -create table tt5 (v1 int, - v2 timestamptz, - PRIMARY KEY (v1) -) with ( - connector = 'mysql-cdc', - hostname = '${MYSQL_HOST:localhost}', - port = '${MYSQL_TCP_PORT:8306}', - username = 'dbz', - password = '${MYSQL_PWD:}', - database.name = 'testdb1', - table.name = 'tt5', -); - -sleep 5s - -query IT -select * from tt1; ----- -1 2023-10-23 10:00:00+00:00 - -query IT -select * from tt2; ----- -2 2023-10-23 11:00:00+00:00 - -query IT -select * from tt3; ----- -3 2023-10-23 12:00:00+00:00 - -query IT -select * from tt4; ----- -4 2023-10-23 13:00:00+00:00 - -query IT -select * from tt5; ----- -5 2023-10-23 14:00:00+00:00 - -statement ok -drop table tt1; - -statement ok -drop table tt2; - -statement ok -drop table tt3; - -statement ok -drop table tt4; - -statement ok -drop table tt5; diff --git a/e2e_test/source_inline/README.md b/e2e_test/source_inline/README.md index 3a9070639b8c..d1a84e22099e 100644 --- a/e2e_test/source_inline/README.md +++ b/e2e_test/source_inline/README.md @@ -26,5 +26,5 @@ risedev slt 'e2e_test/source_inline/**/*.slt' To write tests, please ensure each file is self-contained and does not depend on running external scripts to setup the environment. Use `system` command to setup instead. -For simple cases, you can directly write a bash command; -For more complex cases, you can write a test script (with any language like bash, python, zx), and invoke it in the `system` command. +- For simple cases, you can directly write a bash command; +- For more complex cases, you can write a test script. See also [e2e_test/commands/README.md](../commands/README.md) diff --git a/e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt b/e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt new file mode 100644 index 000000000000..e6ce86aaf06f --- /dev/null +++ b/e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt @@ -0,0 +1,216 @@ +# create and drop CDC mysql tables concurrently + +control substitution on + +statement ok +ALTER SYSTEM SET max_concurrent_creating_streaming_jobs TO 1; + +system ok +mysql -e " + DROP DATABASE IF EXISTS testdb1; CREATE DATABASE testdb1; + USE testdb1; + CREATE TABLE tt1 (v1 int primary key, v2 timestamp); + INSERT INTO tt1 VALUES (1, '2023-10-23 10:00:00'); + CREATE TABLE tt2 (v1 int primary key, v2 timestamp); + INSERT INTO tt2 VALUES (2, '2023-10-23 11:00:00'); + CREATE TABLE tt3 (v1 int primary key, v2 timestamp); + INSERT INTO tt3 VALUES (3, '2023-10-23 12:00:00'); + CREATE TABLE tt4 (v1 int primary key, v2 timestamp); + INSERT INTO tt4 VALUES (4, '2023-10-23 13:00:00'); + CREATE TABLE tt5 (v1 int primary key, v2 timestamp); + INSERT INTO tt5 VALUES (5, '2023-10-23 14:00:00'); +" + +system ok +mysql -e " + DROP USER IF EXISTS 'non-shared-cdc'@'%'; + CREATE USER 'non-shared-cdc'@'%' IDENTIFIED BY '123456'; + GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'non-shared-cdc'@'%'; + # + DROP USER IF EXISTS 'shared-cdc'@'%'; + CREATE USER 'shared-cdc'@'%' IDENTIFIED BY 'abcdef'; + GRANT SELECT, RELOAD, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'shared-cdc'@'%'; + # + FLUSH PRIVILEGES; +" + +statement ok +create source s with ( + ${RISEDEV_MYSQL_WITH_OPTIONS_COMMON}, + username = 'shared-cdc', + password = 'abcdef', + database.name = 'testdb1', + server.id = '114514' +); + +sleep 2s + +# SourceExecutor only receives new data after it's created. +# But it can receive offset update at the beginning and periodically +# via the heartbeat message. +system ok +internal_table.mjs --name s --type '' --count +---- +count: 1 + + +statement ok +create table tt1 (v1 int, + v2 timestamptz, + PRIMARY KEY (v1) +) with ( + ${RISEDEV_MYSQL_WITH_OPTIONS_COMMON}, + username = 'non-shared-cdc', + password = '123456', + database.name = 'testdb1', + table.name = 'tt1', +); + +statement ok +create table tt2 (v1 int, + v2 timestamptz, + PRIMARY KEY (v1) +) with ( + ${RISEDEV_MYSQL_WITH_OPTIONS_COMMON}, + username = 'non-shared-cdc', + password = '123456', + database.name = 'testdb1', + table.name = 'tt2', +); + +statement ok +create table tt3 (v1 int, + v2 timestamptz, + PRIMARY KEY (v1) +) with ( + ${RISEDEV_MYSQL_WITH_OPTIONS_COMMON}, + username = 'non-shared-cdc', + password = '123456', + database.name = 'testdb1', + table.name = 'tt3', +); + +statement ok +create table tt4 (v1 int, + v2 timestamptz, + PRIMARY KEY (v1) +) with ( + ${RISEDEV_MYSQL_WITH_OPTIONS_COMMON}, + username = 'non-shared-cdc', + password = '123456', + database.name = 'testdb1', + table.name = 'tt4', +); + +statement ok +create table tt5 (v1 int, + v2 timestamptz, + PRIMARY KEY (v1) +) with ( + ${RISEDEV_MYSQL_WITH_OPTIONS_COMMON}, + username = 'non-shared-cdc', + password = '123456', + database.name = 'testdb1', + table.name = 'tt5', +); + +statement ok +create table tt1_shared (v1 int, + v2 timestamptz, + PRIMARY KEY (v1) +) from s table 'testdb1.tt1'; + +statement ok +create table tt2_shared (v1 int, + v2 timestamptz, + PRIMARY KEY (v1) +) from s table 'testdb1.tt2'; + +statement ok +create table tt3_shared (v1 int, + v2 timestamptz, + PRIMARY KEY (v1) +) from s table 'testdb1.tt3'; + +statement ok +create table tt4_shared (v1 int, + v2 timestamptz, + PRIMARY KEY (v1) +) from s table 'testdb1.tt4'; + +statement ok +create table tt5_shared (v1 int, + v2 timestamptz, + PRIMARY KEY (v1) +) from s table 'testdb1.tt5'; + +sleep 5s + +query IT +select * from tt1; +---- +1 2023-10-23 10:00:00+00:00 + +query IT +select * from tt2; +---- +2 2023-10-23 11:00:00+00:00 + +query IT +select * from tt3; +---- +3 2023-10-23 12:00:00+00:00 + +query IT +select * from tt4; +---- +4 2023-10-23 13:00:00+00:00 + +query IT +select * from tt5; +---- +5 2023-10-23 14:00:00+00:00 + + +query IT +select * from tt1_shared; +---- +1 2023-10-23 10:00:00+00:00 + +query IT +select * from tt2_shared; +---- +2 2023-10-23 11:00:00+00:00 + +query IT +select * from tt3_shared; +---- +3 2023-10-23 12:00:00+00:00 + +query IT +select * from tt4_shared; +---- +4 2023-10-23 13:00:00+00:00 + +query IT +select * from tt5_shared; +---- +5 2023-10-23 14:00:00+00:00 + +statement ok +drop table tt1; + +statement ok +drop table tt2; + +statement ok +drop table tt3; + +statement ok +drop table tt4; + +statement ok +drop table tt5; + +statement ok +drop source s cascade; diff --git a/e2e_test/source_inline/commands.toml b/e2e_test/source_inline/commands.toml index 8af865099ac7..f616fb40711c 100644 --- a/e2e_test/source_inline/commands.toml +++ b/e2e_test/source_inline/commands.toml @@ -39,8 +39,16 @@ fi category = "RiseDev - Test - Source Test - Kafka" description = "Delete all kafka topics." dependencies = ["kafka-hook"] -command = "rpk" -args = ["topic", "delete", "-r", "*"] +script = ''' +#!/usr/bin/env sh +set -e +if [ -n "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" ]; then + echo "Deleting all Kafka topics..." + rpk topic delete -r "*" +else + echo "No Kafka to clean." +fi +''' [tasks.kafka-topics] category = "RiseDev - Test - Source Test - Kafka" diff --git a/risedev.yml b/risedev.yml index 721043ccf7fa..bfef64e6b598 100644 --- a/risedev.yml +++ b/risedev.yml @@ -823,6 +823,30 @@ profile: address: message_queue port: 29092 + ci-inline-source-test: + config-path: src/config/ci.toml + steps: + - use: minio + - use: etcd + unsafe-no-fsync: true + - use: meta-node + - use: compute-node + enable-tiered-cache: true + - use: frontend + - use: compactor + - use: pubsub + persist-data: true + - use: kafka + user-managed: true + address: message_queue + port: 29092 + - use: mysql + port: 3306 + address: mysql + user: root + password: 123456 + user-managed: true + ci-redis: config-path: src/config/ci.toml steps: @@ -1406,3 +1430,21 @@ template: # address of redis address: "127.0.0.1" + + # MySQL service. Currently user-managed only + mysql: + # Id to be picked-up by services + id: mysql + + # listen port of mysql + port: 8306 + # address of mysql + address: "127.0.0.1" + + # Note: these configs are not validated by risedev. They are passed as-is to risedev-env + # default user for mysql operations. + # This is not used in RISEDEV_MYSQL_WITH_OPTIONS_COMMON. + user: root + password: "" + + user-managed: true diff --git a/src/risedevtool/src/bin/risedev-compose.rs b/src/risedevtool/src/bin/risedev-compose.rs index 0f3cb5b020fc..c3a7d079e4c4 100644 --- a/src/risedevtool/src/bin/risedev-compose.rs +++ b/src/risedevtool/src/bin/risedev-compose.rs @@ -222,7 +222,9 @@ fn main() -> Result<()> { volumes.insert(c.id.clone(), ComposeVolume::default()); (c.address.clone(), c.compose(&compose_config)?) } - ServiceConfig::Redis(_) => return Err(anyhow!("not supported")), + ServiceConfig::Redis(_) | ServiceConfig::MySql(_) => { + return Err(anyhow!("not supported")) + } }; compose.container_name = service.id().to_string(); if opts.deploy { diff --git a/src/risedevtool/src/bin/risedev-dev.rs b/src/risedevtool/src/bin/risedev-dev.rs index d5a14cbf8c1b..7c741407dd16 100644 --- a/src/risedevtool/src/bin/risedev-dev.rs +++ b/src/risedevtool/src/bin/risedev-dev.rs @@ -18,17 +18,16 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::{Duration, Instant}; -use anyhow::{anyhow, Context, Result}; +use anyhow::{anyhow, bail, Context, Result}; use console::style; use fs_err::OpenOptions; use indicatif::ProgressBar; use risedev::util::{complete_spin, fail_spin}; use risedev::{ - generate_risedev_env, preflight_check, AwsS3Config, CompactorService, ComputeNodeService, - ConfigExpander, ConfigureTmuxTask, EnsureStopService, ExecuteContext, FrontendService, - GrafanaService, KafkaService, MetaNodeService, MinioService, OpendalConfig, PrometheusService, - PubsubService, RedisService, ServiceConfig, SqliteConfig, Task, TempoService, ZooKeeperService, - RISEDEV_NAME, + generate_risedev_env, preflight_check, CompactorService, ComputeNodeService, ConfigExpander, + ConfigureTmuxTask, DummyService, EnsureStopService, ExecuteContext, FrontendService, + GrafanaService, KafkaService, MetaNodeService, MinioService, PrometheusService, PubsubService, + RedisService, ServiceConfig, SqliteConfig, Task, TempoService, ZooKeeperService, RISEDEV_NAME, }; use tempfile::tempdir; use thiserror_ext::AsReport; @@ -136,7 +135,7 @@ fn task_main( // TODO(chi): etcd will set its health check to success only after all nodes are // connected and there's a leader, therefore we cannot do health check for now. let mut task = - risedev::ConfigureGrpcNodeTask::new(c.address.clone(), c.port, false)?; + risedev::ConfigureTcpNodeTask::new(c.address.clone(), c.port, false)?; task.execute(&mut ctx)?; } ServiceConfig::Sqlite(c) => { @@ -173,7 +172,7 @@ fn task_main( let mut service = PrometheusService::new(c.clone())?; service.execute(&mut ctx)?; let mut task = - risedev::ConfigureGrpcNodeTask::new(c.address.clone(), c.port, false)?; + risedev::ConfigureTcpNodeTask::new(c.address.clone(), c.port, false)?; task.execute(&mut ctx)?; ctx.pb .set_message(format!("api http://{}:{}/", c.address, c.port)); @@ -185,7 +184,7 @@ fn task_main( service.execute(&mut ctx)?; let mut task = - risedev::ConfigureGrpcNodeTask::new(c.address.clone(), c.port, c.user_managed)?; + risedev::ConfigureTcpNodeTask::new(c.address.clone(), c.port, c.user_managed)?; task.execute(&mut ctx)?; ctx.pb .set_message(format!("api grpc://{}:{}/", c.address, c.port)); @@ -196,7 +195,7 @@ fn task_main( let mut service = MetaNodeService::new(c.clone())?; service.execute(&mut ctx)?; let mut task = - risedev::ConfigureGrpcNodeTask::new(c.address.clone(), c.port, c.user_managed)?; + risedev::ConfigureTcpNodeTask::new(c.address.clone(), c.port, c.user_managed)?; task.execute(&mut ctx)?; ctx.pb.set_message(format!( "api grpc://{}:{}/, dashboard http://{}:{}/", @@ -209,7 +208,7 @@ fn task_main( let mut service = FrontendService::new(c.clone())?; service.execute(&mut ctx)?; let mut task = - risedev::ConfigureGrpcNodeTask::new(c.address.clone(), c.port, c.user_managed)?; + risedev::ConfigureTcpNodeTask::new(c.address.clone(), c.port, c.user_managed)?; task.execute(&mut ctx)?; ctx.pb .set_message(format!("api postgres://{}:{}/", c.address, c.port)); @@ -231,7 +230,7 @@ fn task_main( let mut service = CompactorService::new(c.clone())?; service.execute(&mut ctx)?; let mut task = - risedev::ConfigureGrpcNodeTask::new(c.address.clone(), c.port, c.user_managed)?; + risedev::ConfigureTcpNodeTask::new(c.address.clone(), c.port, c.user_managed)?; task.execute(&mut ctx)?; ctx.pb .set_message(format!("compactor {}:{}", c.address, c.port)); @@ -242,7 +241,7 @@ fn task_main( let mut service = GrafanaService::new(c.clone())?; service.execute(&mut ctx)?; let mut task = - risedev::ConfigureGrpcNodeTask::new(c.address.clone(), c.port, false)?; + risedev::ConfigureTcpNodeTask::new(c.address.clone(), c.port, false)?; task.execute(&mut ctx)?; ctx.pb .set_message(format!("dashboard http://{}:{}/", c.address, c.port)); @@ -253,7 +252,7 @@ fn task_main( let mut service = TempoService::new(c.clone())?; service.execute(&mut ctx)?; let mut task = - risedev::ConfigureGrpcNodeTask::new(c.listen_address.clone(), c.port, false)?; + risedev::ConfigureTcpNodeTask::new(c.listen_address.clone(), c.port, false)?; task.execute(&mut ctx)?; ctx.pb .set_message(format!("api http://{}:{}/", c.listen_address, c.port)); @@ -261,46 +260,14 @@ fn task_main( ServiceConfig::AwsS3(c) => { let mut ctx = ExecuteContext::new(&mut logger, manager.new_progress(), status_dir.clone()); - - struct AwsService(AwsS3Config); - impl Task for AwsService { - fn execute( - &mut self, - _ctx: &mut ExecuteContext, - ) -> anyhow::Result<()> { - Ok(()) - } - - fn id(&self) -> String { - self.0.id.clone() - } - } - - ctx.service(&AwsService(c.clone())); - ctx.complete_spin(); + DummyService::new(&c.id).execute(&mut ctx)?; ctx.pb .set_message(format!("using AWS s3 bucket {}", c.bucket)); } ServiceConfig::Opendal(c) => { let mut ctx = ExecuteContext::new(&mut logger, manager.new_progress(), status_dir.clone()); - - struct OpendalService(OpendalConfig); - impl Task for OpendalService { - fn execute( - &mut self, - _ctx: &mut ExecuteContext, - ) -> anyhow::Result<()> { - Ok(()) - } - - fn id(&self) -> String { - self.0.id.clone() - } - } - - ctx.service(&OpendalService(c.clone())); - ctx.complete_spin(); + DummyService::new(&c.id).execute(&mut ctx)?; ctx.pb .set_message(format!("using Opendal, namenode = {}", c.namenode)); } @@ -310,7 +277,7 @@ fn task_main( let mut service = ZooKeeperService::new(c.clone())?; service.execute(&mut ctx)?; let mut task = - risedev::ConfigureGrpcNodeTask::new(c.address.clone(), c.port, false)?; + risedev::ConfigureTcpNodeTask::new(c.address.clone(), c.port, false)?; task.execute(&mut ctx)?; ctx.pb .set_message(format!("zookeeper {}:{}", c.address, c.port)); @@ -348,6 +315,20 @@ fn task_main( ctx.pb .set_message(format!("redis {}:{}", c.address, c.port)); } + ServiceConfig::MySql(c) => { + let mut ctx = + ExecuteContext::new(&mut logger, manager.new_progress(), status_dir.clone()); + // TODO: support starting mysql in RiseDev. Currently it's user-managed only + if !c.user_managed { + bail!("Non user-managed MySQL is not supported yet. Please use user-managed MySQL."); + } + DummyService::new(&c.id).execute(&mut ctx)?; + let mut task = + risedev::ConfigureTcpNodeTask::new(c.address.clone(), c.port, c.user_managed)?; + task.execute(&mut ctx)?; + ctx.pb + .set_message(format!("mysql {}:{}/", c.address, c.port)); + } } let service_id = service.id().to_string(); diff --git a/src/risedevtool/src/config.rs b/src/risedevtool/src/config.rs index d5fe2d8c220a..45d90daa0c87 100644 --- a/src/risedevtool/src/config.rs +++ b/src/risedevtool/src/config.rs @@ -174,6 +174,7 @@ impl ConfigExpander { "redis" => ServiceConfig::Redis(serde_yaml::from_str(&out_str)?), "zookeeper" => ServiceConfig::ZooKeeper(serde_yaml::from_str(&out_str)?), "redpanda" => ServiceConfig::RedPanda(serde_yaml::from_str(&out_str)?), + "mysql" => ServiceConfig::MySql(serde_yaml::from_str(&out_str)?), other => return Err(anyhow!("unsupported use type: {}", other)), }; Ok(result) diff --git a/src/risedevtool/src/risedev_env.rs b/src/risedevtool/src/risedev_env.rs index 1efdf1470998..24f18895434e 100644 --- a/src/risedevtool/src/risedev_env.rs +++ b/src/risedevtool/src/risedev_env.rs @@ -77,6 +77,20 @@ pub fn generate_risedev_env(services: &Vec) -> String { writeln!(env, r#"RISEDEV_KAFKA_WITH_OPTIONS_COMMON="connector='kafka',properties.bootstrap.server='{brokers}'""#).unwrap(); writeln!(env, r#"RPK_BROKERS="{brokers}""#).unwrap(); } + ServiceConfig::MySql(c) => { + let host = &c.address; + let port = &c.port; + let user = &c.user; + let password = &c.password; + // These envs are used by `mysql` cli. + writeln!(env, r#"MYSQL_HOST="{host}""#,).unwrap(); + writeln!(env, r#"MYSQL_TCP_PORT="{port}""#,).unwrap(); + writeln!(env, r#"MYSQL_USER="{user}""#,).unwrap(); + writeln!(env, r#"MYSQL_PWD="{password}""#,).unwrap(); + // Note: user and password are not included in the common WITH options. + // It's expected to create another dedicated user for the source. + writeln!(env, r#"RISEDEV_MYSQL_WITH_OPTIONS_COMMON="connector='mysql-cdc',hostname='{host}',port='{port}'""#,).unwrap(); + } _ => {} } } diff --git a/src/risedevtool/src/service_config.rs b/src/risedevtool/src/service_config.rs index e5f149b8d10c..19dffa388cdf 100644 --- a/src/risedevtool/src/service_config.rs +++ b/src/risedevtool/src/service_config.rs @@ -336,6 +336,23 @@ pub struct RedisConfig { pub address: String, } +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "kebab-case")] +#[serde(deny_unknown_fields)] +pub struct MySqlConfig { + #[serde(rename = "use")] + phantom_use: Option, + pub id: String, + + pub port: u16, + pub address: String, + + pub user: String, + pub password: String, + + pub user_managed: bool, +} + /// All service configuration #[derive(Clone, Debug, PartialEq)] pub enum ServiceConfig { @@ -356,6 +373,7 @@ pub enum ServiceConfig { Redis(RedisConfig), ZooKeeper(ZooKeeperConfig), RedPanda(RedPandaConfig), + MySql(MySqlConfig), } impl ServiceConfig { @@ -378,6 +396,7 @@ impl ServiceConfig { Self::Redis(c) => &c.id, Self::RedPanda(c) => &c.id, Self::Opendal(c) => &c.id, + Self::MySql(c) => &c.id, } } @@ -400,6 +419,7 @@ impl ServiceConfig { Self::Redis(c) => Some(c.port), Self::RedPanda(_c) => None, Self::Opendal(_) => None, + Self::MySql(c) => Some(c.port), } } @@ -422,6 +442,7 @@ impl ServiceConfig { Self::Redis(_c) => false, Self::RedPanda(_c) => false, Self::Opendal(_c) => false, + Self::MySql(c) => c.user_managed, } } } diff --git a/src/risedevtool/src/task.rs b/src/risedevtool/src/task.rs index bec0df5ae812..84b2a1b9428e 100644 --- a/src/risedevtool/src/task.rs +++ b/src/risedevtool/src/task.rs @@ -15,6 +15,7 @@ mod compactor_service; mod compute_node_service; mod configure_tmux_service; +mod dummy_service; mod ensure_stop_service; mod etcd_service; mod frontend_service; @@ -36,7 +37,7 @@ mod utils; mod zookeeper_service; use std::env; -use std::net::TcpStream; +use std::net::{TcpStream, ToSocketAddrs}; use std::path::{Path, PathBuf}; use std::process::{Command, Output}; use std::sync::Arc; @@ -51,6 +52,7 @@ pub use utils::*; pub use self::compactor_service::*; pub use self::compute_node_service::*; pub use self::configure_tmux_service::*; +pub use self::dummy_service::DummyService; pub use self::ensure_stop_service::*; pub use self::etcd_service::*; pub use self::frontend_service::*; @@ -255,7 +257,11 @@ where /// Wait for a user-managed service to be available pub fn wait_tcp_user(&mut self, server: impl AsRef) -> anyhow::Result<()> { - let addr = server.as_ref().parse()?; + let addr = server + .as_ref() + .to_socket_addrs()? + .next() + .unwrap_or_else(|| panic!("failed to resolve {}", server.as_ref())); wait( || { TcpStream::connect_timeout(&addr, Duration::from_secs(1))?; diff --git a/src/risedevtool/src/task/dummy_service.rs b/src/risedevtool/src/task/dummy_service.rs new file mode 100644 index 000000000000..f3d1054b1ca2 --- /dev/null +++ b/src/risedevtool/src/task/dummy_service.rs @@ -0,0 +1,42 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::{ExecuteContext, Task}; + +pub struct DummyService { + id: String, +} + +impl DummyService { + pub fn new(id: &str) -> Self { + Self { id: id.to_string() } + } +} + +impl Task for DummyService { + fn execute(&mut self, ctx: &mut ExecuteContext) -> anyhow::Result<()> { + ctx.service(self); + writeln!( + &mut ctx.log, + "{} is a dummy service. Please ensure it's correctly configured on your own! 🙏", + self.id + )?; + ctx.complete_spin(); + Ok(()) + } + + fn id(&self) -> String { + self.id.clone() + } +} diff --git a/src/risedevtool/src/task/task_configure_grpc_node.rs b/src/risedevtool/src/task/task_configure_grpc_node.rs index 75a69a7cdb29..5a556a5fca42 100644 --- a/src/risedevtool/src/task/task_configure_grpc_node.rs +++ b/src/risedevtool/src/task/task_configure_grpc_node.rs @@ -16,13 +16,13 @@ use anyhow::Result; use super::{ExecuteContext, Task}; -pub struct ConfigureGrpcNodeTask { +pub struct ConfigureTcpNodeTask { advertise_address: String, port: u16, user_managed: bool, } -impl ConfigureGrpcNodeTask { +impl ConfigureTcpNodeTask { pub fn new(advertise_address: String, port: u16, user_managed: bool) -> Result { Ok(Self { advertise_address, @@ -32,8 +32,12 @@ impl ConfigureGrpcNodeTask { } } -impl Task for ConfigureGrpcNodeTask { +impl Task for ConfigureTcpNodeTask { fn execute(&mut self, ctx: &mut ExecuteContext) -> anyhow::Result<()> { + assert!( + ctx.id.is_some(), + "Service should be set before executing ConfigureTcpNodeTask" + ); let address = format!("{}:{}", self.advertise_address, self.port); if self.user_managed {